You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/11/21 17:04:34 UTC
[4/4] mesos git commit: Replace std::queue with std::vector in
Libprocess Future.
Replace std::queue with std::vector in Libprocess Future.
Use std::vector instead of std::queue to reduce dynamic allocations
and improve performance.
Review: https://reviews.apache.org/r/28196
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/da7b4ac6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/da7b4ac6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/da7b4ac6
Branch: refs/heads/master
Commit: da7b4ac6c19e932e10d6c0321a2afcb7198883b5
Parents: 2885c80
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Fri Nov 21 08:04:04 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Nov 21 08:04:05 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 50 +++++++++++----------
1 file changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/da7b4ac6/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 608934d..0326b23 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -6,11 +6,11 @@
#include <iostream>
#include <list>
-#include <queue>
#include <set>
#if __cplusplus >= 201103L
#include <type_traits>
#endif // __cplusplus >= 201103L
+#include <vector>
#include <glog/logging.h>
@@ -547,11 +547,11 @@ private:
bool associated;
T* t;
std::string* message; // Message associated with failure.
- std::queue<DiscardCallback> onDiscardCallbacks;
- std::queue<ReadyCallback> onReadyCallbacks;
- std::queue<FailedCallback> onFailedCallbacks;
- std::queue<DiscardedCallback> onDiscardedCallbacks;
- std::queue<AnyCallback> onAnyCallbacks;
+ std::vector<DiscardCallback> onDiscardCallbacks;
+ std::vector<ReadyCallback> onReadyCallbacks;
+ std::vector<FailedCallback> onFailedCallbacks;
+ std::vector<DiscardedCallback> onDiscardedCallbacks;
+ std::vector<AnyCallback> onAnyCallbacks;
};
// Sets the value for this future, unless the future is already set,
@@ -570,13 +570,10 @@ namespace internal {
// Helper for executing callbacks that have been registered.
template <typename C, typename... Arguments>
- void run(std::queue<C>& callbacks, Arguments&&... arguments)
+ void run(const std::vector<C>& callbacks, Arguments&&... arguments)
{
- // TODO(jmlvanre): replace this with a vector and make parameter
- // const since we can iterate it safely.
- while (!callbacks.empty()) {
- callbacks.front()(std::forward<Arguments>(arguments)...);
- callbacks.pop();
+ for (size_t i = 0; i < callbacks.size(); ++i) {
+ callbacks[i](std::forward<Arguments>(arguments)...);
}
}
@@ -947,8 +944,10 @@ bool Promise<T>::discard(Future<T> future)
// DISCARDED so there should not be any concurrent modifications.
if (result) {
internal::run(future.data->onDiscardedCallbacks);
+ future.data->onDiscardedCallbacks.clear();
internal::run(future.data->onAnyCallbacks, future);
+ future.data->onAnyCallbacks.clear();
}
return result;
@@ -1083,6 +1082,7 @@ bool Future<T>::discard()
// 'Data::onDiscardCallbacks'.
if (result) {
internal::run(data->onDiscardCallbacks);
+ data->onDiscardCallbacks.clear();
}
return result;
@@ -1157,7 +1157,7 @@ bool Future<T>::await(const Duration& duration) const
{
if (data->state == PENDING) {
pending = true;
- data->onAnyCallbacks.push(lambda::bind(&internal::awaited, latch));
+ data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
}
}
internal::release(&data->lock);
@@ -1210,7 +1210,7 @@ const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
if (data->discard) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardCallbacks.push(std::move(callback));
+ data->onDiscardCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1234,7 +1234,7 @@ const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
if (data->state == READY) {
run = true;
} else if (data->state == PENDING) {
- data->onReadyCallbacks.push(std::move(callback));
+ data->onReadyCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1258,7 +1258,7 @@ const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
if (data->state == FAILED) {
run = true;
} else if (data->state == PENDING) {
- data->onFailedCallbacks.push(std::move(callback));
+ data->onFailedCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1282,7 +1282,7 @@ const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
if (data->state == DISCARDED) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardedCallbacks.push(std::move(callback));
+ data->onDiscardedCallbacks.emplace_back(std::move(callback));
}
}
internal::release(&data->lock);
@@ -1304,7 +1304,7 @@ const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
internal::acquire(&data->lock);
{
if (data->state == PENDING) {
- data->onAnyCallbacks.push(std::move(callback));
+ data->onAnyCallbacks.emplace_back(std::move(callback));
} else {
run = true;
}
@@ -1330,7 +1330,7 @@ const Future<T>& Future<T>::onDiscard(const DiscardCallback& callback) const
if (data->discard) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardCallbacks.push(callback);
+ data->onDiscardCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1354,7 +1354,7 @@ const Future<T>& Future<T>::onReady(const ReadyCallback& callback) const
if (data->state == READY) {
run = true;
} else if (data->state == PENDING) {
- data->onReadyCallbacks.push(callback);
+ data->onReadyCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1378,7 +1378,7 @@ const Future<T>& Future<T>::onFailed(const FailedCallback& callback) const
if (data->state == FAILED) {
run = true;
} else if (data->state == PENDING) {
- data->onFailedCallbacks.push(callback);
+ data->onFailedCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1403,7 +1403,7 @@ const Future<T>& Future<T>::onDiscarded(
if (data->state == DISCARDED) {
run = true;
} else if (data->state == PENDING) {
- data->onDiscardedCallbacks.push(callback);
+ data->onDiscardedCallbacks.push_back(callback);
}
}
internal::release(&data->lock);
@@ -1425,7 +1425,7 @@ const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
internal::acquire(&data->lock);
{
if (data->state == PENDING) {
- data->onAnyCallbacks.push(callback);
+ data->onAnyCallbacks.push_back(callback);
} else {
run = true;
}
@@ -1609,8 +1609,10 @@ bool Future<T>::set(const T& _t)
// should not be any concurrent modications.
if (result) {
internal::run(data->onReadyCallbacks, *data->t);
+ data->onReadyCallbacks.clear();
internal::run(data->onAnyCallbacks, *this);
+ data->onAnyCallbacks.clear();
}
return result;
@@ -1637,8 +1639,10 @@ bool Future<T>::fail(const std::string& _message)
// should not be any concurrent modications.
if (result) {
internal::run(data->onFailedCallbacks, *data->message);
+ data->onFailedCallbacks.clear();
internal::run(data->onAnyCallbacks, *this);
+ data->onAnyCallbacks.clear();
}
return result;