You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/04 17:44:55 UTC
[1/2] git commit: Refactored Future to use shared_ptr.
Updated Branches:
refs/heads/master 605ffc9c4 -> 7bc7acc9f
Refactored Future to use shared_ptr.
Replaced the manual reference counting in Future with a shared_ptr to a
single Data struct. This allows us to remove the hand-written copy and
deallocation functions in Future (cleanup and copy).
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/13602
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ae85e39c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ae85e39c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ae85e39c
Branch: refs/heads/master
Commit: ae85e39c1625fe6b80783cfe746fb1a67b70fd84
Parents: 605ffc9
Author: Benjamin Hindman <be...@gmail.com>
Authored: Mon Nov 4 06:35:58 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Nov 4 06:37:03 2013 -1000
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 324 +++++++-------------
1 file changed, 115 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ae85e39c/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index e03f8c9..f08fdf3 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -226,23 +226,31 @@ private:
void copy(const Future<T>& that);
void cleanup();
- enum State {
+ enum State
+ {
PENDING,
READY,
FAILED,
DISCARDED,
};
- int* refs;
- int* lock;
- State* state;
- T** t;
- std::string** message; // Message associated with failure.
- std::queue<ReadyCallback>* onReadyCallbacks;
- std::queue<FailedCallback>* onFailedCallbacks;
- std::queue<DiscardedCallback>* onDiscardedCallbacks;
- std::queue<AnyCallback>* onAnyCallbacks;
- Latch* latch;
+ struct Data
+ {
+ Data();
+ ~Data();
+
+ int lock;
+ Latch latch;
+ State state;
+ T* t;
+ std::string* message; // Message associated with failure.
+ std::queue<ReadyCallback> onReadyCallbacks;
+ std::queue<FailedCallback> onFailedCallbacks;
+ std::queue<DiscardedCallback> onDiscardedCallbacks;
+ std::queue<AnyCallback> onAnyCallbacks;
+ };
+
+ std::tr1::shared_ptr<Data> data;
};
@@ -482,31 +490,29 @@ Future<T> Future<T>::failed(const std::string& message)
template <typename T>
+Future<T>::Data::Data()
+ : lock(0),
+ state(PENDING),
+ t(NULL),
+ message(NULL) {}
+
+
+template <typename T>
+Future<T>::Data::~Data()
+{
+ delete t;
+ delete message;
+}
+
+
+template <typename T>
Future<T>::Future()
- : refs(new int(1)),
- lock(new int(0)),
- state(new State(PENDING)),
- t(new T*(NULL)),
- message(new std::string*(NULL)),
- onReadyCallbacks(new std::queue<ReadyCallback>()),
- onFailedCallbacks(new std::queue<FailedCallback>()),
- onDiscardedCallbacks(new std::queue<DiscardedCallback>()),
- onAnyCallbacks(new std::queue<AnyCallback>()),
- latch(new Latch()) {}
+ : data(new Data()) {}
template <typename T>
Future<T>::Future(const T& _t)
- : refs(new int(1)),
- lock(new int(0)),
- state(new State(PENDING)),
- t(new T*(NULL)),
- message(new std::string*(NULL)),
- onReadyCallbacks(new std::queue<ReadyCallback>()),
- onFailedCallbacks(new std::queue<FailedCallback>()),
- onDiscardedCallbacks(new std::queue<DiscardedCallback>()),
- onAnyCallbacks(new std::queue<AnyCallback>()),
- latch(new Latch())
+ : data(new Data())
{
set(_t);
}
@@ -514,15 +520,15 @@ Future<T>::Future(const T& _t)
template <typename T>
Future<T>::Future(const Future<T>& that)
-{
- copy(that);
-}
+ : data(that.data) {}
template <typename T>
Future<T>::~Future()
{
- cleanup();
+ if (data.unique()) {
+ discard();
+ }
}
@@ -530,8 +536,10 @@ template <typename T>
Future<T>& Future<T>::operator = (const Future<T>& that)
{
if (this != &that) {
- cleanup();
- copy(that);
+ if (data.unique()) {
+ discard();
+ }
+ data = that.data;
}
return *this;
}
@@ -540,18 +548,14 @@ Future<T>& Future<T>::operator = (const Future<T>& that)
template <typename T>
bool Future<T>::operator == (const Future<T>& that) const
{
- assert(latch != NULL);
- assert(that.latch != NULL);
- return *latch == *that.latch;
+ return data == that.data;
}
template <typename T>
bool Future<T>::operator < (const Future<T>& that) const
{
- assert(latch != NULL);
- assert(that.latch != NULL);
- return *latch < *that.latch;
+ return data < that.data;
}
@@ -560,32 +564,30 @@ bool Future<T>::discard()
{
bool result = false;
- assert(lock != NULL);
- internal::acquire(lock);
+ internal::acquire(&data->lock);
{
- assert(state != NULL);
- if (*state == PENDING) {
- *state = DISCARDED;
- latch->trigger();
+ if (data->state == PENDING) {
+ data->state = DISCARDED;
+ data->latch.trigger();
result = true;
}
}
- internal::release(lock);
+ internal::release(&data->lock);
// Invoke all callbacks associated with this future being
// DISCARDED. We don't need a lock because the state is now in
// DISCARDED so there should not be any concurrent modifications.
if (result) {
- while (!onDiscardedCallbacks->empty()) {
+ while (!data->onDiscardedCallbacks.empty()) {
// TODO(*): Invoke callbacks in another execution context.
- onDiscardedCallbacks->front()();
- onDiscardedCallbacks->pop();
+ data->onDiscardedCallbacks.front()();
+ data->onDiscardedCallbacks.pop();
}
- while (!onAnyCallbacks->empty()) {
+ while (!data->onAnyCallbacks.empty()) {
// TODO(*): Invoke callbacks in another execution context.
- onAnyCallbacks->front()(*this);
- onAnyCallbacks->pop();
+ data->onAnyCallbacks.front()(*this);
+ data->onAnyCallbacks.pop();
}
}
@@ -596,32 +598,28 @@ bool Future<T>::discard()
template <typename T>
bool Future<T>::isPending() const
{
- assert(state != NULL);
- return *state == PENDING;
+ return data->state == PENDING;
}
template <typename T>
bool Future<T>::isReady() const
{
- assert(state != NULL);
- return *state == READY;
+ return data->state == READY;
}
template <typename T>
bool Future<T>::isDiscarded() const
{
- assert(state != NULL);
- return *state == DISCARDED;
+ return data->state == DISCARDED;
}
template <typename T>
bool Future<T>::isFailed() const
{
- assert(state != NULL);
- return *state == FAILED;
+ return data->state == FAILED;
}
@@ -629,10 +627,10 @@ template <typename T>
bool Future<T>::await(const Duration& duration) const
{
if (!isReady() && !isDiscarded() && !isFailed()) {
- assert(latch != NULL);
- return latch->await(duration);
+ return data->latch.await(duration);
+ } else {
+ return true;
}
- return true;
}
@@ -655,20 +653,17 @@ T Future<T>::get() const
abort();
}
- assert(t != NULL);
- assert(*t != NULL);
- return **t;
+ assert(data->t != NULL);
+ return *data->t;
}
template <typename T>
std::string Future<T>::failure() const
{
- assert(message != NULL);
- if (*message != NULL) {
- return **message;
+ if (data->message != NULL) {
+ return *data->message;
}
-
return "";
}
@@ -678,21 +673,19 @@ const Future<T>& Future<T>::onReady(const ReadyCallback& callback) const
{
bool run = false;
- assert(lock != NULL);
- internal::acquire(lock);
+ internal::acquire(&data->lock);
{
- assert(state != NULL);
- if (*state == READY) {
+ if (data->state == READY) {
run = true;
- } else if (*state == PENDING) {
- onReadyCallbacks->push(callback);
+ } else if (data->state == PENDING) {
+ data->onReadyCallbacks.push(callback);
}
}
- internal::release(lock);
+ internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(**t);
+ callback(*data->t);
}
return *this;
@@ -704,21 +697,19 @@ const Future<T>& Future<T>::onFailed(const FailedCallback& callback) const
{
bool run = false;
- assert(lock != NULL);
- internal::acquire(lock);
+ internal::acquire(&data->lock);
{
- assert(state != NULL);
- if (*state == FAILED) {
+ if (data->state == FAILED) {
run = true;
- } else if (*state == PENDING) {
- onFailedCallbacks->push(callback);
+ } else if (data->state == PENDING) {
+ data->onFailedCallbacks.push(callback);
}
}
- internal::release(lock);
+ internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
- callback(**message);
+ callback(*data->message);
}
return *this;
@@ -731,17 +722,15 @@ const Future<T>& Future<T>::onDiscarded(
{
bool run = false;
- assert(lock != NULL);
- internal::acquire(lock);
+ internal::acquire(&data->lock);
{
- assert(state != NULL);
- if (*state == DISCARDED) {
+ if (data->state == DISCARDED) {
run = true;
- } else if (*state == PENDING) {
- onDiscardedCallbacks->push(callback);
+ } else if (data->state == PENDING) {
+ data->onDiscardedCallbacks.push(callback);
}
}
- internal::release(lock);
+ internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -757,17 +746,15 @@ const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
{
bool run = false;
- assert(lock != NULL);
- internal::acquire(lock);
+ internal::acquire(&data->lock);
{
- assert(state != NULL);
- if (*state != PENDING) {
+ if (data->state != PENDING) {
run = true;
- } else if (*state == PENDING) {
- onAnyCallbacks->push(callback);
+ } else if (data->state == PENDING) {
+ data->onAnyCallbacks.push(callback);
}
}
- internal::release(lock);
+ internal::release(&data->lock);
// TODO(*): Invoke callback in another execution context.
if (run) {
@@ -906,33 +893,31 @@ bool Future<T>::set(const T& _t)
{
bool result = false;
- assert(lock != NULL);
- internal::acquire(lock);
+ internal::acquire(&data->lock);
{
- assert(state != NULL);
- if (*state == PENDING) {
- *t = new T(_t);
- *state = READY;
- latch->trigger();
+ if (data->state == PENDING) {
+ data->t = new T(_t);
+ data->state = READY;
+ data->latch.trigger();
result = true;
}
}
- internal::release(lock);
+ internal::release(&data->lock);
// Invoke all callbacks associated with this future being READY. We
// don't need a lock because the state is now in READY so there
// should not be any concurrent modications.
if (result) {
- while (!onReadyCallbacks->empty()) {
+ while (!data->onReadyCallbacks.empty()) {
// TODO(*): Invoke callbacks in another execution context.
- onReadyCallbacks->front()(**t);
- onReadyCallbacks->pop();
+ data->onReadyCallbacks.front()(*data->t);
+ data->onReadyCallbacks.pop();
}
- while (!onAnyCallbacks->empty()) {
+ while (!data->onAnyCallbacks.empty()) {
// TODO(*): Invoke callbacks in another execution context.
- onAnyCallbacks->front()(*this);
- onAnyCallbacks->pop();
+ data->onAnyCallbacks.front()(*this);
+ data->onAnyCallbacks.pop();
}
}
@@ -945,116 +930,37 @@ bool Future<T>::fail(const std::string& _message)
{
bool result = false;
- assert(lock != NULL);
- internal::acquire(lock);
+ internal::acquire(&data->lock);
{
- assert(state != NULL);
- if (*state == PENDING) {
- *message = new std::string(_message);
- *state = FAILED;
- latch->trigger();
+ if (data->state == PENDING) {
+ data->message = new std::string(_message);
+ data->state = FAILED;
+ data->latch.trigger();
result = true;
}
}
- internal::release(lock);
+ internal::release(&data->lock);
// Invoke all callbacks associated with this future being FAILED. We
// don't need a lock because the state is now in FAILED so there
// should not be any concurrent modications.
if (result) {
- while (!onFailedCallbacks->empty()) {
+ while (!data->onFailedCallbacks.empty()) {
// TODO(*): Invoke callbacks in another execution context.
- onFailedCallbacks->front()(**message);
- onFailedCallbacks->pop();
+ data->onFailedCallbacks.front()(*data->message);
+ data->onFailedCallbacks.pop();
}
- while (!onAnyCallbacks->empty()) {
+ while (!data->onAnyCallbacks.empty()) {
// TODO(*): Invoke callbacks in another execution context.
- onAnyCallbacks->front()(*this);
- onAnyCallbacks->pop();
+ data->onAnyCallbacks.front()(*this);
+ data->onAnyCallbacks.pop();
}
}
return result;
}
-
-template <typename T>
-void Future<T>::copy(const Future<T>& that)
-{
- assert(that.refs > 0);
- __sync_fetch_and_add(that.refs, 1);
- refs = that.refs;
- lock = that.lock;
- state = that.state;
- t = that.t;
- message = that.message;
- onReadyCallbacks = that.onReadyCallbacks;
- onFailedCallbacks = that.onFailedCallbacks;
- onDiscardedCallbacks = that.onDiscardedCallbacks;
- onAnyCallbacks = that.onAnyCallbacks;
- latch = that.latch;
-}
-
-
-template <typename T>
-void Future<T>::cleanup()
-{
- assert(refs != NULL);
- if (__sync_sub_and_fetch(refs, 1) == 0) {
- // Discard the future if it is still pending (so we invoke any
- // discarded callbacks that have been setup). Note that we put the
- // reference count back at 1 here in case one of the callbacks
- // decides it wants to keep a reference.
- assert(state != NULL);
- if (*state == PENDING) {
- *refs = 1;
- discard();
- __sync_sub_and_fetch(refs, 1);
- }
-
- // Now try and cleanup again (this time we know the future has
- // either been discarded or was not pending). Note that one of the
- // callbacks might have stored the future, in which case we'll
- // just return without doing anything, but the state will forever
- // be "discarded".
- assert(refs != NULL);
- if (*refs == 0) {
- delete refs;
- refs = NULL;
- assert(lock != NULL);
- delete lock;
- lock = NULL;
- assert(state != NULL);
- delete state;
- state = NULL;
- assert(t != NULL);
- delete *t;
- delete t;
- t = NULL;
- assert(message != NULL);
- delete *message;
- delete message;
- message = NULL;
- assert(onReadyCallbacks != NULL);
- delete onReadyCallbacks;
- onReadyCallbacks = NULL;
- assert(onFailedCallbacks != NULL);
- delete onFailedCallbacks;
- onFailedCallbacks = NULL;
- assert(onDiscardedCallbacks != NULL);
- delete onDiscardedCallbacks;
- onDiscardedCallbacks = NULL;
- assert(onAnyCallbacks != NULL);
- delete onAnyCallbacks;
- onAnyCallbacks = NULL;
- assert(latch != NULL);
- delete latch;
- latch = NULL;
- }
- }
-}
-
} // namespace process {
#endif // __PROCESS_FUTURE_HPP__
[2/2] git commit: Create Latch in Future Lazily.
Posted by be...@apache.org.
Create Latch in Future Lazily.
A latch is only needed when Future.await() is called, so it is now
created lazily on the first await of a pending future. Given that
Future.await() is rare in the code base, this should be a big
performance improvement. I am attaching the experiment results.
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/15190
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7bc7acc9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7bc7acc9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7bc7acc9
Branch: refs/heads/master
Commit: 7bc7acc9f3a31359cc2b0130d26acd20e7cb96af
Parents: ae85e39
Author: Benjamin Hindman <be...@gmail.com>
Authored: Mon Nov 4 06:43:46 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Nov 4 06:44:26 2013 -1000
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 37 ++++++++++++++++-----
1 file changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7bc7acc9/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index f08fdf3..1625d1d 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -240,7 +240,7 @@ private:
~Data();
int lock;
- Latch latch;
+ Latch* latch;
State state;
T* t;
std::string* message; // Message associated with failure.
@@ -492,6 +492,7 @@ Future<T> Future<T>::failed(const std::string& message)
template <typename T>
Future<T>::Data::Data()
: lock(0),
+ latch(NULL),
state(PENDING),
t(NULL),
message(NULL) {}
@@ -500,6 +501,7 @@ Future<T>::Data::Data()
template <typename T>
Future<T>::Data::~Data()
{
+ delete latch;
delete t;
delete message;
}
@@ -568,7 +570,9 @@ bool Future<T>::discard()
{
if (data->state == PENDING) {
data->state = DISCARDED;
- data->latch.trigger();
+ if (data->latch != NULL) {
+ data->latch->trigger();
+ }
result = true;
}
}
@@ -626,11 +630,24 @@ bool Future<T>::isFailed() const
template <typename T>
bool Future<T>::await(const Duration& duration) const
{
- if (!isReady() && !isDiscarded() && !isFailed()) {
- return data->latch.await(duration);
- } else {
- return true;
+ bool await = false;
+
+ internal::acquire(&data->lock);
+ {
+ if (data->state == PENDING) {
+ if (data->latch == NULL) {
+ data->latch = new Latch();
+ }
+ await = true;
+ }
}
+ internal::release(&data->lock);
+
+ if (await) {
+ return data->latch->await(duration);
+ }
+
+ return true;
}
@@ -898,7 +915,9 @@ bool Future<T>::set(const T& _t)
if (data->state == PENDING) {
data->t = new T(_t);
data->state = READY;
- data->latch.trigger();
+ if (data->latch != NULL) {
+ data->latch->trigger();
+ }
result = true;
}
}
@@ -935,7 +954,9 @@ bool Future<T>::fail(const std::string& _message)
if (data->state == PENDING) {
data->message = new std::string(_message);
data->state = FAILED;
- data->latch.trigger();
+ if (data->latch != NULL) {
+ data->latch->trigger();
+ }
result = true;
}
}