You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/09/28 06:00:20 UTC
mesos git commit: Protect against Future callbacks deleting instance.
Repository: mesos
Updated Branches:
refs/heads/master 89c6cea11 -> d47641039
Protect against Future callbacks deleting instance.
See MESOS-8010.
Review: https://reviews.apache.org/r/62609
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d4764103
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d4764103
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d4764103
Branch: refs/heads/master
Commit: d47641039f5e2dd18af007250ef7ae2a34258a2d
Parents: 89c6cea
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Sep 26 20:13:11 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Sep 27 22:59:39 2017 -0700
----------------------------------------------------------------------
3rdparty/libprocess/include/process/future.hpp | 53 ++++++++++-----------
1 file changed, 26 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d4764103/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index a11a588..4cf44ba 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -880,21 +880,24 @@ void discard(const std::list<Future<T>>& futures)
template <typename T>
bool Promise<T>::discard(Future<T> future)
{
- std::shared_ptr<typename Future<T>::Data> data = future.data;
-
bool result = false;
- synchronized (data->lock) {
- if (data->state == Future<T>::PENDING) {
- data->state = Future<T>::DISCARDED;
+ synchronized (future.data->lock) {
+ if (future.data->state == Future<T>::PENDING) {
+ future.data->state = Future<T>::DISCARDED;
result = true;
}
}
// Invoke all callbacks associated with this future being
// DISCARDED. We don't need a lock because the state is now in
- // DISCARDED so there should not be any concurrent modifications.
+ // DISCARDED so there should not be any concurrent modifications to
+ // the callbacks.
if (result) {
+ // NOTE: we rely on the fact that we have `future` to protect
+ // ourselves from one of the callbacks erroneously deleting the
+ // future. In `Future::_set()` and `Future::fail()` we have to
+ // explicitly take a copy to protect ourselves.
internal::run(future.data->onDiscardedCallbacks);
internal::run(future.data->onAnyCallbacks, future);
@@ -1034,23 +1037,13 @@ bool Future<T>::discard()
if (!data->discard && data->state == PENDING) {
result = data->discard = true;
- // NOTE: We make a copy of the onDiscard callbacks here
- // because it is possible that another thread completes this
- // future (ready, failed or discarded) when the current thread
- // is out of this critical section but *before* it executed the
- // onDiscard callbacks. If that happens, the other thread might
- // be clearing the onDiscard callbacks (via clearAllCallbacks())
- // while the current thread is executing or clearing the
- // onDiscard callbacks, causing thread safety issue.
- callbacks = data->onDiscardCallbacks;
- data->onDiscardCallbacks.clear();
+ callbacks.swap(data->onDiscardCallbacks);
}
}
// Invoke all callbacks associated with doing a discard on this
- // future. We don't need a lock because 'Data::discard' should now
- // be set so we won't be adding anything else to
- // 'Data::onDiscardCallbacks'.
+ // future. The callbacks get destroyed when we exit from the
+ // function.
if (result) {
internal::run(callbacks);
}
@@ -1535,12 +1528,15 @@ bool Future<T>::_set(U&& u)
// Invoke all callbacks associated with this future being READY. We
// don't need a lock because the state is now in READY so there
- // should not be any concurrent modifications.
+ // should not be any concurrent modifications to the callbacks.
if (result) {
- internal::run(data->onReadyCallbacks, data->result.get());
- internal::run(data->onAnyCallbacks, *this);
+ // Grab a copy of `data` just in case invoking the callbacks
+ // erroneously attempts to delete this future.
+ std::shared_ptr<typename Future<T>::Data> copy = data;
+ internal::run(copy->onReadyCallbacks, copy->result.get());
+ internal::run(copy->onAnyCallbacks, *this);
- data->clearAllCallbacks();
+ copy->clearAllCallbacks();
}
return result;
@@ -1562,12 +1558,15 @@ bool Future<T>::fail(const std::string& _message)
// Invoke all callbacks associated with this future being FAILED. We
// don't need a lock because the state is now in FAILED so there
- // should not be any concurrent modifications.
+ // should not be any concurrent modifications to the callbacks.
if (result) {
- internal::run(data->onFailedCallbacks, data->result.error());
- internal::run(data->onAnyCallbacks, *this);
+ // Grab a copy of `data` just in case invoking the callbacks
+ // erroneously attempts to delete this future.
+ std::shared_ptr<typename Future<T>::Data> copy = data;
+ internal::run(copy->onFailedCallbacks, copy->result.error());
+ internal::run(copy->onAnyCallbacks, *this);
- data->clearAllCallbacks();
+ copy->clearAllCallbacks();
}
return result;