You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:27 UTC
[6/9] mesos git commit: Refactor Queue to use synchronized.
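Refactor Queue to use synchronized: replace the manual internal::acquire/internal::release calls on an int lock with stout's synchronized blocks over a std::atomic_flag.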
Review: https://reviews.apache.org/r/32361
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4153a7e0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4153a7e0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4153a7e0
Branch: refs/heads/master
Commit: 4153a7e0681eb4131bd8a5395e4e038ce62324e8
Parents: e8d6f91
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:51 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:51 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/queue.hpp | 19 ++++++++-----------
1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4153a7e0/3rdparty/libprocess/include/process/queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/queue.hpp b/3rdparty/libprocess/include/process/queue.hpp
index df8efc0..1496b38 100644
--- a/3rdparty/libprocess/include/process/queue.hpp
+++ b/3rdparty/libprocess/include/process/queue.hpp
@@ -1,14 +1,16 @@
#ifndef __PROCESS_QUEUE_HPP__
#define __PROCESS_QUEUE_HPP__
+#include <atomic>
#include <deque>
#include <memory>
#include <queue>
#include <process/future.hpp>
-#include <process/internal.hpp>
#include <process/owned.hpp>
+#include <stout/synchronized.hpp>
+
namespace process {
template <typename T>
@@ -24,8 +26,7 @@ public:
// trigger callbacks that try to reacquire the lock.
Owned<Promise<T>> promise;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->promises.empty()) {
data->elements.push(t);
} else {
@@ -33,7 +34,6 @@ public:
data->promises.pop_front();
}
}
- internal::release(&data->lock);
if (promise.get() != NULL) {
promise->set(t);
@@ -44,8 +44,7 @@ public:
{
Future<T> future;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->elements.empty()) {
data->promises.push_back(Owned<Promise<T>>(new Promise<T>()));
future = data->promises.back()->future();
@@ -54,7 +53,6 @@ public:
data->elements.pop();
}
}
- internal::release(&data->lock);
return future;
}
@@ -62,7 +60,7 @@ public:
private:
struct Data
{
- Data() : lock(0) {}
+ Data() : lock(ATOMIC_FLAG_INIT) {}
~Data()
{
@@ -70,9 +68,8 @@ private:
}
// Rather than use a process to serialize access to the queue's
- // internal data we use a low-level "lock" which we acquire and
- // release using atomic builtins.
- int lock;
+ // internal data we use a 'std::atomic_flag'.
+ std::atomic_flag lock;
// Represents "waiters" for elements from the queue.
std::deque<Owned<Promise<T>>> promises;