You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/04 17:44:55 UTC

[1/2] git commit: Refactored Future to use shared_ptr.

Updated Branches:
  refs/heads/master 605ffc9c4 -> 7bc7acc9f


Refactored Future to use shared_ptr.

Replaced the manual reference counting in Future with a shared_ptr to
a single internal Data struct. This lets us remove the error-prone,
hand-written copy and deallocation logic in Future (i.e., the copy()
and cleanup() implementations).

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/13602


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ae85e39c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ae85e39c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ae85e39c

Branch: refs/heads/master
Commit: ae85e39c1625fe6b80783cfe746fb1a67b70fd84
Parents: 605ffc9
Author: Benjamin Hindman <be...@gmail.com>
Authored: Mon Nov 4 06:35:58 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Nov 4 06:37:03 2013 -1000

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 324 +++++++-------------
 1 file changed, 115 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ae85e39c/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index e03f8c9..f08fdf3 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -226,23 +226,31 @@ private:
   void copy(const Future<T>& that);
   void cleanup();
 
-  enum State {
+  enum State
+  {
     PENDING,
     READY,
     FAILED,
     DISCARDED,
   };
 
-  int* refs;
-  int* lock;
-  State* state;
-  T** t;
-  std::string** message; // Message associated with failure.
-  std::queue<ReadyCallback>* onReadyCallbacks;
-  std::queue<FailedCallback>* onFailedCallbacks;
-  std::queue<DiscardedCallback>* onDiscardedCallbacks;
-  std::queue<AnyCallback>* onAnyCallbacks;
-  Latch* latch;
+  struct Data
+  {
+    Data();
+    ~Data();
+
+    int lock;
+    Latch latch;
+    State state;
+    T* t;
+    std::string* message; // Message associated with failure.
+    std::queue<ReadyCallback> onReadyCallbacks;
+    std::queue<FailedCallback> onFailedCallbacks;
+    std::queue<DiscardedCallback> onDiscardedCallbacks;
+    std::queue<AnyCallback> onAnyCallbacks;
+  };
+
+  std::tr1::shared_ptr<Data> data;
 };
 
 
@@ -482,31 +490,29 @@ Future<T> Future<T>::failed(const std::string& message)
 
 
 template <typename T>
+Future<T>::Data::Data()
+  : lock(0),
+    state(PENDING),
+    t(NULL),
+    message(NULL) {}
+
+
+template <typename T>
+Future<T>::Data::~Data()
+{
+  delete t;
+  delete message;
+}
+
+
+template <typename T>
 Future<T>::Future()
-  : refs(new int(1)),
-    lock(new int(0)),
-    state(new State(PENDING)),
-    t(new T*(NULL)),
-    message(new std::string*(NULL)),
-    onReadyCallbacks(new std::queue<ReadyCallback>()),
-    onFailedCallbacks(new std::queue<FailedCallback>()),
-    onDiscardedCallbacks(new std::queue<DiscardedCallback>()),
-    onAnyCallbacks(new std::queue<AnyCallback>()),
-    latch(new Latch()) {}
+  : data(new Data()) {}
 
 
 template <typename T>
 Future<T>::Future(const T& _t)
-  : refs(new int(1)),
-    lock(new int(0)),
-    state(new State(PENDING)),
-    t(new T*(NULL)),
-    message(new std::string*(NULL)),
-    onReadyCallbacks(new std::queue<ReadyCallback>()),
-    onFailedCallbacks(new std::queue<FailedCallback>()),
-    onDiscardedCallbacks(new std::queue<DiscardedCallback>()),
-    onAnyCallbacks(new std::queue<AnyCallback>()),
-    latch(new Latch())
+  : data(new Data())
 {
   set(_t);
 }
@@ -514,15 +520,15 @@ Future<T>::Future(const T& _t)
 
 template <typename T>
 Future<T>::Future(const Future<T>& that)
-{
-  copy(that);
-}
+  : data(that.data) {}
 
 
 template <typename T>
 Future<T>::~Future()
 {
-  cleanup();
+  if (data.unique()) {
+    discard();
+  }
 }
 
 
@@ -530,8 +536,10 @@ template <typename T>
 Future<T>& Future<T>::operator = (const Future<T>& that)
 {
   if (this != &that) {
-    cleanup();
-    copy(that);
+    if (data.unique()) {
+      discard();
+    }
+    data = that.data;
   }
   return *this;
 }
@@ -540,18 +548,14 @@ Future<T>& Future<T>::operator = (const Future<T>& that)
 template <typename T>
 bool Future<T>::operator == (const Future<T>& that) const
 {
-  assert(latch != NULL);
-  assert(that.latch != NULL);
-  return *latch == *that.latch;
+  return data == that.data;
 }
 
 
 template <typename T>
 bool Future<T>::operator < (const Future<T>& that) const
 {
-  assert(latch != NULL);
-  assert(that.latch != NULL);
-  return *latch < *that.latch;
+  return data < that.data;
 }
 
 
