You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:25 UTC

[4/9] mesos git commit: Refactor Future to use synchronized.

Refactor Future to use synchronized.

Review: https://reviews.apache.org/r/32358


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37373fb3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37373fb3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37373fb3

Branch: refs/heads/master
Commit: 37373fb3318770b4971831c15c563f399787d065
Parents: 889daa9
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:23 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:24 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 52 ++++++---------------
 1 file changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/37373fb3/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index c22d6c8..75cbe12 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -4,6 +4,7 @@
 #include <assert.h>
 #include <stdlib.h> // For abort.
 
+#include <atomic>
 #include <iostream>
 #include <list>
 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
@@ -14,7 +15,6 @@
 #include <glog/logging.h>
 
 #include <process/clock.hpp>
-#include <process/internal.hpp>
 #include <process/latch.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
@@ -27,6 +27,7 @@
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/preprocessor.hpp>
+#include <stout/synchronized.hpp>
 #include <stout/try.hpp>
 
 namespace process {
@@ -383,7 +384,7 @@ private:
 
     void clearAllCallbacks();
 
-    int lock;
+    std::atomic_flag lock;
     State state;
     bool discard;
     bool associated;
@@ -598,8 +599,7 @@ bool Promise<T>::associate(const Future<T>& future)
 {
   bool associated = false;
 
-  internal::acquire(&f.data->lock);
-  {
+  synchronized (f.data->lock) {
     // Don't associate if this promise has completed. Note that this
     // does not include if Future::discard was called on this future
     // since in that case that would still leave the future PENDING
@@ -616,7 +616,6 @@ bool Promise<T>::associate(const Future<T>& future)
       // another.
     }
   }
-  internal::release(&f.data->lock);
 
   // Note that we do the actual associating after releasing the lock
   // above to avoid deadlocking by attempting to require the lock
@@ -763,14 +762,12 @@ bool Promise<T>::discard(Future<T> future)
 
   bool result = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == Future<T>::PENDING) {
       data->state = Future<T>::DISCARDED;
       result = true;
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being
   // DISCARDED. We don't need a lock because the state is now in
@@ -797,7 +794,7 @@ Future<T> Future<T>::failed(const std::string& message)
 
 template <typename T>
 Future<T>::Data::Data()
-  : lock(0),
+  : lock(ATOMIC_FLAG_INIT),
     state(PENDING),
     discard(false),
     associated(false),
@@ -912,8 +909,7 @@ bool Future<T>::discard()
   bool result = false;
 
   std::vector<DiscardCallback> callbacks;
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (!data->discard && data->state == PENDING) {
       result = data->discard = true;
 
@@ -929,7 +925,6 @@ bool Future<T>::discard()
       data->onDiscardCallbacks.clear();
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with doing a discard on this
   // future. We don't need a lock because 'Data::discard' should now
@@ -1007,14 +1002,12 @@ bool Future<T>::await(const Duration& duration) const
 
   bool pending = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       pending = true;
       data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
     }
   }
-  internal::release(&data->lock);
 
   if (pending) {
     return latch->await(duration);
@@ -1058,15 +1051,13 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->discard) {
       run = true;
     } else if (data->state == PENDING) {
       data->onDiscardCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1082,15 +1073,13 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == READY) {
       run = true;
     } else if (data->state == PENDING) {
       data->onReadyCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1106,15 +1095,13 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == FAILED) {
       run = true;
     } else if (data->state == PENDING) {
       data->onFailedCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1130,15 +1117,13 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == DISCARDED) {
       run = true;
     } else if (data->state == PENDING) {
       data->onDiscardedCallbacks.emplace_back(std::move(callback));
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1154,15 +1139,13 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
 {
   bool run = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       data->onAnyCallbacks.emplace_back(std::move(callback));
     } else {
       run = true;
     }
   }
-  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -1172,7 +1155,6 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
   return *this;
 }
 
-
 namespace internal {
 
 // NOTE: We need to name this 'thenf' versus 'then' to distinguish it
@@ -1360,15 +1342,13 @@ bool Future<T>::set(const T& _t)
 {
   bool result = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       data->t = new T(_t);
       data->state = READY;
       result = true;
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being READY. We
   // don't need a lock because the state is now in READY so there
@@ -1389,15 +1369,13 @@ bool Future<T>::fail(const std::string& _message)
 {
   bool result = false;
 
-  internal::acquire(&data->lock);
-  {
+  synchronized (data->lock) {
     if (data->state == PENDING) {
       data->message = new std::string(_message);
       data->state = FAILED;
       result = true;
     }
   }
-  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being FAILED. We
   // don't need a lock because the state is now in FAILED so there