You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:22 UTC
[1/9] mesos git commit: Refactor synchronized to use mutex,
recursive_mutex, atomic_flag.
Repository: mesos
Updated Branches:
refs/heads/master f6c663927 -> 94c0f68dc
Refactor synchronized to use mutex, recursive_mutex, atomic_flag.
The synchronized(m) macro now supports passing any of the following by
pointer or reference:
1. `std::mutex`
2. `std::recursive_mutex`
3. `std::atomic_flag`
You no longer need to initialize your synchronization primitives using
`Synchronizable` and `SYNCHRONIZED_INITIALIZER`. Remember to
initialize `std::atomic_flag` with `ATOMIC_FLAG_INIT`.
Review: https://reviews.apache.org/r/32356
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f9d7738
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f9d7738
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f9d7738
Branch: refs/heads/master
Commit: 9f9d7738e0b7a0ff2bbd91ec8abdd41bef079261
Parents: f6c6639
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:19:14 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:19:15 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/src/clock.cpp | 28 +++---
3rdparty/libprocess/src/libev.cpp | 7 +-
3rdparty/libprocess/src/libev.hpp | 5 +-
3rdparty/libprocess/src/process.cpp | 64 +++++++-------
3rdparty/libprocess/src/synchronized.hpp | 120 ++++++++------------------
5 files changed, 86 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index bff5310..afeb43c 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -2,6 +2,7 @@
#include <list>
#include <map>
+#include <mutex>
#include <set>
#include <process/clock.hpp>
@@ -21,6 +22,7 @@
using std::list;
using std::map;
+using std::recursive_mutex;
using std::set;
namespace process {
@@ -29,7 +31,7 @@ namespace process {
// timer so that we can have two timers that have the same timeout. We
// exploit that the map is SORTED!
static map<Time, list<Timer>>* timers = new map<Time, list<Timer>>();
-static synchronizable(timers) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+static recursive_mutex* timers_mutex = new recursive_mutex();
// We namespace the clock related variables to keep them well
@@ -126,7 +128,7 @@ void tick(const Time& time)
{
list<Timer> timedout;
- synchronized (timers) {
+ synchronized (timers_mutex) {
// We pass NULL to be explicit about the fact that we want the
// global clock time, even though it's unnecessary ('tick' is
// called from the event loop, not a Process context).
@@ -173,7 +175,7 @@ void tick(const Time& time)
// Mark 'settling' as false since there are not any more timers
// that will expire before the paused time and we've finished
// executing expired timers.
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (clock::paused &&
(timers->size() == 0 ||
timers->begin()->first > *clock::current)) {
@@ -200,7 +202,7 @@ Time Clock::now()
Time Clock::now(ProcessBase* process)
{
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (Clock::paused()) {
if (process != NULL) {
if (clock::currents->count(process) != 0) {
@@ -244,7 +246,7 @@ Timer Clock::timer(
<< " in the future (" << timeout.time() << ")";
// Add the timer.
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (timers->size() == 0 ||
timer.timeout().time() < timers->begin()->first) {
// Need to interrupt the loop to update/set timer repeat.
@@ -266,7 +268,7 @@ Timer Clock::timer(
bool Clock::cancel(const Timer& timer)
{
bool canceled = false;
- synchronized (timers) {
+ synchronized (timers_mutex) {
// Check if the timeout is still pending, and if so, erase it. In
// addition, erase an empty list if we just removed the last
// timeout.
@@ -288,7 +290,7 @@ void Clock::pause()
{
process::initialize(); // To make sure the event loop is ready.
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (!clock::paused) {
*clock::initial = *clock::current = now();
clock::paused = true;
@@ -320,7 +322,7 @@ void Clock::resume()
{
process::initialize(); // To make sure the event loop is ready.
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (clock::paused) {
VLOG(2) << "Clock resumed at " << clock::current;
@@ -337,7 +339,7 @@ void Clock::resume()
void Clock::advance(const Duration& duration)
{
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (clock::paused) {
*clock::advanced += duration;
*clock::current += duration;
@@ -355,7 +357,7 @@ void Clock::advance(const Duration& duration)
void Clock::advance(ProcessBase* process, const Duration& duration)
{
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (clock::paused) {
Time current = now(process);
current += duration;
@@ -374,7 +376,7 @@ void Clock::advance(ProcessBase* process, const Duration& duration)
void Clock::update(const Time& time)
{
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (clock::paused) {
if (*clock::current < time) {
*clock::advanced += (time - *clock::current);
@@ -393,7 +395,7 @@ void Clock::update(const Time& time)
void Clock::update(ProcessBase* process, const Time& time, Update update)
{
- synchronized (timers) {
+ synchronized (timers_mutex) {
if (clock::paused) {
if (now(process) < time || update == Clock::FORCE) {
VLOG(2) << "Clock of " << process->self() << " updated to " << time;
@@ -418,7 +420,7 @@ void Clock::order(ProcessBase* from, ProcessBase* to)
bool Clock::settled()
{
- synchronized (timers) {
+ synchronized (timers_mutex) {
CHECK(clock::paused);
if (clock::settling) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 610dfb8..2b8c68d 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -1,5 +1,6 @@
#include <ev.h>
+#include <mutex>
#include <queue>
#include <stout/duration.hpp>
@@ -19,7 +20,7 @@ ev_async async_watcher;
std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
-synchronizable(watchers);
+std::mutex* watchers_mutex = new std::mutex();
std::queue<lambda::function<void(void)>>* functions =
new std::queue<lambda::function<void(void)>>();
@@ -29,7 +30,7 @@ ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
void handle_async(struct ev_loop* loop, ev_async* _, int revents)
{
- synchronized (watchers) {
+ synchronized (watchers_mutex) {
// Start all the new I/O watchers.
while (!watchers->empty()) {
ev_io* watcher = watchers->front();
@@ -47,8 +48,6 @@ void handle_async(struct ev_loop* loop, ev_async* _, int revents)
void EventLoop::initialize()
{
- synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
-
loop = ev_default_loop(EVFLAG_AUTO);
ev_async_init(&async_watcher, handle_async);
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index e4a403d..6c2015b 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -3,6 +3,7 @@
#include <ev.h>
+#include <mutex>
#include <queue>
#include <process/future.hpp>
@@ -27,7 +28,7 @@ extern ev_async async_watcher;
// TODO(benh): Replace this queue with functions that we put in
// 'functions' below that perform the ev_io_start themselves.
extern std::queue<ev_io*>* watchers;
-extern synchronizable(watchers);
+extern std::mutex* watchers_mutex;
// Queue of functions to be invoked asynchronously within the event
// loop (protected by 'watchers' above).
@@ -72,7 +73,7 @@ Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
Future<T> future = promise->future();
// Enqueue the function.
- synchronized (watchers) {
+ synchronized (watchers_mutex) {
functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 4a5ab79..834670b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -34,6 +34,7 @@
#include <list>
#include <map>
#include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+#include <mutex>
#include <queue>
#include <set>
#include <sstream>
@@ -120,10 +121,10 @@ namespace ID {
string generate(const string& prefix)
{
static map<string, int>* prefixes = new map<string, int>();
- static synchronizable(prefixes) = SYNCHRONIZED_INITIALIZER;
+ static std::mutex* prefixes_mutex = new std::mutex();
int id;
- synchronized (prefixes) {
+ synchronized (prefixes_mutex) {
int& _id = (*prefixes)[prefix];
_id += 1;
id = _id;
@@ -323,7 +324,7 @@ private:
map<int, HttpProxy*> proxies;
// Protects instance variables.
- synchronizable(this);
+ std::recursive_mutex mutex;
};
@@ -370,14 +371,14 @@ private:
// Map of all local spawned and running processes.
map<string, ProcessBase*> processes;
- synchronizable(processes);
+ std::recursive_mutex processes_mutex;
- // Gates for waiting threads (protected by synchronizable(processes)).
+ // Gates for waiting threads (protected by processes_mutex).
map<ProcessBase*, Gate*> gates;
// Queue of runnable processes (implemented using list).
list<ProcessBase*> runq;
- synchronizable(runq);
+ std::recursive_mutex runq_mutex;
// Number of running processes, to support Clock::settle operation.
int running;
@@ -409,7 +410,7 @@ static Gate* gate = new Gate();
// recursive in case a filterer wants to do anything fancy (which is
// possible and likely given that filters will get used for testing).
static Filter* filterer = NULL;
-static synchronizable(filterer) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+static std::recursive_mutex* filterer_mutex = new std::recursive_mutex();
// Global garbage collector.
PID<GarbageCollector> gc;
@@ -1157,10 +1158,7 @@ void HttpProxy::stream(
}
-SocketManager::SocketManager()
-{
- synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-}
+SocketManager::SocketManager() {}
SocketManager::~SocketManager() {}
@@ -1168,7 +1166,7 @@ SocketManager::~SocketManager() {}
void SocketManager::accepted(const Socket& socket)
{
- synchronized (this) {
+ synchronized (mutex) {
sockets[socket] = new Socket(socket);
}
}
@@ -1264,7 +1262,7 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
Option<Socket> socket = None();
bool connect = false;
- synchronized (this) {
+ synchronized (mutex) {
// Check if the socket address is remote and there isn't a persistent link.
if (to.address != __address__ && persists.count(to.address) == 0) {
// Okay, no link, let's create a socket.
@@ -1313,7 +1311,7 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
{
HttpProxy* proxy = NULL;
- synchronized (this) {
+ synchronized (mutex) {
// This socket might have been asked to get closed (e.g., remote
// side hang up) while a process is attempting to handle an HTTP
// request. Thus, if there is no more socket, return an empty PID.
@@ -1420,7 +1418,7 @@ void SocketManager::send(Encoder* encoder, bool persist)
{
CHECK(encoder != NULL);
- synchronized (this) {
+ synchronized (mutex) {
Socket socket = encoder->socket();
if (sockets.count(socket) > 0) {
// Update whether or not this socket should get disposed after
@@ -1517,7 +1515,7 @@ void SocketManager::send(Message* message)
Option<Socket> socket = None();
bool connect = false;
- synchronized (this) {
+ synchronized (mutex) {
// Check if there is already a socket.
bool persist = persists.count(address) > 0;
bool temp = temps.count(address) > 0;
@@ -1587,7 +1585,7 @@ Encoder* SocketManager::next(int s)
{
HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
- synchronized (this) {
+ synchronized (mutex) {
// We cannot assume 'sockets.count(s) > 0' here because it's
// possible that 's' has been removed with a call to
// SocketManager::close. For example, it could be the case that a
@@ -1659,7 +1657,7 @@ void SocketManager::close(int s)
{
HttpProxy* proxy = NULL; // Non-null if needs to be terminated.
- synchronized (this) {
+ synchronized (mutex) {
// This socket might not be active if it was already asked to get
// closed (e.g., a write on the socket failed so we try and close
// it and then later the recv side of the socket gets closed so we
@@ -1747,7 +1745,7 @@ void SocketManager::exited(const Address& address)
// into ProcessManager ... then we wouldn't have to convince
// ourselves that the accesses to each Process object will always be
// valid.
- synchronized (this) {
+ synchronized (mutex) {
if (!links.remotes.contains(address)) {
return; // No linkees for this socket address!
}
@@ -1788,7 +1786,7 @@ void SocketManager::exited(ProcessBase* process)
// can update the clocks of linked processes as appropriate.
const Time time = Clock::now(process);
- synchronized (this) {
+ synchronized (mutex) {
// If this process had linked to anything, we need to clean
// up any pointers to it. Also, if this process was the last
// linker to a remote linkee, we must remove linkee from the
@@ -1843,8 +1841,6 @@ void SocketManager::exited(ProcessBase* process)
ProcessManager::ProcessManager(const string& _delegate)
: delegate(_delegate)
{
- synchronizer(processes) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
- synchronizer(runq) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
running = 0;
__sync_synchronize(); // Ensure write to 'running' visible in other threads.
}
@@ -1857,7 +1853,7 @@ ProcessManager::~ProcessManager()
// or process the whole map as terminating one process might
// trigger other terminations. Deal with them one at a time.
do {
- synchronized (processes) {
+ synchronized (processes_mutex) {
process = !processes.empty() ? processes.begin()->second : NULL;
}
if (process != NULL) {
@@ -1871,7 +1867,7 @@ ProcessManager::~ProcessManager()
ProcessReference ProcessManager::use(const UPID& pid)
{
if (pid.address == __address__) {
- synchronized (processes) {
+ synchronized (processes_mutex) {
if (processes.count(pid.id) > 0) {
// Note that the ProcessReference constructor _must_ get
// called while holding the lock on processes so that waiting
@@ -2059,7 +2055,7 @@ UPID ProcessManager::spawn(ProcessBase* process, bool manage)
{
CHECK(process != NULL);
- synchronized (processes) {
+ synchronized (processes_mutex) {
if (processes.count(process->pid.id) > 0) {
return UPID();
} else {
@@ -2125,7 +2121,7 @@ void ProcessManager::resume(ProcessBase* process)
CHECK(event != NULL);
// Determine if we should filter this event.
- synchronized (filterer) {
+ synchronized (filterer_mutex) {
if (filterer != NULL) {
bool filter = false;
struct FilterVisitor : EventVisitor
@@ -2231,7 +2227,7 @@ void ProcessManager::cleanup(ProcessBase* process)
Gate* gate = NULL;
// Remove process.
- synchronized (processes) {
+ synchronized (processes_mutex) {
// Wait for all process references to get cleaned up.
while (process->refs > 0) {
#if defined(__i386__) || defined(__x86_64__)
@@ -2354,7 +2350,7 @@ bool ProcessManager::wait(const UPID& pid)
ProcessBase* process = NULL; // Set to non-null if we donate thread.
// Try and approach the gate if necessary.
- synchronized (processes) {
+ synchronized (processes_mutex) {
if (processes.count(pid.id) > 0) {
process = processes[pid.id];
CHECK(process->state != ProcessBase::TERMINATED);
@@ -2370,7 +2366,7 @@ bool ProcessManager::wait(const UPID& pid)
// Check if it is runnable in order to donate this thread.
if (process->state == ProcessBase::BOTTOM ||
process->state == ProcessBase::READY) {
- synchronized (runq) {
+ synchronized (runq_mutex) {
list<ProcessBase*>::iterator it =
find(runq.begin(), runq.end(), process);
if (it != runq.end()) {
@@ -2429,7 +2425,7 @@ void ProcessManager::enqueue(ProcessBase* process)
// it's not running. Otherwise, check and see which thread this
// process was last running on, and put it on that threads runq.
- synchronized (runq) {
+ synchronized (runq_mutex) {
CHECK(find(runq.begin(), runq.end(), process) == runq.end());
runq.push_back(process);
}
@@ -2447,7 +2443,7 @@ ProcessBase* ProcessManager::dequeue()
ProcessBase* process = NULL;
- synchronized (runq) {
+ synchronized (runq_mutex) {
if (!runq.empty()) {
process = runq.front();
runq.pop_front();
@@ -2482,7 +2478,7 @@ void ProcessManager::settle()
done = true; // Assume to start that we are settled.
- synchronized (runq) {
+ synchronized (runq_mutex) {
if (!runq.empty()) {
done = false;
continue;
@@ -2509,7 +2505,7 @@ Future<Response> ProcessManager::__processes__(const Request&)
{
JSON::Array array;
- synchronized (processes) {
+ synchronized (processes_mutex) {
foreachvalue (ProcessBase* process, process_manager->processes) {
JSON::Object object;
object.values["id"] = process->pid.id;
@@ -2906,7 +2902,7 @@ void filter(Filter *filter)
{
process::initialize();
- synchronized (filterer) {
+ synchronized (filterer_mutex) {
filterer = filter;
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9f9d7738/3rdparty/libprocess/src/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/synchronized.hpp b/3rdparty/libprocess/src/synchronized.hpp
index 6a341b8..462c46c 100644
--- a/3rdparty/libprocess/src/synchronized.hpp
+++ b/3rdparty/libprocess/src/synchronized.hpp
@@ -1,111 +1,61 @@
#ifndef __SYNCHRONIZABLE_HPP__
#define __SYNCHRONIZABLE_HPP__
-#include <pthread.h>
-
-#include <iostream>
-
-
-class Synchronizable
+#include <atomic>
+#include <mutex>
+#include <type_traits>
+
+// A helper class for the synchronized(m) macro. It is an RAII 'guard'
+// for a synchronization primitive 'T'. The general template handles
+// cases such as 'std::mutex' and 'std::recursive_mutex'.
+template <typename T>
+class Synchronized
{
public:
- Synchronizable()
- : initialized(false) {}
+ Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock)) { lock->lock(); }
+ Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
- explicit Synchronizable(int _type)
- : type(_type), initialized(false)
- {
- initialize();
- }
-
- Synchronizable(const Synchronizable &that)
- {
- type = that.type;
- initialize();
- }
-
- Synchronizable & operator = (const Synchronizable &that)
- {
- type = that.type;
- initialize();
- return *this;
- }
-
- void acquire()
- {
- if (!initialized) {
- ABORT("synchronizable not initialized");
- }
- pthread_mutex_lock(&mutex);
- }
-
- void release()
- {
- if (!initialized) {
- ABORT("synchronizable not initialized");
- }
- pthread_mutex_unlock(&mutex);
- }
+ ~Synchronized() { lock->unlock(); }
+ operator bool() const { return true; }
private:
- void initialize()
- {
- if (!initialized) {
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_settype(&attr, type);
- pthread_mutex_init(&mutex, &attr);
- pthread_mutexattr_destroy(&attr);
- initialized = true;
- } else {
- ABORT("synchronizable already initialized");
- }
- }
-
- int type;
- bool initialized;
- pthread_mutex_t mutex;
+ T* lock;
};
-class Synchronized
+// A specialization of the Synchronized class for 'std::atomic_flag'.
+// This is necessary as the locking functions are different.
+template <>
+class Synchronized<std::atomic_flag>
{
public:
- explicit Synchronized(Synchronizable *_synchronizable)
- : synchronizable(_synchronizable)
+ Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
{
- synchronizable->acquire();
+ while (flag->test_and_set(std::memory_order_acquire)) {}
}
+ Synchronized(std::atomic_flag** _flag)
+ : Synchronized(*CHECK_NOTNULL(_flag)) {}
~Synchronized()
{
- synchronizable->release();
+ flag->clear(std::memory_order_release);
}
- operator bool () { return true; }
-
+ operator bool() const { return true; }
private:
- Synchronizable *synchronizable;
+ std::atomic_flag* flag;
};
-#define synchronized(s) \
- if (Synchronized __synchronized ## s = Synchronized(&__synchronizable_ ## s))
-
-#define synchronizable(s) \
- Synchronizable __synchronizable_ ## s
-
-#define synchronizer(s) \
- (__synchronizable_ ## s)
-
-
-#define SYNCHRONIZED_INITIALIZER \
- Synchronizable(PTHREAD_MUTEX_NORMAL)
-
-#define SYNCHRONIZED_INITIALIZER_DEBUG \
- Synchronizable(PTHREAD_MUTEX_ERRORCHECK)
-
-#define SYNCHRONIZED_INITIALIZER_RECURSIVE \
- Synchronizable(PTHREAD_MUTEX_RECURSIVE)
+// A macro for acquiring a scoped 'guard' on any type that can satisfy
+// the 'Synchronized' interface. Currently this includes 'std::mutex',
+// 'std::recursive_mutex' and 'std::atomic_flag'.
+// Example:
+// std::mutex m;
+// synchronized (m) {
+// // Do something under the lock.
+// }
+#define synchronized(m) \
+ if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<decltype(m)>::type>(&m)) // NOLINT(whitespace/line_length)
#endif // __SYNCHRONIZABLE_HPP__
[5/9] mesos git commit: Refactor Mutex to use synchronized.
Posted by be...@apache.org.
Refactor Mutex to use synchronized.
Review: https://reviews.apache.org/r/32360
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e8d6f914
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e8d6f914
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e8d6f914
Branch: refs/heads/master
Commit: e8d6f9145a6107860b1c98f141ff714e43bd3378
Parents: 37373fb
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:37 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:38 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/mutex.hpp | 18 +++++++-----------
1 file changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e8d6f914/3rdparty/libprocess/include/process/mutex.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/mutex.hpp b/3rdparty/libprocess/include/process/mutex.hpp
index 99dd14f..8fff089 100644
--- a/3rdparty/libprocess/include/process/mutex.hpp
+++ b/3rdparty/libprocess/include/process/mutex.hpp
@@ -1,14 +1,15 @@
#ifndef __PROCESS_MUTEX_HPP__
#define __PROCESS_MUTEX_HPP__
+#include <atomic>
#include <memory>
#include <queue>
#include <process/future.hpp>
-#include <process/internal.hpp>
#include <process/owned.hpp>
#include <stout/nothing.hpp>
+#include <stout/synchronized.hpp>
namespace process {
@@ -21,8 +22,7 @@ public:
{
Future<Nothing> future = Nothing();
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (!data->locked) {
data->locked = true;
} else {
@@ -31,7 +31,6 @@ public:
future = promise->future();
}
}
- internal::release(&data->lock);
return future;
}
@@ -43,8 +42,7 @@ public:
// trigger callbacks that try to reacquire the lock.
Owned<Promise<Nothing>> promise;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (!data->promises.empty()) {
// TODO(benh): Skip a future that has been discarded?
promise = data->promises.front();
@@ -53,7 +51,6 @@ public:
data->locked = false;
}
}
- internal::release(&data->lock);
if (promise.get() != NULL) {
promise->set(Nothing());
@@ -63,7 +60,7 @@ public:
private:
struct Data
{
- Data() : lock(0), locked(false) {}
+ Data() : lock(ATOMIC_FLAG_INIT), locked(false) {}
~Data()
{
@@ -71,9 +68,8 @@ private:
}
// Rather than use a process to serialize access to the mutex's
- // internal data we use a low-level "lock" which we acquire and
- // release using atomic builtins.
- int lock;
+ // internal data we use a 'std::atomic_flag'.
+ std::atomic_flag lock;
// Describes the state of this mutex.
bool locked;
[4/9] mesos git commit: Refactor Future to use synchronized.
Posted by be...@apache.org.
Refactor Future to use synchronized.
Review: https://reviews.apache.org/r/32358
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37373fb3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37373fb3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37373fb3
Branch: refs/heads/master
Commit: 37373fb3318770b4971831c15c563f399787d065
Parents: 889daa9
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:23 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:24 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 52 ++++++---------------
1 file changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/37373fb3/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index c22d6c8..75cbe12 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -4,6 +4,7 @@
#include <assert.h>
#include <stdlib.h> // For abort.
+#include <atomic>
#include <iostream>
#include <list>
#include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
@@ -14,7 +15,6 @@
#include <glog/logging.h>
#include <process/clock.hpp>
-#include <process/internal.hpp>
#include <process/latch.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
@@ -27,6 +27,7 @@
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/preprocessor.hpp>
+#include <stout/synchronized.hpp>
#include <stout/try.hpp>
namespace process {
@@ -383,7 +384,7 @@ private:
void clearAllCallbacks();
- int lock;
+ std::atomic_flag lock;
State state;
bool discard;
bool associated;
@@ -598,8 +599,7 @@ bool Promise<T>::associate(const Future<T>& future)
{
bool associated = false;
- internal::acquire(&f.data->lock);
- {
+ synchronized (f.data->lock) {
// Don't associate if this promise has completed. Note that this
// does not include if Future::discard was called on this future
// since in that case that would still leave the future PENDING
@@ -616,7 +616,6 @@ bool Promise<T>::associate(const Future<T>& future)
// another.
}
}
- internal::release(&f.data->lock);
// Note that we do the actual associating after releasing the lock
// above to avoid deadlocking by attempting to require the lock
@@ -763,14 +762,12 @@ bool Promise<T>::discard(Future<T> future)
bool result = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == Future<T>::PENDING) {
data->state = Future<T>::DISCARDED;
result = true;
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with this future being
// DISCARDED. We don't need a lock because the state is now in
@@ -797,7 +794,7 @@ Future<T> Future<T>::failed(const std::string& message)
template <typename T>
Future<T>::Data::Data()
- : lock(0),
+ : lock(ATOMIC_FLAG_INIT),
state(PENDING),
discard(false),
associated(false),
@@ -912,8 +909,7 @@ bool Future<T>::discard()
bool result = false;
std::vector<DiscardCallback> callbacks;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (!data->discard && data->state == PENDING) {
result = data->discard = true;
@@ -929,7 +925,6 @@ bool Future<T>::discard()
data->onDiscardCallbacks.clear();
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with doing a discard on this
// future. We don't need a lock because 'Data::discard' should now
@@ -1007,14 +1002,12 @@ bool Future<T>::await(const Duration& duration) const
bool pending = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
pending = true;
data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
}
}
- internal::release(&data->lock);
if (pending) {
return latch->await(duration);
@@ -1058,15 +1051,13 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->discard) {
run = true;
} else if (data->state == PENDING) {
data->onDiscardCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1082,15 +1073,13 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == READY) {
run = true;
} else if (data->state == PENDING) {
data->onReadyCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1106,15 +1095,13 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == FAILED) {
run = true;
} else if (data->state == PENDING) {
data->onFailedCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1130,15 +1117,13 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == DISCARDED) {
run = true;
} else if (data->state == PENDING) {
data->onDiscardedCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1154,15 +1139,13 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
data->onAnyCallbacks.emplace_back(std::move(callback));
} else {
run = true;
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1172,7 +1155,6 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
return *this;
}
-
namespace internal {
// NOTE: We need to name this 'thenf' versus 'then' to distinguish it
@@ -1360,15 +1342,13 @@ bool Future<T>::set(const T& _t)
{
bool result = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
data->t = new T(_t);
data->state = READY;
result = true;
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with this future being READY. We
// don't need a lock because the state is now in READY so there
@@ -1389,15 +1369,13 @@ bool Future<T>::fail(const std::string& _message)
{
bool result = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
data->message = new std::string(_message);
data->state = FAILED;
result = true;
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with this future being FAILED. We
// don't need a lock because the state is now in FAILED so there
[7/9] mesos git commit: Refactor Metrics::Timer to use synchronized.
Posted by be...@apache.org.
Refactor Metrics::Timer to use synchronized.
Review: https://reviews.apache.org/r/32362
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8323bcab
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8323bcab
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8323bcab
Branch: refs/heads/master
Commit: 8323bcab310e0f45a53ba46df8b6d90e1cabbbf3
Parents: 4153a7e
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:04 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:04 2015 -0700
----------------------------------------------------------------------
.../include/process/metrics/timer.hpp | 24 +++++++-------------
1 file changed, 8 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8323bcab/3rdparty/libprocess/include/process/metrics/timer.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/timer.hpp b/3rdparty/libprocess/include/process/metrics/timer.hpp
index b6f9fbd..92cef2b 100644
--- a/3rdparty/libprocess/include/process/metrics/timer.hpp
+++ b/3rdparty/libprocess/include/process/metrics/timer.hpp
@@ -1,18 +1,19 @@
#ifndef __PROCESS_METRICS_TIMER_HPP__
#define __PROCESS_METRICS_TIMER_HPP__
+#include <atomic>
#include <memory>
#include <string>
#include <process/clock.hpp>
#include <process/future.hpp>
-#include <process/internal.hpp>
#include <process/metrics/metric.hpp>
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
#include <stout/option.hpp>
+#include <stout/synchronized.hpp>
#include <stout/try.hpp>
namespace process {
@@ -33,15 +34,13 @@ public:
{
Future<double> value;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->lastValue.isSome()) {
value = data->lastValue.get();
} else {
value = Failure("No value");
}
}
- process::internal::release(&data->lock);
return value;
}
@@ -49,11 +48,9 @@ public:
// Start the Timer.
void start()
{
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
data->start = Clock::now();
}
- process::internal::release(&data->lock);
}
// Stop the Timer.
@@ -65,15 +62,13 @@ public:
double value;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
t = T(stop - data->start);
data->lastValue = t.value();
value = data->lastValue.get();
}
- process::internal::release(&data->lock);
push(value);
@@ -94,9 +89,8 @@ public:
private:
struct Data {
- Data() : lock(0) {}
-
- int lock;
+ Data() {}
+
+ // Guards 'start' and 'lastValue'. The flag is initialized at its
+ // declaration because that is the only form in which
+ // 'ATOMIC_FLAG_INIT' is guaranteed to produce a clear flag; using
+ // the macro in a constructor's initializer list is unspecified.
+ std::atomic_flag lock = ATOMIC_FLAG_INIT;
Time start;
Option<double> lastValue;
};
@@ -107,12 +101,10 @@ private:
double value;
- process::internal::acquire(&that.data->lock);
- {
+ synchronized (that.data->lock) {
that.data->lastValue = T(stop - start).value();
value = that.data->lastValue.get();
}
- process::internal::release(&that.data->lock);
that.push(value);
}
[9/9] mesos git commit: Refactor http to use synchronized.
Posted by be...@apache.org.
Refactor http to use synchronized.
Review: https://reviews.apache.org/r/32364
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94c0f68d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94c0f68d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94c0f68d
Branch: refs/heads/master
Commit: 94c0f68dc8a32b6cd768c18d6d8d65d40833296f
Parents: 8c2d8d4
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:28 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:29 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/http.hpp | 11 +++++++----
3rdparty/libprocess/src/http.cpp | 22 ++++++----------------
2 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index bba62b3..51a00f5 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -3,6 +3,7 @@
#include <stdint.h>
+#include <atomic>
#include <iosfwd>
#include <memory>
#include <queue>
@@ -191,12 +192,14 @@ public:
private:
struct Data
{
- Data() : lock(0), readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
+ Data() : readEnd(Reader::OPEN), writeEnd(Writer::OPEN) {}
// Rather than use a process to serialize access to the pipe's
- // internal data we use a low-level "lock" which we acquire and
- // release using atomic builtins.
- int lock;
+ // internal data we use a 'std::atomic_flag'. The flag is initialized
+ // at its declaration because that is the only form in which
+ // 'ATOMIC_FLAG_INIT' is guaranteed to produce a clear flag; using
+ // the macro in a constructor's initializer list is unspecified.
+ std::atomic_flag lock = ATOMIC_FLAG_INIT;
Reader::State readEnd;
Writer::State writeEnd;
http://git-wip-us.apache.org/repos/asf/mesos/blob/94c0f68d/3rdparty/libprocess/src/http.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/http.cpp b/3rdparty/libprocess/src/http.cpp
index 1d318b9..0898335 100644
--- a/3rdparty/libprocess/src/http.cpp
+++ b/3rdparty/libprocess/src/http.cpp
@@ -18,7 +18,6 @@
#include <process/future.hpp>
#include <process/http.hpp>
-#include <process/internal.hpp>
#include <process/owned.hpp>
#include <process/socket.hpp>
@@ -30,6 +29,7 @@
#include <stout/numify.hpp>
#include <stout/option.hpp>
#include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
#include <stout/try.hpp>
#include "decoder.hpp"
@@ -179,8 +179,7 @@ Future<string> Pipe::Reader::read()
{
Future<string> future;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->readEnd == Reader::CLOSED) {
future = Failure("closed");
} else if (!data->writes.empty()) {
@@ -196,7 +195,6 @@ Future<string> Pipe::Reader::read()
future = data->reads.back()->future();
}
}
- process::internal::release(&data->lock);
return future;
}
@@ -208,8 +206,7 @@ bool Pipe::Reader::close()
bool notify = false;
queue<Owned<Promise<string>>> reads;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->readEnd == Reader::OPEN) {
// Throw away outstanding data.
while (!data->writes.empty()) {
@@ -226,7 +223,6 @@ bool Pipe::Reader::close()
notify = data->writeEnd == Writer::OPEN;
}
}
- process::internal::release(&data->lock);
// NOTE: We transition the promises outside the critical section
// to avoid triggering callbacks that try to reacquire the lock.
@@ -250,8 +246,7 @@ bool Pipe::Writer::write(const string& s)
bool written = false;
Owned<Promise<string>> read;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
// Ignore writes if either end of the pipe is closed or failed!
if (data->writeEnd == Writer::OPEN && data->readEnd == Reader::OPEN) {
// Don't bother surfacing empty writes to the readers.
@@ -266,7 +261,6 @@ bool Pipe::Writer::write(const string& s)
written = true;
}
}
- process::internal::release(&data->lock);
// NOTE: We set the promise outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
@@ -283,8 +277,7 @@ bool Pipe::Writer::close()
bool closed = false;
queue<Owned<Promise<string>>> reads;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->writeEnd == Writer::OPEN) {
// Extract all the pending reads so we can complete them.
std::swap(data->reads, reads);
@@ -293,7 +286,6 @@ bool Pipe::Writer::close()
closed = true;
}
}
- process::internal::release(&data->lock);
// NOTE: We set the promises outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
@@ -311,8 +303,7 @@ bool Pipe::Writer::fail(const string& message)
bool failed = false;
queue<Owned<Promise<string>>> reads;
- process::internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->writeEnd == Writer::OPEN) {
// Extract all the pending reads so we can fail them.
std::swap(data->reads, reads);
@@ -322,7 +313,6 @@ bool Pipe::Writer::fail(const string& message)
failed = true;
}
}
- process::internal::release(&data->lock);
// NOTE: We set the promises outside the critical section to avoid
// triggering callbacks that try to reacquire the lock.
[2/9] mesos git commit: Move synchronized.hpp into stout.
Posted by be...@apache.org.
Move synchronized.hpp into stout.
Review: https://reviews.apache.org/r/35012
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90c10cbd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90c10cbd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90c10cbd
Branch: refs/heads/master
Commit: 90c10cbd6ff183b2ed00cfd98451ea5b9abd9f5b
Parents: 9f9d773
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:24:45 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:24:45 2015 -0700
----------------------------------------------------------------------
.../3rdparty/stout/include/Makefile.am | 1 +
.../stout/include/stout/synchronized.hpp | 75 ++++++++++++++++++++
2 files changed, 76 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/90c10cbd/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
index 79239d7..6ac2f04 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/include/Makefile.am
@@ -72,6 +72,7 @@ nobase_include_HEADERS = \
stout/strings.hpp \
stout/subcommand.hpp \
stout/svn.hpp \
+ stout/synchronized.hpp \
stout/tests/utils.hpp \
stout/thread.hpp \
stout/try.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/90c10cbd/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
new file mode 100644
index 0000000..60eaf26
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/synchronized.hpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STOUT_SYNCHRONIZED_HPP__
+#define __STOUT_SYNCHRONIZED_HPP__
+
+#include <atomic>
+#include <mutex>
+#include <type_traits>
+
+// A helper class for the synchronized(m) macro. It is an RAII 'guard'
+// for a synchronization primitive 'T': the constructor calls 'lock()'
+// on the primitive and the destructor calls 'unlock()'. The general
+// template handles cases such as 'std::mutex' and
+// 'std::recursive_mutex'.
+//
+// A guard can not be copied, since two guards for a single
+// acquisition would unlock the primitive twice. It can be moved so
+// that the copy-initialization performed by the macro stays correct
+// even if the compiler does not elide the temporary; a moved-from
+// guard releases nothing.
+template <typename T>
+class Synchronized
+{
+public:
+  // Locks '*_lock'. The pointer is checked with 'CHECK_NOTNULL'.
+  explicit Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock))
+  {
+    lock->lock();
+  }
+
+  // Used when the macro is handed a pointer to the primitive.
+  explicit Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
+
+  Synchronized(const Synchronized&) = delete;
+  Synchronized& operator=(const Synchronized&) = delete;
+
+  // Takes over the responsibility to unlock from 'that'.
+  Synchronized(Synchronized&& that) : lock(that.lock) { that.lock = nullptr; }
+
+  ~Synchronized()
+  {
+    if (lock != nullptr) {
+      lock->unlock();
+    }
+  }
+
+  // Always true, which lets the guard be declared in the condition of
+  // the 'if' statement that the macro expands to.
+  explicit operator bool() const { return true; }
+
+private:
+  T* lock;
+};
+
+
+// A specialization of the Synchronized class for 'std::atomic_flag'.
+// This is necessary as the locking functions are different: the flag
+// is acquired by spinning on 'test_and_set' and released with
+// 'clear'.
+//
+// As with the general template, a guard can not be copied (two guards
+// for a single acquisition would clear the flag twice) but it can be
+// moved; a moved-from guard releases nothing.
+template <>
+class Synchronized<std::atomic_flag>
+{
+public:
+  // Acquires '*_flag'. The pointer is checked with 'CHECK_NOTNULL'.
+  explicit Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
+  {
+    // Busy-wait until the previous value of the flag was clear, i.e.,
+    // until this guard is the one that set it.
+    while (flag->test_and_set(std::memory_order_acquire)) {}
+  }
+
+  // Used when the macro is handed a pointer to the flag.
+  explicit Synchronized(std::atomic_flag** _flag)
+    : Synchronized(*CHECK_NOTNULL(_flag)) {}
+
+  Synchronized(const Synchronized&) = delete;
+  Synchronized& operator=(const Synchronized&) = delete;
+
+  // Takes over the responsibility to clear the flag from 'that'.
+  Synchronized(Synchronized&& that) : flag(that.flag) { that.flag = nullptr; }
+
+  ~Synchronized()
+  {
+    if (flag != nullptr) {
+      flag->clear(std::memory_order_release);
+    }
+  }
+
+  // Always true, which lets the guard be declared in the condition of
+  // the 'if' statement that the macro expands to.
+  explicit operator bool() const { return true; }
+
+private:
+  std::atomic_flag* flag;
+};
+
+
+// A macro for acquiring a scoped 'guard' on any type that can satisfy
+// the 'Synchronized' interface. Currently this includes 'std::mutex',
+// 'std::recursive_mutex' and 'std::atomic_flag'. The argument may be
+// the primitive itself, a reference to it, or a pointer to it; the
+// reference and pointer are stripped to select the 'Synchronized'
+// instantiation.
+// Example:
+//   std::mutex m;
+//   synchronized (m) {
+//     // Do something under the lock.
+//   }
+// NOTE: The macro expands to an 'if' statement, so an 'else' placed
+// after the block binds to that 'if'.
+// NOTE: '__file__' and '__line__' are pasted as literal tokens rather
+// than expanded, so every use declares a guard with the same name; a
+// nested use shadows the guard of the enclosing one.
+#define synchronized(m) \
+  if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<typename std::remove_reference<decltype(m)>::type>::type>(&(m))) // NOLINT(whitespace/line_length)
+
+#endif // __STOUT_SYNCHRONIZED_HPP__
[6/9] mesos git commit: Refactor Queue to use synchronized.
Posted by be...@apache.org.
Refactor Queue to use synchronized.
Review: https://reviews.apache.org/r/32361
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4153a7e0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4153a7e0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4153a7e0
Branch: refs/heads/master
Commit: 4153a7e0681eb4131bd8a5395e4e038ce62324e8
Parents: e8d6f91
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:51 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:51 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/queue.hpp | 19 ++++++++-----------
1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4153a7e0/3rdparty/libprocess/include/process/queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/queue.hpp b/3rdparty/libprocess/include/process/queue.hpp
index df8efc0..1496b38 100644
--- a/3rdparty/libprocess/include/process/queue.hpp
+++ b/3rdparty/libprocess/include/process/queue.hpp
@@ -1,14 +1,16 @@
#ifndef __PROCESS_QUEUE_HPP__
#define __PROCESS_QUEUE_HPP__
+#include <atomic>
#include <deque>
#include <memory>
#include <queue>
#include <process/future.hpp>
-#include <process/internal.hpp>
#include <process/owned.hpp>
+#include <stout/synchronized.hpp>
+
namespace process {
template <typename T>
@@ -24,8 +26,7 @@ public:
// trigger callbacks that try to reacquire the lock.
Owned<Promise<T>> promise;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->promises.empty()) {
data->elements.push(t);
} else {
@@ -33,7 +34,6 @@ public:
data->promises.pop_front();
}
}
- internal::release(&data->lock);
if (promise.get() != NULL) {
promise->set(t);
@@ -44,8 +44,7 @@ public:
{
Future<T> future;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->elements.empty()) {
data->promises.push_back(Owned<Promise<T>>(new Promise<T>()));
future = data->promises.back()->future();
@@ -54,7 +53,6 @@ public:
data->elements.pop();
}
}
- internal::release(&data->lock);
return future;
}
@@ -62,7 +60,7 @@ public:
private:
struct Data
{
- Data() : lock(0) {}
+ Data() {}
~Data()
{
@@ -70,9 +68,8 @@ private:
}
// Rather than use a process to serialize access to the queue's
- // internal data we use a low-level "lock" which we acquire and
- // release using atomic builtins.
- int lock;
+ // internal data we use a 'std::atomic_flag'. The flag is initialized
+ // at its declaration because that is the only form in which
+ // 'ATOMIC_FLAG_INIT' is guaranteed to produce a clear flag; using
+ // the macro in a constructor's initializer list is unspecified.
+ std::atomic_flag lock = ATOMIC_FLAG_INIT;
// Represents "waiters" for elements from the queue.
std::deque<Owned<Promise<T>>> promises;
[8/9] mesos git commit: Refactor Metrics::Metric to use synchronized.
Posted by be...@apache.org.
Refactor Metrics::Metric to use synchronized.
Review: https://reviews.apache.org/r/32363
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c2d8d47
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c2d8d47
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c2d8d47
Branch: refs/heads/master
Commit: 8c2d8d47f2bed2c15a5db2d0dc1afb33025f4af0
Parents: 8323bca
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:26:16 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:26:16 2015 -0700
----------------------------------------------------------------------
.../libprocess/include/process/metrics/metric.hpp | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8c2d8d47/3rdparty/libprocess/include/process/metrics/metric.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metric.hpp b/3rdparty/libprocess/include/process/metrics/metric.hpp
index a7be2d7..44a7d5a 100644
--- a/3rdparty/libprocess/include/process/metrics/metric.hpp
+++ b/3rdparty/libprocess/include/process/metrics/metric.hpp
@@ -1,17 +1,18 @@
#ifndef __PROCESS_METRICS_METRIC_HPP__
#define __PROCESS_METRICS_METRIC_HPP__
+#include <atomic>
#include <memory>
#include <string>
#include <process/future.hpp>
-#include <process/internal.hpp>
#include <process/owned.hpp>
#include <process/statistics.hpp>
#include <process/timeseries.hpp>
#include <stout/duration.hpp>
#include <stout/option.hpp>
+#include <stout/synchronized.hpp>
namespace process {
namespace metrics {
@@ -33,11 +34,9 @@ public:
Option<Statistics<double>> statistics = None();
if (data->history.isSome()) {
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
statistics = Statistics<double>::from(*data->history.get());
}
- internal::release(&data->lock);
}
return statistics;
@@ -53,11 +52,9 @@ protected:
if (data->history.isSome()) {
Time now = Clock::now();
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
data->history.get()->set(value, now);
}
- internal::release(&data->lock);
}
}
@@ -65,7 +62,7 @@ private:
struct Data {
Data(const std::string& _name, const Option<Duration>& window)
: name(_name),
- lock(0),
history(None())
{
if (window.isSome()) {
@@ -76,7 +73,7 @@ private:
const std::string name;
- int lock;
+ // Taken around reads and writes of the time series held in
+ // 'history'. The flag is initialized at its declaration because
+ // that is the only form in which 'ATOMIC_FLAG_INIT' is guaranteed
+ // to produce a clear flag; using the macro in a constructor's
+ // initializer list is unspecified.
+ std::atomic_flag lock = ATOMIC_FLAG_INIT;
Option<Owned<TimeSeries<double>>> history;
};
[3/9] mesos git commit: Move synchronized.hpp out of libprocess.
Posted by be...@apache.org.
Move synchronized.hpp out of libprocess.
Review: https://reviews.apache.org/r/35013
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/889daa98
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/889daa98
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/889daa98
Branch: refs/heads/master
Commit: 889daa989fd19a12afb2560faef5d58d9af5db99
Parents: 90c10cb
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:01 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:02 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 -
3rdparty/libprocess/src/clock.cpp | 2 +-
3rdparty/libprocess/src/libev.hpp | 3 +-
3rdparty/libprocess/src/libevent.cpp | 3 +-
3rdparty/libprocess/src/process.cpp | 2 +-
3rdparty/libprocess/src/synchronized.hpp | 61 ---------------------------
6 files changed, 5 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 6e8972f..489ce35 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -51,7 +51,6 @@ libprocess_la_SOURCES = \
src/reap.cpp \
src/socket.cpp \
src/subprocess.cpp \
- src/synchronized.hpp \
src/timeseries.cpp
libprocess_la_CPPFLAGS = \
http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index afeb43c..dd726c1 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -14,11 +14,11 @@
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
+#include <stout/synchronized.hpp>
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
#include "event_loop.hpp"
-#include "synchronized.hpp"
using std::list;
using std::map;
http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index 6c2015b..a0a2f49 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -10,10 +10,9 @@
#include <process/owned.hpp>
#include <stout/lambda.hpp>
+#include <stout/synchronized.hpp>
#include <stout/thread.hpp>
-#include "synchronized.hpp"
-
namespace process {
// Event loop.
http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
index d27fcb9..fb03859 100644
--- a/3rdparty/libprocess/src/libevent.cpp
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -5,9 +5,10 @@
#include <process/logging.hpp>
+#include <stout/synchronized.hpp>
+
#include "event_loop.hpp"
#include "libevent.hpp"
-#include "synchronized.hpp"
namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 834670b..f1d3e5b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -74,6 +74,7 @@
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/strings.hpp>
+#include <stout/synchronized.hpp>
#include <stout/thread.hpp>
#include <stout/unreachable.hpp>
@@ -83,7 +84,6 @@
#include "event_loop.hpp"
#include "gate.hpp"
#include "process_reference.hpp"
-#include "synchronized.hpp"
using namespace process::metrics::internal;
http://git-wip-us.apache.org/repos/asf/mesos/blob/889daa98/3rdparty/libprocess/src/synchronized.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/synchronized.hpp b/3rdparty/libprocess/src/synchronized.hpp
deleted file mode 100644
index 462c46c..0000000
--- a/3rdparty/libprocess/src/synchronized.hpp
+++ /dev/null
@@ -1,61 +0,0 @@
-#ifndef __SYNCHRONIZABLE_HPP__
-#define __SYNCHRONIZABLE_HPP__
-
-#include <atomic>
-#include <mutex>
-#include <type_traits>
-
-// A helper class for the synchronized(m) macro. It is an RAII 'guard'
-// for a synchronization primitive 'T'. The general template handles
-// cases such as 'std::mutex' and 'std::recursive_mutex'.
-template <typename T>
-class Synchronized
-{
-public:
- Synchronized(T* _lock) : lock(CHECK_NOTNULL(_lock)) { lock->lock(); }
- Synchronized(T** _lock) : Synchronized(*CHECK_NOTNULL(_lock)) {}
-
- ~Synchronized() { lock->unlock(); }
-
- operator bool() const { return true; }
-private:
- T* lock;
-};
-
-
-// A specialization of the Synchronized class for 'std::atomic_flag'.
-// This is necessary as the locking functions are different.
-template <>
-class Synchronized<std::atomic_flag>
-{
-public:
- Synchronized(std::atomic_flag* _flag) : flag(CHECK_NOTNULL(_flag))
- {
- while (flag->test_and_set(std::memory_order_acquire)) {}
- }
- Synchronized(std::atomic_flag** _flag)
- : Synchronized(*CHECK_NOTNULL(_flag)) {}
-
- ~Synchronized()
- {
- flag->clear(std::memory_order_release);
- }
-
- operator bool() const { return true; }
-private:
- std::atomic_flag* flag;
-};
-
-
-// A macro for acquiring a scoped 'guard' on any type that can satisfy
-// the 'Synchronized' interface. Currently this includes 'std::mutex',
-// 'std::recursive_mutex' and 'std::atomic_flag'.
-// Example:
-// std::mutex m;
-// synchronized (m) {
-// // Do something under the lock.
-// }
-#define synchronized(m) \
- if (auto __ ## __file__ ## _ ## __line__ ## __lock = Synchronized<typename std::remove_pointer<decltype(m)>::type>(&m)) // NOLINT(whitespace/line_length)
-
-#endif // __SYNCHRONIZABLE_HPP__