You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/04 09:30:25 UTC
[4/9] mesos git commit: Refactor Future to use synchronized.
Refactor Future to use synchronized.
Review: https://reviews.apache.org/r/32358
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37373fb3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37373fb3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37373fb3
Branch: refs/heads/master
Commit: 37373fb3318770b4971831c15c563f399787d065
Parents: 889daa9
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Jun 4 00:25:23 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jun 4 00:25:24 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 52 ++++++---------------
1 file changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/37373fb3/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index c22d6c8..75cbe12 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -4,6 +4,7 @@
#include <assert.h>
#include <stdlib.h> // For abort.
+#include <atomic>
#include <iostream>
#include <list>
#include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
@@ -14,7 +15,6 @@
#include <glog/logging.h>
#include <process/clock.hpp>
-#include <process/internal.hpp>
#include <process/latch.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
@@ -27,6 +27,7 @@
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/preprocessor.hpp>
+#include <stout/synchronized.hpp>
#include <stout/try.hpp>
namespace process {
@@ -383,7 +384,7 @@ private:
void clearAllCallbacks();
- int lock;
+ std::atomic_flag lock;
State state;
bool discard;
bool associated;
@@ -598,8 +599,7 @@ bool Promise<T>::associate(const Future<T>& future)
{
bool associated = false;
- internal::acquire(&f.data->lock);
- {
+ synchronized (f.data->lock) {
// Don't associate if this promise has completed. Note that this
// does not include if Future::discard was called on this future
// since in that case that would still leave the future PENDING
@@ -616,7 +616,6 @@ bool Promise<T>::associate(const Future<T>& future)
// another.
}
}
- internal::release(&f.data->lock);
// Note that we do the actual associating after releasing the lock
// above to avoid deadlocking by attempting to reacquire the lock
@@ -763,14 +762,12 @@ bool Promise<T>::discard(Future<T> future)
bool result = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == Future<T>::PENDING) {
data->state = Future<T>::DISCARDED;
result = true;
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with this future being
// DISCARDED. We don't need a lock because the state is now in
@@ -797,7 +794,7 @@ Future<T> Future<T>::failed(const std::string& message)
template <typename T>
Future<T>::Data::Data()
- : lock(0),
+ : lock(ATOMIC_FLAG_INIT),
state(PENDING),
discard(false),
associated(false),
@@ -912,8 +909,7 @@ bool Future<T>::discard()
bool result = false;
std::vector<DiscardCallback> callbacks;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (!data->discard && data->state == PENDING) {
result = data->discard = true;
@@ -929,7 +925,6 @@ bool Future<T>::discard()
data->onDiscardCallbacks.clear();
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with doing a discard on this
// future. We don't need a lock because 'Data::discard' should now
@@ -1007,14 +1002,12 @@ bool Future<T>::await(const Duration& duration) const
bool pending = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
pending = true;
data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
}
}
- internal::release(&data->lock);
if (pending) {
return latch->await(duration);
@@ -1058,15 +1051,13 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->discard) {
run = true;
} else if (data->state == PENDING) {
data->onDiscardCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1082,15 +1073,13 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == READY) {
run = true;
} else if (data->state == PENDING) {
data->onReadyCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1106,15 +1095,13 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == FAILED) {
run = true;
} else if (data->state == PENDING) {
data->onFailedCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1130,15 +1117,13 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == DISCARDED) {
run = true;
} else if (data->state == PENDING) {
data->onDiscardedCallbacks.emplace_back(std::move(callback));
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1154,15 +1139,13 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
{
bool run = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
data->onAnyCallbacks.emplace_back(std::move(callback));
} else {
run = true;
}
}
- internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -1172,7 +1155,6 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
return *this;
}
-
namespace internal {
// NOTE: We need to name this 'thenf' versus 'then' to distinguish it
@@ -1360,15 +1342,13 @@ bool Future<T>::set(const T& _t)
{
bool result = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
data->t = new T(_t);
data->state = READY;
result = true;
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with this future being READY. We
// don't need a lock because the state is now in READY so there
@@ -1389,15 +1369,13 @@ bool Future<T>::fail(const std::string& _message)
{
bool result = false;
- internal::acquire(&data->lock);
- {
+ synchronized (data->lock) {
if (data->state == PENDING) {
data->message = new std::string(_message);
data->state = FAILED;
result = true;
}
}
- internal::release(&data->lock);
// Invoke all callbacks associated with this future being FAILED. We
// don't need a lock because the state is now in FAILED so there