You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:22 UTC

[1/9] mesos git commit: Refactor synchronized to use mutex, recursive_mutex, atomic_flag.

Repository: mesos
Updated Branches:
  refs/heads/master f6c663927 -> 94c0f68dc


Refactor synchronized to use mutex, recursive_mutex, atomic_flag.

The synchronized(m) macro now supports passing any of the following by
pointer or reference:
  1. `std::mutex`
  2. `std::recursive_mutex`
  3. `std::atomic_flag`

You no longer need to initialize your synchronization primitives using
`Synchronizable` and `SYNCHRONIZED_INITIALIZER`.  Remember to
initialize `std::atomic_flag` with `ATOMIC_FLAG_INIT`.

Review: https://reviews.apache.org/r/32356


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f9d7738
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f9d7738
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f9d7738

Branch: refs/heads/master
Commit: 9f9d7738e0b7a0ff2bbd91ec8abdd41bef079261
Parents: f6c6639
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:19:14 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:19:15 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/clock.cpp        |  28 +++---
 3rdparty/libprocess/src/libev.cpp        |   7 +-
 3rdparty/libprocess/src/libev.hpp        |   5 +-
 3rdparty/libprocess/src/process.cpp      |  64 +++++++-------
 3rdparty/libprocess/src/synchronized.hpp | 120 ++++++++------------------
 5 files changed, 86 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index bff5310..afeb43c 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -2,6 +2,7 @@
 
 #include <list>
 #include <map>
+#include <mutex>
 #include <set>
 
 #include <process/clock.hpp>
@@ -21,6 +22,7 @@
 
 using std::list;
 using std::map;
