You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:27 UTC

[6/9] mesos git commit: Refactor Queue to use synchronized.

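Refactor Queue to use synchronized: the manual internal::acquire/internal::release calls on an int lock are replaced with `synchronized` blocks over a std::atomic_flag.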

Review: https://reviews.apache.org/r/32361


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4153a7e0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4153a7e0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4153a7e0

Branch: refs/heads/master
Commit: 4153a7e0681eb4131bd8a5395e4e038ce62324e8
Parents: e8d6f91
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:51 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:51 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/queue.hpp | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4153a7e0/3rdparty/libprocess/include/process/queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/queue.hpp b/3rdparty/libprocess/include/process/queue.hpp
index df8efc0..1496b38 100644
--- a/3rdparty/libprocess/include/process/queue.hpp
+++ b/3rdparty/libprocess/include/process/queue.hpp
@@ -1,14 +1,16 @@
 #ifndef __PROCESS_QUEUE_HPP__
 #define __PROCESS_QUEUE_HPP__
 
+#include <atomic>
 #include <deque>
 #include <memory>
 #include <queue>
 
 #include <process/future.hpp>
-#include <process/internal.hpp>
 #include <process/owned.hpp>
 
+#include <stout/synchronized.hpp>
+
 namespace process {
 
 template <typename T>
@@ -24,8 +26,7 @@ public:
     // trigger callbacks that try to reacquire the lock.
     Owned<Promise<T>> promise;
 
-    internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       if (data->promises.empty()) {
         data->elements.push(t);
       } else {
@@ -33,7 +34,6 @@ public:
         data->promises.pop_front();
       }
     }
-    internal::release(&data->lock);
 
     if (promise.get() != NULL) {
       promise->set(t);
@@ -44,8 +44,7 @@ public:
   {
     Future<T> future;
 
-    internal::acquire(&data->lock);
-    {
+    synchronized (data->lock) {
       if (data->elements.empty()) {
         data->promises.push_back(Owned<Promise<T>>(new Promise<T>()));
         future = data->promises.back()->future();
@@ -54,7 +53,6 @@ public:
         data->elements.pop();
       }
     }
-    internal::release(&data->lock);
 
     return future;
   }
@@ -62,7 +60,7 @@ public:
 private:
   struct Data
   {
-    Data() : lock(0) {}
+    Data() : lock(ATOMIC_FLAG_INIT) {}
 
     ~Data()
     {
@@ -70,9 +68,8 @@ private:
     }
 
     // Rather than use a process to serialize access to the queue's
-    // internal data we use a low-level "lock" which we acquire and
-    // release using atomic builtins.
-    int lock;
+    // internal data we use a 'std::atomic_flag'.
+    std::atomic_flag lock;
 
     // Represents "waiters" for elements from the queue.
     std::deque<Owned<Promise<T>>> promises;