You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2017/09/28 06:00:20 UTC

mesos git commit: Protect against Future callbacks deleting instance.

Repository: mesos
Updated Branches:
  refs/heads/master 89c6cea11 -> d47641039


Protect against Future callbacks deleting instance.

See MESOS-8010.

Review: https://reviews.apache.org/r/62609


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d4764103
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d4764103
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d4764103

Branch: refs/heads/master
Commit: d47641039f5e2dd18af007250ef7ae2a34258a2d
Parents: 89c6cea
Author: Benjamin Hindman <be...@gmail.com>
Authored: Tue Sep 26 20:13:11 2017 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Sep 27 22:59:39 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 53 ++++++++++-----------
 1 file changed, 26 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d4764103/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index a11a588..4cf44ba 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -880,21 +880,24 @@ void discard(const std::list<Future<T>>& futures)
 template <typename T>
 bool Promise<T>::discard(Future<T> future)
 {
-  std::shared_ptr<typename Future<T>::Data> data = future.data;
-
   bool result = false;
 
-  synchronized (data->lock) {
-    if (data->state == Future<T>::PENDING) {
-      data->state = Future<T>::DISCARDED;
+  synchronized (future.data->lock) {
+    if (future.data->state == Future<T>::PENDING) {
+      future.data->state = Future<T>::DISCARDED;
       result = true;
     }
   }
 
   // Invoke all callbacks associated with this future being
   // DISCARDED. We don't need a lock because the state is now in
-  // DISCARDED so there should not be any concurrent modifications.
+  // DISCARDED so there should not be any concurrent modifications to
+  // the callbacks.
   if (result) {
+    // NOTE: `future` is passed by value, so this copy keeps the
+    // shared data alive even if a callback erroneously deletes the
+    // future. In `Future::_set()` and `Future::fail()` we have to
+    // explicitly take a copy to protect ourselves.
     internal::run(future.data->onDiscardedCallbacks);
     internal::run(future.data->onAnyCallbacks, future);
 
@@ -1034,23 +1037,13 @@ bool Future<T>::discard()
     if (!data->discard && data->state == PENDING) {
       result = data->discard = true;
 
-      // NOTE: We make a copy of the onDiscard callbacks here
-      // because it is possible that another thread completes this
-      // future (ready, failed or discarded) when the current thread
-      // is out of this critical section but *before* it executed the
-      // onDiscard callbacks. If that happens, the other thread might
-      // be clearing the onDiscard callbacks (via clearAllCallbacks())
-      // while the current thread is executing or clearing the
-      // onDiscard callbacks, causing thread safety issue.
-      callbacks = data->onDiscardCallbacks;
-      data->onDiscardCallbacks.clear();
+      callbacks.swap(data->onDiscardCallbacks);
     }
   }
 
   // Invoke all callbacks associated with doing a discard on this
-  // future. We don't need a lock because 'Data::discard' should now
-  // be set so we won't be adding anything else to
-  // 'Data::onDiscardCallbacks'.
+  // future. The callbacks were swapped out of `data` into a local
+  // above, so they get destroyed when we exit from the function.
   if (result) {
     internal::run(callbacks);
   }
@@ -1535,12 +1528,15 @@ bool Future<T>::_set(U&& u)
 
   // Invoke all callbacks associated with this future being READY. We
   // don't need a lock because the state is now in READY so there
-  // should not be any concurrent modifications.
+  // should not be any concurrent modifications to the callbacks.
   if (result) {
-    internal::run(data->onReadyCallbacks, data->result.get());
-    internal::run(data->onAnyCallbacks, *this);
+    // Grab a copy of `data` just in case invoking the callbacks
+    // erroneously attempts to delete this future.
+    std::shared_ptr<typename Future<T>::Data> copy = data;
+    internal::run(copy->onReadyCallbacks, copy->result.get());
+    internal::run(copy->onAnyCallbacks, *this);
 
-    data->clearAllCallbacks();
+    copy->clearAllCallbacks();
   }
 
   return result;
@@ -1562,12 +1558,15 @@ bool Future<T>::fail(const std::string& _message)
 
   // Invoke all callbacks associated with this future being FAILED. We
   // don't need a lock because the state is now in FAILED so there
-  // should not be any concurrent modifications.
+  // should not be any concurrent modifications to the callbacks.
   if (result) {
-    internal::run(data->onFailedCallbacks, data->result.error());
-    internal::run(data->onAnyCallbacks, *this);
+    // Grab a copy of `data` just in case invoking the callbacks
+    // erroneously attempts to delete this future.
+    std::shared_ptr<typename Future<T>::Data> copy = data;
+    internal::run(copy->onFailedCallbacks, copy->result.error());
+    internal::run(copy->onAnyCallbacks, *this);
 
-    data->clearAllCallbacks();
+    copy->clearAllCallbacks();
   }
 
   return result;