@@ -560,32 +564,30 @@ bool Future<T>::discard()
 {
   bool result = false;
 
-  assert(lock != NULL);
-  internal::acquire(lock);
+  internal::acquire(&data->lock);
   {
-    assert(state != NULL);
-    if (*state == PENDING) {
-      *state = DISCARDED;
-      latch->trigger();
+    if (data->state == PENDING) {
+      data->state = DISCARDED;
+      data->latch.trigger();
       result = true;
     }
   }
-  internal::release(lock);
+  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being
   // DISCARDED. We don't need a lock because the state is now in
   // DISCARDED so there should not be any concurrent modifications.
   if (result) {
-    while (!onDiscardedCallbacks->empty()) {
+    while (!data->onDiscardedCallbacks.empty()) {
       // TODO(*): Invoke callbacks in another execution context.
-      onDiscardedCallbacks->front()();
-      onDiscardedCallbacks->pop();
+      data->onDiscardedCallbacks.front()();
+      data->onDiscardedCallbacks.pop();
     }
 
-    while (!onAnyCallbacks->empty()) {
+    while (!data->onAnyCallbacks.empty()) {
       // TODO(*): Invoke callbacks in another execution context.
-      onAnyCallbacks->front()(*this);
-      onAnyCallbacks->pop();
+      data->onAnyCallbacks.front()(*this);
+      data->onAnyCallbacks.pop();
     }
   }
 
@@ -596,32 +598,28 @@ bool Future<T>::discard()
 template <typename T>
 bool Future<T>::isPending() const
 {
-  assert(state != NULL);
-  return *state == PENDING;
+  return data->state == PENDING;
 }
 
 
 template <typename T>
 bool Future<T>::isReady() const
 {
-  assert(state != NULL);
-  return *state == READY;
+  return data->state == READY;
 }
 
 
 template <typename T>
 bool Future<T>::isDiscarded() const
 {
-  assert(state != NULL);
-  return *state == DISCARDED;
+  return data->state == DISCARDED;
 }
 
 
 template <typename T>
 bool Future<T>::isFailed() const
 {
-  assert(state != NULL);
-  return *state == FAILED;
+  return data->state == FAILED;
 }
 
 
@@ -629,10 +627,10 @@ template <typename T>
 bool Future<T>::await(const Duration& duration) const
 {
   if (!isReady() && !isDiscarded() && !isFailed()) {
-    assert(latch != NULL);
-    return latch->await(duration);
+    return data->latch.await(duration);
+  } else {
+    return true;
   }
-  return true;
 }
 
 
@@ -655,20 +653,17 @@ T Future<T>::get() const
     abort();
   }
 
-  assert(t != NULL);
-  assert(*t != NULL);
-  return **t;
+  assert(data->t != NULL);
+  return *data->t;
 }
 
 
 template <typename T>
 std::string Future<T>::failure() const
 {
-  assert(message != NULL);
-  if (*message != NULL) {
-    return **message;
+  if (data->message != NULL) {
+    return *data->message;
   }
-
   return "";
 }
 
@@ -678,21 +673,19 @@ const Future<T>& Future<T>::onReady(const ReadyCallback& callback) const
 {
   bool run = false;
 
-  assert(lock != NULL);
-  internal::acquire(lock);
+  internal::acquire(&data->lock);
   {
-    assert(state != NULL);
-    if (*state == READY) {
+    if (data->state == READY) {
       run = true;
-    } else if (*state == PENDING) {
-      onReadyCallbacks->push(callback);
+    } else if (data->state == PENDING) {
+      data->onReadyCallbacks.push(callback);
     }
   }
-  internal::release(lock);
+  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(**t);
+    callback(*data->t);
   }
 
   return *this;
@@ -704,21 +697,19 @@ const Future<T>& Future<T>::onFailed(const FailedCallback& callback) const
 {
   bool run = false;
 
-  assert(lock != NULL);
-  internal::acquire(lock);
+  internal::acquire(&data->lock);
   {
-    assert(state != NULL);
-    if (*state == FAILED) {
+    if (data->state == FAILED) {
       run = true;
-    } else if (*state == PENDING) {
-      onFailedCallbacks->push(callback);
+    } else if (data->state == PENDING) {
+      data->onFailedCallbacks.push(callback);
     }
   }
-  internal::release(lock);
+  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
-    callback(**message);
+    callback(*data->message);
   }
 
   return *this;
@@ -731,17 +722,15 @@ const Future<T>& Future<T>::onDiscarded(
 {
   bool run = false;
 
-  assert(lock != NULL);
-  internal::acquire(lock);
+  internal::acquire(&data->lock);
   {
-    assert(state != NULL);
-    if (*state == DISCARDED) {
+    if (data->state == DISCARDED) {
       run = true;
-    } else if (*state == PENDING) {
-      onDiscardedCallbacks->push(callback);
+    } else if (data->state == PENDING) {
+      data->onDiscardedCallbacks.push(callback);
     }
   }
-  internal::release(lock);
+  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -757,17 +746,15 @@ const Future<T>& Future<T>::onAny(const AnyCallback& callback) const
 {
   bool run = false;
 
-  assert(lock != NULL);
-  internal::acquire(lock);
+  internal::acquire(&data->lock);
   {
-    assert(state != NULL);
-    if (*state != PENDING) {
+    if (data->state != PENDING) {
       run = true;
-    } else if (*state == PENDING) {
-      onAnyCallbacks->push(callback);
+    } else if (data->state == PENDING) {
+      data->onAnyCallbacks.push(callback);
     }
   }
-  internal::release(lock);
+  internal::release(&data->lock);
 
   // TODO(*): Invoke callback in another execution context.
   if (run) {
@@ -906,33 +893,31 @@ bool Future<T>::set(const T& _t)
 {
   bool result = false;
 
-  assert(lock != NULL);
-  internal::acquire(lock);
+  internal::acquire(&data->lock);
   {
-    assert(state != NULL);
-    if (*state == PENDING) {
-      *t = new T(_t);
-      *state = READY;
-      latch->trigger();
+    if (data->state == PENDING) {
+      data->t = new T(_t);
+      data->state = READY;
+      data->latch.trigger();
       result = true;
     }
   }
-  internal::release(lock);
+  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being READY. We
   // don't need a lock because the state is now in READY so there
   // should not be any concurrent modications.
   if (result) {
-    while (!onReadyCallbacks->empty()) {
+    while (!data->onReadyCallbacks.empty()) {
       // TODO(*): Invoke callbacks in another execution context.
-      onReadyCallbacks->front()(**t);
-      onReadyCallbacks->pop();
+      data->onReadyCallbacks.front()(*data->t);
+      data->onReadyCallbacks.pop();
     }
 
-    while (!onAnyCallbacks->empty()) {
+    while (!data->onAnyCallbacks.empty()) {
       // TODO(*): Invoke callbacks in another execution context.
-      onAnyCallbacks->front()(*this);
-      onAnyCallbacks->pop();
+      data->onAnyCallbacks.front()(*this);
+      data->onAnyCallbacks.pop();
     }
   }
 
@@ -945,116 +930,37 @@ bool Future<T>::fail(const std::string& _message)
 {
   bool result = false;
 
-  assert(lock != NULL);
-  internal::acquire(lock);
+  internal::acquire(&data->lock);
   {
-    assert(state != NULL);
-    if (*state == PENDING) {
-      *message = new std::string(_message);
-      *state = FAILED;
-      latch->trigger();
+    if (data->state == PENDING) {
+      data->message = new std::string(_message);
+      data->state = FAILED;
+      data->latch.trigger();
       result = true;
     }
   }
-  internal::release(lock);
+  internal::release(&data->lock);
 
   // Invoke all callbacks associated with this future being FAILED. We
   // don't need a lock because the state is now in FAILED so there
   // should not be any concurrent modications.
   if (result) {
-    while (!onFailedCallbacks->empty()) {
+    while (!data->onFailedCallbacks.empty()) {
       // TODO(*): Invoke callbacks in another execution context.
-      onFailedCallbacks->front()(**message);
-      onFailedCallbacks->pop();
+      data->onFailedCallbacks.front()(*data->message);
+      data->onFailedCallbacks.pop();
     }
 
-    while (!onAnyCallbacks->empty()) {
+    while (!data->onAnyCallbacks.empty()) {
       // TODO(*): Invoke callbacks in another execution context.
-      onAnyCallbacks->front()(*this);
-      onAnyCallbacks->pop();
+      data->onAnyCallbacks.front()(*this);
+      data->onAnyCallbacks.pop();
     }
   }
 
   return result;
 }
 
-
-template <typename T>
-void Future<T>::copy(const Future<T>& that)
-{
-  assert(that.refs > 0);
-  __sync_fetch_and_add(that.refs, 1);
-  refs = that.refs;
-  lock = that.lock;
-  state = that.state;
-  t = that.t;
-  message = that.message;
-  onReadyCallbacks = that.onReadyCallbacks;
-  onFailedCallbacks = that.onFailedCallbacks;
-  onDiscardedCallbacks = that.onDiscardedCallbacks;
-  onAnyCallbacks = that.onAnyCallbacks;
-  latch = that.latch;
-}
-
-
-template <typename T>
-void Future<T>::cleanup()
-{
-  assert(refs != NULL);
-  if (__sync_sub_and_fetch(refs, 1) == 0) {
-    // Discard the future if it is still pending (so we invoke any
-    // discarded callbacks that have been setup). Note that we put the
-    // reference count back at 1 here in case one of the callbacks
-    // decides it wants to keep a reference.
-    assert(state != NULL);
-    if (*state == PENDING) {
-      *refs = 1;
-      discard();
-      __sync_sub_and_fetch(refs, 1);
-    }
-
-    // Now try and cleanup again (this time we know the future has
-    // either been discarded or was not pending). Note that one of the
-    // callbacks might have stored the future, in which case we'll
-    // just return without doing anything, but the state will forever
-    // be "discarded".
-    assert(refs != NULL);
-    if (*refs == 0) {
-      delete refs;
-      refs = NULL;
-      assert(lock != NULL);
-      delete lock;
-      lock = NULL;
-      assert(state != NULL);
-      delete state;
-      state = NULL;
-      assert(t != NULL);
-      delete *t;
-      delete t;
-      t = NULL;
-      assert(message != NULL);
-      delete *message;
-      delete message;
-      message = NULL;
-      assert(onReadyCallbacks != NULL);
-      delete onReadyCallbacks;
-      onReadyCallbacks = NULL;
-      assert(onFailedCallbacks != NULL);
-      delete onFailedCallbacks;
-      onFailedCallbacks = NULL;
-      assert(onDiscardedCallbacks != NULL);
-      delete onDiscardedCallbacks;
-      onDiscardedCallbacks = NULL;
-      assert(onAnyCallbacks != NULL);
-      delete onAnyCallbacks;
-      onAnyCallbacks = NULL;
-      assert(latch != NULL);
-      delete latch;
-      latch = NULL;
-    }
-  }
-}
-
 }  // namespace process {
 
 #endif // __PROCESS_FUTURE_HPP__


[2/2] git commit: Create Latch in Future Lazily.

Posted by be...@apache.org.
Create Latch in Future Lazily.

There is no need to create a latch unless Future::await() is called,
so the latch is now created lazily on the first call to await().
Because Future::await() is rarely used in the code base, this should
yield a significant performance improvement. I am attaching the
experiment results.

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/15190


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7bc7acc9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7bc7acc9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7bc7acc9

Branch: refs/heads/master
Commit: 7bc7acc9f3a31359cc2b0130d26acd20e7cb96af
Parents: ae85e39
Author: Benjamin Hindman <be...@gmail.com>
Authored: Mon Nov 4 06:43:46 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Nov 4 06:44:26 2013 -1000

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 37 ++++++++++++++++-----
 1 file changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7bc7acc9/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index f08fdf3..1625d1d 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -240,7 +240,7 @@ private:
     ~Data();
 
     int lock;
-    Latch latch;
+    Latch* latch;
     State state;
     T* t;
     std::string* message; // Message associated with failure.
@@ -492,6 +492,7 @@ Future<T> Future<T>::failed(const std::string& message)
 template <typename T>
 Future<T>::Data::Data()
   : lock(0),
+    latch(NULL),
     state(PENDING),
     t(NULL),
     message(NULL) {}
@@ -500,6 +501,7 @@ Future<T>::Data::Data()
 template <typename T>
 Future<T>::Data::~Data()
 {
+  delete latch;
   delete t;
   delete message;
 }
@@ -568,7 +570,9 @@ bool Future<T>::discard()
   {
     if (data->state == PENDING) {
       data->state = DISCARDED;
-      data->latch.trigger();
+      if (data->latch != NULL) {
+        data->latch->trigger();
+      }
       result = true;
     }
   }
@@ -626,11 +630,24 @@ bool Future<T>::isFailed() const
 template <typename T>
 bool Future<T>::await(const Duration& duration) const
 {
-  if (!isReady() && !isDiscarded() && !isFailed()) {
-    return data->latch.await(duration);
-  } else {
-    return true;
+  bool await = false;
+
+  internal::acquire(&data->lock);
+  {
+    if (data->state == PENDING) {
+      if (data->latch == NULL) {
+        data->latch = new Latch();
+      }
+      await = true;
+    }
   }
+  internal::release(&data->lock);
+
+  if (await) {
+    return data->latch->await(duration);
+  }
+
+  return true;
 }
 
 
@@ -898,7 +915,9 @@ bool Future<T>::set(const T& _t)
     if (data->state == PENDING) {
       data->t = new T(_t);
       data->state = READY;
-      data->latch.trigger();
+      if (data->latch != NULL) {
+        data->latch->trigger();
+      }
       result = true;
     }
   }
@@ -935,7 +954,9 @@ bool Future<T>::fail(const std::string& _message)
     if (data->state == PENDING) {
       data->message = new std::string(_message);
       data->state = FAILED;
-      data->latch.trigger();
+      if (data->latch != NULL) {
+        data->latch->trigger();
+      }
       result = true;
     }
   }