+using std::recursive_mutex;
 using std::set;
 
 namespace process {
@@ -29,7 +31,7 @@ namespace process {
 // timer so that we can have two timers that have the same timeout. We
 // exploit that the map is SORTED!
 static map<Time, list<Timer>>* timers = new map<Time, list<Timer>>();
-static synchronizable(timers) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+static recursive_mutex* timers_mutex = new recursive_mutex();
 
 
 // We namespace the clock related variables to keep them well
@@ -126,7 +128,7 @@ void tick(const Time& time)
 {
   list<Timer> timedout;
 
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     // We pass NULL to be explicit about the fact that we want the
     // global clock time, even though it's unnecessary ('tick' is
     // called from the event loop, not a Process context).
@@ -173,7 +175,7 @@ void tick(const Time& time)
   // Mark 'settling' as false since there are not any more timers
   // that will expire before the paused time and we've finished
   // executing expired timers.
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (clock::paused &&
         (timers->size() == 0 ||
          timers->begin()->first > *clock::current)) {
@@ -200,7 +202,7 @@ Time Clock::now()
 
 Time Clock::now(ProcessBase* process)
 {
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (Clock::paused()) {
       if (process != NULL) {
         if (clock::currents->count(process) != 0) {
@@ -244,7 +246,7 @@ Timer Clock::timer(
           << " in the future (" << timeout.time() << ")";
 
   // Add the timer.
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (timers->size() == 0 ||
         timer.timeout().time() < timers->begin()->first) {
       // Need to interrupt the loop to update/set timer repeat.
@@ -266,7 +268,7 @@ Timer Clock::timer(
 bool Clock::cancel(const Timer& timer)
 {
   bool canceled = false;
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     // Check if the timeout is still pending, and if so, erase it. In
     // addition, erase an empty list if we just removed the last
     // timeout.
@@ -288,7 +290,7 @@ void Clock::pause()
 {
   process::initialize(); // To make sure the event loop is ready.
 
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (!clock::paused) {
       *clock::initial = *clock::current = now();
       clock::paused = true;
@@ -320,7 +322,7 @@ void Clock::resume()
 {
   process::initialize(); // To make sure the event loop is ready.
 
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (clock::paused) {
       VLOG(2) << "Clock resumed at " << clock::current;
 
@@ -337,7 +339,7 @@ void Clock::resume()
 
 void Clock::advance(const Duration& duration)
 {
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (clock::paused) {
       *clock::advanced += duration;
       *clock::current += duration;
@@ -355,7 +357,7 @@ void Clock::advance(const Duration& duration)
 
 void Clock::advance(ProcessBase* process, const Duration& duration)
 {
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (clock::paused) {
       Time current = now(process);
       current += duration;
@@ -374,7 +376,7 @@ void Clock::advance(ProcessBase* process, const Duration& duration)
 
 void Clock::update(const Time& time)
 {
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (clock::paused) {
       if (*clock::current < time) {
         *clock::advanced += (time - *clock::current);
@@ -393,7 +395,7 @@ void Clock::update(const Time& time)
 
 void Clock::update(ProcessBase* process, const Time& time, Update update)
 {
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     if (clock::paused) {
       if (now(process) < time || update == Clock::FORCE) {
         VLOG(2) << "Clock of " << process->self() << " updated to " << time;
@@ -418,7 +420,7 @@ void Clock::order(ProcessBase* from, ProcessBase* to)
 
 bool Clock::settled()
 {
-  synchronized (timers) {
+  synchronized (timers_mutex) {
     CHECK(clock::paused);
 
     if (clock::settling) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 610dfb8..2b8c68d 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -1,5 +1,6 @@
 #include <ev.h>
 
+#include <mutex>
 #include <queue>
 
 #include <stout/duration.hpp>
@@ -19,7 +20,7 @@ ev_async async_watcher;
 
 std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
 
-synchronizable(watchers);
+std::mutex* watchers_mutex = new std::mutex();
 
 std::queue<lambda::function<void(void)>>* functions =
   new std::queue<lambda::function<void(void)>>();
@@ -29,7 +30,7 @@ ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
 
 void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 {
-  synchronized (watchers) {
+  synchronized (watchers_mutex) {
     // Start all the new I/O watchers.
     while (!watchers->empty()) {
       ev_io* watcher = watchers->front();
@@ -47,8 +48,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
 
 void EventLoop::initialize()
 {
-  synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
-
   loop = ev_default_loop(EVFLAG_AUTO);
 
   ev_async_init(&async_watcher, handle_async);

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index e4a403d..6c2015b 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -3,6 +3,7 @@
 
 #include <ev.h>
 
+#include <mutex>
 #include <queue>
 
 #include <process/future.hpp>
@@ -27,7 +28,7 @@ extern ev_async async_watcher;
 // TODO(benh): Replace this queue with functions that we put in
 // 'functions' below that perform the ev_io_start themselves.
 extern std::queue<ev_io*>* watchers;
-extern synchronizable(watchers);
+extern std::mutex* watchers_mutex;
 
 // Queue of functions to be invoked asynchronously within the vent
 // loop (protected by 'watchers' above).
@@ -72,7 +73,7 @@ Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
   Future<T> future = promise->future();
 
   // Enqueue the function.
-  synchronized (watchers) {
+  synchronized (watchers_mutex) {
     functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 4a5ab79..834670b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -34,6 +34,7 @@
 #include <list>
 #include <map>
 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+#include <mutex>
 #include <queue>
 #include <set>
 #include <sstream>
@@ -120,10 +121,10 @@ namespace ID {
 string generate(const string& prefix)
 {
   static map<string, int>* prefixes = new map<string, int>();
-  static synchronizable(prefixes) = SYNCHRONIZED_INITIALIZER;
+  static std::mutex* prefixes_mutex = new std::mutex();
 
   int id;
-  synchronized (prefixes) {
+  synchronized (prefixes_mutex) {
     int& _id = (*prefixes)[prefix];
     _id += 1;
     id = _id;
@@ -323,7 +324,7 @@ private:
   map<int, HttpProxy*> proxies;
 
   // Protects instance variables.
-  synchronizable(this);
+  std::recursive_mutex mutex;
 };
 
 
@@ -370,14 +371,14 @@ private:
 
   // Map of all local spawned and running processes.
   map<string, ProcessBase*> processes;
-  synchronizable(processes);
+  std::recursive_mutex processes_mutex;
 
-  // Gates for waiting threads (protected by synchronizable(processes)).
+  // Gates for waiting threads (protected by processes_mutex).
   map<ProcessBase*, Gate*> gates;
 
   // Queue of runnable processes (implemented using list).
   list<ProcessBase*> runq;
-  synchronizable(runq);
+  std::recursive_mutex runq_mutex;
 
   // Number of running processes, to support Clock::settle operation.
   int running;
@@ -409,7 +410,7 @@ static Gate* gate = new Gate();
 // recursive in case a filterer wants to do anything fancy (which is
 // possible and likely given that filters will get used for testing).
 static Filter* filterer = NULL;
-static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+static std::recursive_mutex* filterer_mutex = new std::recursive_mutex();
 
 // Global garbage collector.
 PID<GarbageCollector> gc;
@@ -1157,10 +1158,7 @@ void HttpProxy::stream(
 }
 
 
-SocketManager::SocketManager()
-{
-  synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-}
+SocketManager::SocketManager() {}
 
 
 SocketManager::~SocketManager() {}
@@ -1168,7 +1166,7 @@ SocketManager::~SocketManager() {}
 
 void SocketManager::accepted(const Socket& socket)
 {
-  synchronized (this) {
+  synchronized (mutex) {
     sockets[socket] = new Socket(socket);
   }
 }
@@ -1264,7 +1262,7 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
   Option<Socket> socket = None();
   bool connect = false;
 
-  synchronized (this) {
+  synchronized (mutex) {
     // Check if the socket address is remote and there isn't a persistant link.
     if (to.address != __address__  && persists.count(to.address) == 0) {
       // Okay, no link, let's create a socket.
@@ -1313,7 +1311,7 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
 {
   HttpProxy* proxy = NULL;
 
-  synchronized (this) {
+  synchronized (mutex) {
     // This socket might have been asked to get closed (e.g., remote
     // side hang up) while a process is attempting to handle an HTTP
     // request. Thus, if there is no more socket, return an empty PID.
@@ -1420,7 +1418,7 @@ void SocketManager::send(Encoder* encoder, bool persist)
 {
   CHECK(encoder != NULL);
 
-  synchronized (this) {
+  synchronized (mutex) {
     Socket socket = encoder->socket();
     if (sockets.count(socket) > 0) {
       // Update whether or not this socket should get disposed after
@@ -1517,7 +1515,7 @@ void SocketManager::send(Message* message)
   Option<Socket> socket = None();
   bool connect = false;
 
-  synchronized (this) {
+  synchronized (mutex) {
     // Check if there is already a socket.
     bool persist = persists.count(address) > 0;
     bool temp = temps.count(address) > 0;
@@ -1587,7 +1585,7 @@ Encoder* SocketManager::next(int s)
 {
   HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
 
-  synchronized (this) {
+  synchronized (mutex) {
     // We cannot assume 'sockets.count(s) > 0' here because it's
     // possible that 's' has been removed with a a call to
     // SocketManager::close. For example, it could be the case that a
@@ -1659,7 +1657,7 @@ void SocketManager::close(int s)
 {
   HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
 
-  synchronized (this) {
+  synchronized (mutex) {
     // This socket might not be active if it was already asked to get
     // closed (e.g., a write on the socket failed so we try and close
     // it and then later the recv side of the socket gets closed so we
@@ -1747,7 +1745,7 @@ void SocketManager::exited(const Address& address)
   // into ProcessManager ... then we wouldn't have to convince
   // ourselves that the accesses to each Process object will always be
   // valid.
-  synchronized (this) {
+  synchronized (mutex) {
     if (!links.remotes.contains(address)) {
       return; // No linkees for this socket address!
     }
@@ -1788,7 +1786,7 @@ void SocketManager::exited(ProcessBase* process)
   // can update the clocks of linked processes as appropriate.
   const Time time = Clock::now(process);
 
-  synchronized (this) {
+  synchronized (mutex) {
     // If this process had linked to anything, we need to clean
     // up any pointers to it. Also, if this process was the last
     // linker to a remote linkee, we must remove linkee from the
@@ -1843,8 +1841,6 @@ void SocketManager::exited(ProcessBase* process)
 ProcessManager::ProcessManager(const string& _delegate)
   : delegate(_delegate)
 {
-  synchronizer(processes) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-  synchronizer(runq) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
   running = 0;
   __sync_synchronize(); // Ensure write to 'running' visible in other threads.
 }
@@ -1857,7 +1853,7 @@ ProcessManager::~ProcessManager()
   // or process the whole map as terminating one process might
   // trigger other terminations. Deal with them one at a time.
   do {
-    synchronized (processes) {
+    synchronized (processes_mutex) {
       process = !processes.empty() ? processes.begin()->second : NULL;
     }
     if (process != NULL) {
@@ -1871,7 +1867,7 @@ ProcessManager::~ProcessManager()
 ProcessReference ProcessManager::use(const UPID& pid)
 {
   if (pid.address == __address__) {
-    synchronized (processes) {
+    synchronized (processes_mutex) {
       if (processes.count(pid.id) > 0) {
         // Note that the ProcessReference constructor _must_ get
         // called while holding the lock on processes so that waiting
@@ -2059,7 +2055,7 @@ UPID ProcessManager::spawn(ProcessBase* process, bool manage)
 {
   CHECK(process != NULL);
 
-  synchronized (processes) {
+  synchronized (processes_mutex) {
     if (processes.count(process->pid.id) > 0) {
       return UPID();
     } else {
@@ -2125,7 +2121,7 @@ void ProcessManager::resume(ProcessBase* process)
       CHECK(event != NULL);
 
       // Determine if we should filter this event.
-      synchronized (filterer) {
+      synchronized (filterer_mutex) {
         if (filterer != NULL) {
           bool filter = false;
           struct FilterVisitor : EventVisitor
@@ -2231,7 +2227,7 @@ void ProcessManager::cleanup(ProcessBase* process)
   Gate* gate = NULL;
 
   // Remove process.
-  synchronized (processes) {
+  synchronized (processes_mutex) {
     // Wait for all process references to get cleaned up.
     while (process->refs > 0) {
 #if defined(__i386__) || defined(__x86_64__)
@@ -2354,7 +2350,7 @@ bool ProcessManager::wait(const UPID& pid)
   ProcessBase* process = NULL; // Set to non-null if we donate thread.
 
   // Try and approach the gate if necessary.
-  synchronized (processes) {
+  synchronized (processes_mutex) {
     if (processes.count(pid.id) > 0) {
       process = processes[pid.id];
       CHECK(process->state != ProcessBase::TERMINATED);
@@ -2370,7 +2366,7 @@ bool ProcessManager::wait(const UPID& pid)
       // Check if it is runnable in order to donate this thread.
       if (process->state == ProcessBase::BOTTOM ||
           process->state == ProcessBase::READY) {
-        synchronized (runq) {
+        synchronized (runq_mutex) {
           list<ProcessBase*>::iterator it =
             find(runq.begin(), runq.end(), process);
           if (it != runq.end()) {
@@ -2429,7 +2425,7 @@ void ProcessManager::enqueue(ProcessBase* process)
   // it's not running. Otherwise, check and see which thread this
   // process was last running on, and put it on that threads runq.
 
-  synchronized (runq) {
+  synchronized (runq_mutex) {
     CHECK(find(runq.begin(), runq.end(), process) == runq.end());
     runq.push_back(process);
   }
@@ -2447,7 +2443,7 @@ ProcessBase* ProcessManager::dequeue()
 
   ProcessBase* process = NULL;
 
-  synchronized (runq) {
+  synchronized (runq_mutex) {
     if (!runq.empty()) {
       process = runq.front();
       runq.pop_front();
@@ -2482,7 +2478,7 @@ void ProcessManager::settle()
 
     done = true; // Assume to start that we are settled.
 
-    synchronized (runq) {
+    synchronized (runq_mutex) {
       if (!runq.empty()) {
         done = false;
         continue;
@@ -2509,7 +2505,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
 {
   JSON::Array array;
 
-  synchronized (processes) {
+  synchronized (processes_mutex) {
     foreachvalue (ProcessBase* process, process_manager->processes) {
       JSON::Object object;
       object.values["id"] = process->pid.id;
@@ -2906,7 +2902,7 @@ void filter(Filter *filter)
 {
   process::initialize();
 
-  synchronized (filterer) {
+  synchronized (filterer_mutex) {
     filterer = filter;
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/synchronized.hpp b/3rdparty/libprocess/src/synchronized.hpp
index 6a341b8..462c46c 100644
--- a/3rdparty/libprocess/src/synchronized.hpp
+++ b/3rdparty/libprocess/src/synchronized.hpp
@@ -1,111 +1,61 @@
 #ifndef __SYNCHRONIZABLE_HPP__
 #define __SYNCHRONIZABLE_HPP__
 
-#include <pthread.h>
-
-#include <iostream>
-
-
-class Synchronizable
+#include <atomic>
+#include <mutex>
+#include <type_traits>
+
+// A helper class for the synchronized(m) macro. It is an RAII 'guard'
+// for a synchronization primitive 'T'. The general template handles
+// cases such as 'std::mutex' and 'std::recursive_mutex'.
+template <typename T>
+class Synchronized
 {
 public:
-  Synchronizable()
-    : initialized(false) {}
+  Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock)) { lock->lock(); }
+  Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
 
-  explicit Synchronizable(int _type)
-    : type(_type), initialized(false)
-  {
-    initialize();
-  }
-
-  Synchronizable(const Synchronizable &that)
-  {
-    type = that.type;
-    initialize();
-  }
-
-  Synchronizable & operator = (const Synchronizable &that)
-  {
-    type = that.type;
-    initialize();
-    return *this;
-  }
-
-  void acquire()
-  {
-    if (!initialized) {
-      ABORT("synchronizable not initialized");
-    }
-    pthread_mutex_lock(&mutex);
-  }
-
-  void release()
-  {
-    if (!initialized) {
-      ABORT("synchronizable not initialized");
-    }
-    pthread_mutex_unlock(&mutex);
-  }
+  ~Synchronized() { lock->unlock(); }
 
+  operator bool() const { return true; }
 private:
-  void initialize()
-  {
-    if (!initialized) {
-      pthread_mutexattr_t attr;
-      pthread_mutexattr_init(&attr);
-      pthread_mutexattr_settype(&attr, type);
-      pthread_mutex_init(&mutex, &attr);
-      pthread_mutexattr_destroy(&attr);
-      initialized = true;
-    } else {
-      ABORT("synchronizable already initialized");
-    }
-  }
-
-  int type;
-  bool initialized;
-  pthread_mutex_t mutex;
+  T* lock;
 };
 
 
-class Synchronized
+// A specialization of the Synchronized class for 'std::atomic_flag'.
+// This is necessary as the locking functions are different.
+template <>
+class Synchronized<std::atomic_flag>
 {
 public:
-  explicit Synchronized(Synchronizable *_synchronizable)
-    : synchronizable(_synchronizable)
+  Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
   {
-    synchronizable->acquire();
+    while (flag->test_and_set(std::memory_order_acquire)) {}
   }
+  Synchronized(std::atomic_flag** _flag)
+    : Synchronized(*CHECK_NOTNULL(_flag)) {}
 
   ~Synchronized()
   {
-    synchronizable->release();
+    flag->clear(std::memory_order_release);
   }
 
-  operator bool () { return true; }
-
+  operator bool() const { return true; }
 private:
-  Synchronizable *synchronizable;
+  std::atomic_flag* flag;
 };
 
 
-#define synchronized(s)                                                 \
-  if (Synchronized __synchronized ## s = Synchronized(&__synchronizable_ ## s))
-
-#define synchronizable(s)                       \
-  Synchronizable __synchronizable_ ## s
-
-#define synchronizer(s)                         \
-  (__synchronizable_ ## s)
-
-
-#define SYNCHRONIZED_INITIALIZER                \
-  Synchronizable(PTHREAD_MUTEX_NORMAL)
-
-#define SYNCHRONIZED_INITIALIZER_DEBUG          \
-  Synchronizable(PTHREAD_MUTEX_ERRORCHECK)
-
-#define SYNCHRONIZED_INITIALIZER_RECURSIVE      \
-  Synchronizable(PTHREAD_MUTEX_RECURSIVE)
+// A macro for acquiring a scoped 'guard' on any type that can satisfy
+// the 'Synchronized' interface. Currently this includes 'std::mutex',
+// 'std::recursive_mutex' and 'std::atomic_flag'.
+// Example:
+//   std::mutex m;
+//   synchronized (m) {
+//     // Do something under the lock.
+//   }
+#define synchronized(m)                                                 \
+  if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<decltype(m)>::type>(&m)) // NOLINT(whitespace/line_length)
 
 #endif // __SYNCHRONIZABLE_HPP__


[5/9] mesos git commit: Refactor Mutex to use synchronized.

Posted by be...@apache.org.
Refactor Mutex to use synchronized.

Review: https://reviews.apache.org/r/32360


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e8d6f914
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e8d6f914
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e8d6f914

Branch: refs/heads/master
Commit: e8d6f9145a6107860b1c98f141ff714e43bd3378
Parents: 37373fb
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:37 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:38 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/mutex.hpp | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e8d6f914/3rdparty/libprocess/include/process/mutex.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/mutex.hpp b/3rdparty/libprocess/include/process/mutex.hpp
index 99dd14f..8fff089 100644
--- a/3rdparty/libprocess/include/process/mutex.hpp
+++ b/3rdparty/libprocess/include/process/mutex.hpp
@@ -1,14 +1,15 @@
 #ifndef __PROCESS_MUTEX_HPP__
 #define __PROCESS_MUTEX_HPP__
 
+#include <atomic>
 #include <memory>
 #include <queue>
 
 #include <process/future.hpp>
-#include <process/internal.hpp>
 #include <process/owned.hpp>
 
 #include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
 
 namespace process {
 
@@ -21,8 +22,7 @@ public:
   {
     Future<Nothing> future = Nothing();
 
-    internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       if (!data->locked) {
         data->locked = true;
       } else {
@@ -31,7 +31,6 @@ public:
         future = promise->future();
       }
     }
-    internal::release(&data->lock);
 
     return future;
   }
@@ -43,8 +42,7 @@ public:
     // trigger callbacks that try to reacquire the lock.
     Owned<Promise<Nothing>> promise;
 
-    internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       if (!data->promises.empty()) {
         // TODO(benh): Skip a future that has been discarded?
         promise = data->promises.front();
@@ -53,7 +51,6 @@ public:
         data->locked = false;
       }
     }
-    internal::release(&data->lock);
 
     if (promise.get() != NULL) {
       promise->set(Nothing());
@@ -63,7 +60,7 @@ public:
 private:
   struct Data
   {
-    Data() : lock(0), locked(false) {}
+    Data() : lock(ATOMIC_FLAG_INIT), locked(false) {}
 
     ~Data()
     {
@@ -71,9 +68,8 @@ private:
     }
 
     // Rather than use a process to serialize access to the mutex's
-    // internal data we use a low-level "lock" which we acquire and
-    // release using atomic builtins.
-    int lock;
+    // internal data we use a 'std::atomic_flag'.
+    std::atomic_flag lock;
 
     // Describes the state of this mutex.
     bool locked;


[4/9] mesos git commit: Refactor Future to use synchronized.

Posted by be...@apache.org.
Refactor Future to use synchronized.

Review: https://reviews.apache.org/r/32358


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37373fb3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37373fb3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37373fb3

Branch: refs/heads/master
Commit: 37373fb3318770b4971831c15c563f399787d065
Parents: 889daa9
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:23 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:24 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 52 ++++++---------------
 1 file changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/37373fb3/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index c22d6c8..75cbe12 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -4,6 +4,7 @@
 #include <assert.h>
 #include <stdlib.h> // For abort.
 
+#include <atomic>
 #include <iostream>
 #include <list>
 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
@@ -14,7 +15,6 @@
 #include <glog/logging.h>
 
 #include <process/clock.hpp>
-#include <process/internal.hpp>
 #include <process/latch.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
@@ -27,6 +27,7 @@
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/preprocessor.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/try.hpp>
 
 namespace process {
@@ -383,7 +384,7 @@ private:
 
     void clearAllCallbacks();
 
-    int lock;
+    std::atomic_flag lock;
     State state;
     bool discard;
     bool associated;
@@ -598,8 +599,7 @@ bool Promise<T>::associate(const Future<T>& future)
 {
   bool associated = false;
 
-  internal::acquire(&f.data->lock);
-  {
+  synchronized (f.data->lock) {
     // Don't associate if this promise has completed. Note that this
     // does not include if Future::discard was called on this future
     // since in that case that would still leave the future PENDING
@@ -616,7 +616,6 @@ bool Promise<T>::associate(const Future<T>& future)
       // another.
     }
   }
-  internal::release(&f.data->lock);
 
   // Note that we do the actual associating after releasing the lock
   // above to avoid deadlocking by attempting to require the lock
@@ -763,14 +762,12 @@ bool Promise<T>::discard(Future<T> future)
 
   bool result = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == Future<T>::PENDING) {
       data->state = Future<T>::DISCARDED;
       result = true;
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being
   // DISCARDED. We don't need a lock because the state is now in
@@ -797,7 +794,7 @@ Future<T> Future<T>::failed(const std::string& message)
 
 template <typename T>
 Future<T>::Data::Data()
-  : lock(0),
+  : lock(ATOMIC_FLAG_INIT),
     state(PENDING),
     discard(false),
     associated(false),
@@ -912,8 +909,7 @@ bool Future<T>::discard()
   bool result = false;
 
   std::vector<DiscardCallback> callbacks;
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (!data->discard && data->state == PENDING) {
       result = data->discard = true;
 
@@ -929,7 +925,6 @@ bool Future<T>::discard()
       data->onDiscardCallbacks.clear();
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with doing a discard on this
   // future. We don't need a lock because 'Data::discard' should now
@@ -1007,14 +1002,12 @@ bool Future<T>::await(const Duration& duration) const
 
   bool pending = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       pending = true;
       data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
     }
   }
-  internal::release(&data->lock);
 
   if (pending) {
     return latch->await(duration);
@@ -1058,15 +1051,13 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->discard) {
       run = true;
     } else if (data->state == PENDING) {
       data->onDiscardCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1082,15 +1073,13 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == READY) {
       run = true;
     } else if (data->state == PENDING) {
       data->onReadyCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1106,15 +1095,13 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == FAILED) {
       run = true;
     } else if (data->state == PENDING) {
       data->onFailedCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1130,15 +1117,13 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == DISCARDED) {
       run = true;
     } else if (data->state == PENDING) {
       data->onDiscardedCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1154,15 +1139,13 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       data->onAnyCallbacks.emplace_back(std::move(callback));
     } else {
       run = true;
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1172,7 +1155,6 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
   return *this;
 }
 
-
 namespace internal {
 
 // NOTE: We need to name this 'thenf' versus 'then' to distinguish it
@@ -1360,15 +1342,13 @@ bool Future<T>::set(const T& _t)
 {
   bool result = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       data->t = new T(_t);
       data->state = READY;
       result = true;
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being READY. We
   // don't need a lock because the state is now in READY so there
@@ -1389,15 +1369,13 @@ bool Future<T>::fail(const std::string& _message)
 {
   bool result = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       data->message = new std::string(_message);
       data->state = FAILED;
       result = true;
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being FAILED. We
   // don't need a lock because the state is now in FAILED so there


[7/9] mesos git commit: Refactor Metrics::Timer to use synchronized.

Posted by be...@apache.org.
Refactor Metrics::Timer to use synchronized.

Review: https://reviews.apache.org/r/32362


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8323bcab
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8323bcab
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8323bcab

Branch: refs/heads/master
Commit: 8323bcab310e0f45a53ba46df8b6d90e1cabbbf3
Parents: 4153a7e
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:04 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:04 2015 -0700

----------------------------------------------------------------------
 .../include/process/metrics/timer.hpp           | 24 +++++++-------------
 1 file changed, 8 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8323bcab/3rdparty/libprocess/include/process/metrics/timer.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/timer.hpp b/3rdparty/libprocess/include/process/metrics/timer.hpp
index b6f9fbd..92cef2b 100644
--- a/3rdparty/libprocess/include/process/metrics/timer.hpp
+++ b/3rdparty/libprocess/include/process/metrics/timer.hpp
@@ -1,18 +1,19 @@
 #ifndef __PROCESS_METRICS_TIMER_HPP__
 #define __PROCESS_METRICS_TIMER_HPP__
 
+#include <atomic>
 #include <memory>
 #include <string>
 
 #include <process/clock.hpp>
 #include <process/future.hpp>
-#include <process/internal.hpp>
 
 #include <process/metrics/metric.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/option.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/try.hpp>
 
 namespace process {
@@ -33,15 +34,13 @@ public:
   {
     Future<double> value;
 
-    process::internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       if (data->lastValue.isSome()) {
         value = data->lastValue.get();
       } else {
         value = Failure("No value");
       }
     }
-    process::internal::release(&data->lock);
 
     return value;
   }
@@ -49,11 +48,9 @@ public:
   // Start the Timer.
   void start()
   {
-    process::internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       data->start = Clock::now();
     }
-    process::internal::release(&data->lock);
   }
 
   // Stop the Timer.
@@ -65,15 +62,13 @@ public:
 
     double value;
 
-    process::internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       t = T(stop - data->start);
 
       data->lastValue = t.value();
 
       value = data->lastValue.get();
     }
-    process::internal::release(&data->lock);
 
     push(value);
 
@@ -94,9 +89,8 @@ public:
 
 private:
   struct Data {
-    Data() : lock(0) {}
-
-    int lock;
+    Data() : lock(ATOMIC_FLAG_INIT) {}
+    std::atomic_flag lock;
     Time start;
     Option<double> lastValue;
   };
@@ -107,12 +101,10 @@ private:
 
     double value;
 
-    process::internal::acquire(&that.data->lock);
-    {
+    synchronized (that.data->lock) {
       that.data->lastValue = T(stop - start).value();
       value = that.data->lastValue.get();
     }
-    process::internal::release(&that.data->lock);
 
     that.push(value);
   }


[9/9] mesos git commit: Refactor http to use synchronized.

Posted by be...@apache.org.
Refactor http to use synchronized.

Review: https://reviews.apache.org/r/32364


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94c0f68d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94c0f68d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94c0f68d

Branch: refs/heads/master
Commit: 94c0f68dc8a32b6cd768c18d6d8d65d40833296f
Parents: 8c2d8d4
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:28 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:29 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/http.hpp | 11 +++++++----
 3rdparty/libprocess/src/http.cpp             | 22 ++++++----------------
 2 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index bba62b3..51a00f5 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -3,6 +3,7 @@
 
 #include <stdint.h>
 
+#include <atomic>
 #include <iosfwd>
 #include <memory>
 #include <queue>
@@ -191,12 +192,14 @@ public:
 private:
   struct Data
   {
-    Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
+    Data()
+      : lock(ATOMIC_FLAG_INIT),
+        readEnd(Reader::OPEN),
+        writeEnd(Writer::OPEN) {}
 
     // Rather than use a process to serialize access to the pipe's
-    // internal data we use a low-level "lock" which we acquire and
-    // release using atomic builtins.
-    int lock;
+    // internal data we use a 'std::atomic_flag'.
+    std::atomic_flag lock;
 
     Reader::State readEnd;
     Writer::State writeEnd;

http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 1d318b9..0898335 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -18,7 +18,6 @@
 
 #include <process/future.hpp>
 #include <process/http.hpp>
-#include <process/internal.hpp>
 #include <process/owned.hpp>
 #include <process/socket.hpp>
 
@@ -30,6 +29,7 @@
 #include <stout/numify.hpp>
 #include <stout/option.hpp>
 #include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/try.hpp>
 
 #include "decoder.hpp"
@@ -179,8 +179,7 @@ Future<string> Pipe::Reader::read()
 {
   Future<string> future;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->readEnd == Reader::CLOSED) {
       future = Failure("closed");
     } else if (!data->writes.empty()) {
@@ -196,7 +195,6 @@ Future<string> Pipe::Reader::read()
       future = data->reads.back()->future();
     }
   }
-  process::internal::release(&data->lock);
 
   return future;
 }
@@ -208,8 +206,7 @@ bool Pipe::Reader::close()
   bool notify = false;
   queue<Owned<Promise<string>>> reads;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->readEnd == Reader::OPEN) {
       // Throw away outstanding data.
       while (!data->writes.empty()) {
@@ -226,7 +223,6 @@ bool Pipe::Reader::close()
       notify = data->writeEnd == Writer::OPEN;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We transition the promises outside the critical section
   // to avoid triggering callbacks that try to reacquire the lock.
@@ -250,8 +246,7 @@ bool Pipe::Writer::write(const string& s)
   bool written = false;
   Owned<Promise<string>> read;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     // Ignore writes if either end of the pipe is closed or failed!
     if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
       // Don't bother surfacing empty writes to the readers.
@@ -266,7 +261,6 @@ bool Pipe::Writer::write(const string& s)
       written = true;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We set the promise outside the critical section to avoid
   // triggering callbacks that try to reacquire the lock.
@@ -283,8 +277,7 @@ bool Pipe::Writer::close()
   bool closed = false;
   queue<Owned<Promise<string>>> reads;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->writeEnd == Writer::OPEN) {
       // Extract all the pending reads so we can complete them.
       std::swap(data->reads, reads);
@@ -293,7 +286,6 @@ bool Pipe::Writer::close()
       closed = true;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We set the promises outside the critical section to avoid
   // triggering callbacks that try to reacquire the lock.
@@ -311,8 +303,7 @@ bool Pipe::Writer::fail(const string& message)
   bool failed = false;
   queue<Owned<Promise<string>>> reads;
 
-  process::internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->writeEnd == Writer::OPEN) {
       // Extract all the pending reads so we can fail them.
       std::swap(data->reads, reads);
@@ -322,7 +313,6 @@ bool Pipe::Writer::fail(const string& message)
       failed = true;
     }
   }
-  process::internal::release(&data->lock);
 
   // NOTE: We set the promises outside the critical section to avoid
   // triggering callbacks that try to reacquire the lock.


[2/9] mesos git commit: Move synchronized.hpp into stout.

Posted by be...@apache.org.
Move synchronized.hpp into stout.

Review: https://reviews.apache.org/r/35012


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90c10cbd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90c10cbd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90c10cbd

Branch: refs/heads/master
Commit: 90c10cbd6ff183b2ed00cfd98451ea5b9abd9f5b
Parents: 9f9d773
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:24:45 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:24:45 2015 -0700

----------------------------------------------------------------------
 .../3rdparty/stout/include/Makefile.am          |  1 +
 .../stout/include/stout/synchronized.hpp        | 75 ++++++++++++++++++++
 2 files changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/90c10cbd/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
index 79239d7..6ac2f04 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
@@ -72,6 +72,7 @@ nobase_include_HEADERS =		\
   stout/strings.hpp			\
   stout/subcommand.hpp			\
   stout/svn.hpp				\
+  stout/synchronized.hpp		\
   stout/tests/utils.hpp			\
   stout/thread.hpp			\
   stout/try.hpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/90c10cbd/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
new file mode 100644
index 0000000..60eaf26
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STOUT_SYNCHRONIZED_HPP__
+#define __STOUT_SYNCHRONIZED_HPP__
+
+#include <atomic>
+#include <mutex>
+#include <type_traits>
+
+// A helper class for the synchronized(m) macro. It is an RAII 'guard'
+// for a synchronization primitive 'T'. The general template handles
+// cases such as 'std::mutex' and 'std::recursive_mutex'.
+template <typename T>
+class Synchronized
+{
+public:
+  Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock)) { lock->lock(); }
+  Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
+
+  ~Synchronized() { lock->unlock(); }
+
+  operator bool() const { return true; }
+private:
+  T* lock;
+};
+
+
+// A specialization of the Synchronized class for 'std::atomic_flag'.
+// This is necessary as the locking functions are different.
+template <>
+class Synchronized<std::atomic_flag>
+{
+public:
+  Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
+  {
+    while (flag->test_and_set(std::memory_order_acquire)) {}
+  }
+  Synchronized(std::atomic_flag** _flag)
+    : Synchronized(*CHECK_NOTNULL(_flag)) {}
+
+  ~Synchronized()
+  {
+    flag->clear(std::memory_order_release);
+  }
+
+  operator bool() const { return true; }
+private:
+  std::atomic_flag* flag;
+};
+
+
+// A macro for acquiring a scoped 'guard' on any type that can satisfy
+// the 'Synchronized' interface. Currently this includes 'std::mutex',
+// 'std::recursive_mutex' and 'std::atomic_flag'.
+// Example:
+//   std::mutex m;
+//   synchronized (m) {
+//     // Do something under the lock.
+//   }
+#define synchronized(m)                                                 \
+  if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<decltype(m)>::type>(&m)) // NOLINT(whitespace/line_length)
+
+#endif // __STOUT_SYNCHRONIZED_HPP__


[6/9] mesos git commit: Refactor Queue to use synchronized.

Posted by be...@apache.org.
Refactor Queue to use synchronized.

Review: https://reviews.apache.org/r/32361


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4153a7e0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4153a7e0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4153a7e0

Branch: refs/heads/master
Commit: 4153a7e0681eb4131bd8a5395e4e038ce62324e8
Parents: e8d6f91
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:51 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:51 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/queue.hpp | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4153a7e0/3rdparty/libprocess/include/process/queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/queue.hpp b/3rdparty/libprocess/include/process/queue.hpp
index df8efc0..1496b38 100644
--- a/3rdparty/libprocess/include/process/queue.hpp
+++ b/3rdparty/libprocess/include/process/queue.hpp
@@ -1,14 +1,16 @@
 #ifndef __PROCESS_QUEUE_HPP__
 #define __PROCESS_QUEUE_HPP__
 
+#include <atomic>
 #include <deque>
 #include <memory>
 #include <queue>
 
 #include <process/future.hpp>
-#include <process/internal.hpp>
 #include <process/owned.hpp>
 
+#include <stout/synchronized.hpp>
+
 namespace process {
 
 template <typename T>
@@ -24,8 +26,7 @@ public:
     // trigger callbacks that try to reacquire the lock.
     Owned<Promise<T>> promise;
 
-    internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       if (data->promises.empty()) {
         data->elements.push(t);
       } else {
@@ -33,7 +34,6 @@ public:
         data->promises.pop_front();
       }
     }
-    internal::release(&data->lock);
 
     if (promise.get() != NULL) {
       promise->set(t);
@@ -44,8 +44,7 @@ public:
   {
     Future<T> future;
 
-    internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       if (data->elements.empty()) {
         data->promises.push_back(Owned<Promise<T>>(new Promise<T>()));
         future = data->promises.back()->future();
@@ -54,7 +53,6 @@ public:
         data->elements.pop();
       }
     }
-    internal::release(&data->lock);
 
     return future;
   }
@@ -62,7 +60,7 @@ public:
 private:
   struct Data
   {
-    Data() : lock(0) {}
+    Data() : lock(ATOMIC_FLAG_INIT) {}
 
     ~Data()
     {
@@ -70,9 +68,8 @@ private:
     }
 
     // Rather than use a process to serialize access to the queue's
-    // internal data we use a low-level "lock" which we acquire and
-    // release using atomic builtins.
-    int lock;
+    // internal data we use a 'std::atomic_flag'.
+    std::atomic_flag lock;
 
     // Represents "waiters" for elements from the queue.
     std::deque<Owned<Promise<T>>> promises;


[8/9] mesos git commit: Refactor Metrics::Metric to use synchronized.

Posted by be...@apache.org.
Refactor Metrics::Metric to use synchronized.

Review: https://reviews.apache.org/r/32363


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c2d8d47
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c2d8d47
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c2d8d47

Branch: refs/heads/master
Commit: 8c2d8d47f2bed2c15a5db2d0dc1afb33025f4af0
Parents: 8323bca
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:16 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:16 2015 -0700

----------------------------------------------------------------------
 .../libprocess/include/process/metrics/metric.hpp    | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8c2d8d47/3rdparty/libprocess/include/process/metrics/metric.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metric.hpp b/3rdparty/libprocess/include/process/metrics/metric.hpp
index a7be2d7..44a7d5a 100644
--- a/3rdparty/libprocess/include/process/metrics/metric.hpp
+++ b/3rdparty/libprocess/include/process/metrics/metric.hpp
@@ -1,17 +1,18 @@
 #ifndef __PROCESS_METRICS_METRIC_HPP__
 #define __PROCESS_METRICS_METRIC_HPP__
 
+#include <atomic>
 #include <memory>
 #include <string>
 
 #include <process/future.hpp>
-#include <process/internal.hpp>
 #include <process/owned.hpp>
 #include <process/statistics.hpp>
 #include <process/timeseries.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/option.hpp>
+#include <stout/synchronized.hpp>
 
 namespace process {
 namespace metrics {
@@ -33,11 +34,9 @@ public:
     Option<Statistics<double>> statistics = None();
 
     if (data->history.isSome()) {
-      internal::acquire(&data->lock);
-      {
+      synchronized (data->lock) {
         statistics = Statistics<double>::from(*data->history.get());
       }
-      internal::release(&data->lock);
     }
 
     return statistics;
@@ -53,11 +52,9 @@ protected:
     if (data->history.isSome()) {
       Time now = Clock::now();
 
-      internal::acquire(&data->lock);
-      {
+      synchronized (data->lock) {
         data->history.get()->set(value, now);
       }
-      internal::release(&data->lock);
     }
   }
 
@@ -65,7 +62,7 @@ private:
   struct Data {
     Data(const std::string& _name, const Option<Duration>& window)
       : name(_name),
-        lock(0),
+        lock(ATOMIC_FLAG_INIT),
         history(None())
     {
       if (window.isSome()) {
@@ -76,7 +73,7 @@ private:
 
     const std::string name;
 
-    int lock;
+    std::atomic_flag lock;
 
     Option<Owned<TimeSeries<double>>> history;
   };


[3/9] mesos git commit: Move synchronized.hpp out of libprocess.

Posted by be...@apache.org.
Move synchronized.hpp out of libprocess.

Review: https://reviews.apache.org/r/35013


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/889daa98
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/889daa98
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/889daa98

Branch: refs/heads/master
Commit: 889daa989fd19a12afb2560faef5d58d9af5db99
Parents: 90c10cb
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:01 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:02 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am          |  1 -
 3rdparty/libprocess/src/clock.cpp        |  2 +-
 3rdparty/libprocess/src/libev.hpp        |  3 +-
 3rdparty/libprocess/src/libevent.cpp     |  3 +-
 3rdparty/libprocess/src/process.cpp      |  2 +-
 3rdparty/libprocess/src/synchronized.hpp | 61 ---------------------------
 6 files changed, 5 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 6e8972f..489ce35 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -51,7 +51,6 @@ libprocess_la_SOURCES =		\
   src/reap.cpp			\
   src/socket.cpp		\
   src/subprocess.cpp		\
-  src/synchronized.hpp		\
   src/timeseries.cpp
 
 libprocess_la_CPPFLAGS =		\

http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index afeb43c..dd726c1 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -14,11 +14,11 @@
 #include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/lambda.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/try.hpp>
 #include <stout/unreachable.hpp>
 
 #include "event_loop.hpp"
-#include "synchronized.hpp"
 
 using std::list;
 using std::map;

http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index 6c2015b..a0a2f49 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -10,10 +10,9 @@
 #include <process/owned.hpp>
 
 #include <stout/lambda.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/thread.hpp>
 
-#include "synchronized.hpp"
-
 namespace process {
 
 // Event loop.

http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
index d27fcb9..fb03859 100644
--- a/3rdparty/libprocess/src/libevent.cpp
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -5,9 +5,10 @@
 
 #include <process/logging.hpp>
 
+#include <stout/synchronized.hpp>
+
 #include "event_loop.hpp"
 #include "libevent.hpp"
-#include "synchronized.hpp"
 
 namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 834670b..f1d3e5b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -74,6 +74,7 @@
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/thread.hpp>
 #include <stout/unreachable.hpp>
 
@@ -83,7 +84,6 @@
 #include "event_loop.hpp"
 #include "gate.hpp"
 #include "process_reference.hpp"
-#include "synchronized.hpp"
 
 using namespace process::metrics::internal;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/synchronized.hpp b/3rdparty/libprocess/src/synchronized.hpp
deleted file mode 100644
index 462c46c..0000000
--- a/3rdparty/libprocess/src/synchronized.hpp
+++ /dev/null
@@ -1,61 +0,0 @@
-#ifndef __SYNCHRONIZABLE_HPP__
-#define __SYNCHRONIZABLE_HPP__
-
-#include <atomic>
-#include <mutex>
-#include <type_traits>
-
-// A helper class for the synchronized(m) macro. It is an RAII 'guard'
-// for a synchronization primitive 'T'. The general template handles
-// cases such as 'std::mutex' and 'std::recursive_mutex'.
-template <typename T>
-class Synchronized
-{
-public:
-  Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock)) { lock->lock(); }
-  Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
-
-  ~Synchronized() { lock->unlock(); }
-
-  operator bool() const { return true; }
-private:
-  T* lock;
-};
-
-
-// A specialization of the Synchronized class for 'std::atomic_flag'.
-// This is necessary as the locking functions are different.
-template <>
-class Synchronized<std::atomic_flag>
-{
-public:
-  Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
-  {
-    while (flag->test_and_set(std::memory_order_acquire)) {}
-  }
-  Synchronized(std::atomic_flag** _flag)
-    : Synchronized(*CHECK_NOTNULL(_flag)) {}
-
-  ~Synchronized()
-  {
-    flag->clear(std::memory_order_release);
-  }
-
-  operator bool() const { return true; }
-private:
-  std::atomic_flag* flag;
-};
-
-
-// A macro for acquiring a scoped 'guard' on any type that can satisfy
-// the 'Synchronized' interface. Currently this includes 'std::mutex',
-// 'std::recursive_mutex' and 'std::atomic_flag'.
-// Example:
-//   std::mutex m;
-//   synchronized (m) {
-//     // Do something under the lock.
-//   }
-#define synchronized(m)                                                 \
-  if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<decltype(m)>::type>(&m)) // NOLINT(whitespace/line_length)
-
-#endif // __SYNCHRONIZABLE_HPP__