You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/01 00:12:27 UTC

git commit: Supported upgrading from Shared to Owned.

Updated Branches:
  refs/heads/master 270b7594c -> 09df8039a


Supported upgrading from Shared to Owned.

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/15112


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/09df8039
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/09df8039
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/09df8039

Branch: refs/heads/master
Commit: 09df8039adbf43f54ea5e9ec92275fd2d573bcb2
Parents: 270b759
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Oct 31 13:11:55 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Oct 31 13:11:55 2013 -1000

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/shared.hpp | 167 +++++++++++++++++++-
 3rdparty/libprocess/src/tests/shared_tests.cpp |  85 +++++++++-
 2 files changed, 245 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/09df8039/3rdparty/libprocess/include/process/shared.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/shared.hpp b/3rdparty/libprocess/include/process/shared.hpp
index faa11cc..e0c7991 100644
--- a/3rdparty/libprocess/include/process/shared.hpp
+++ b/3rdparty/libprocess/include/process/shared.hpp
@@ -1,21 +1,180 @@
 #ifndef __PROCESS_SHARED_HPP__
 #define __PROCESS_SHARED_HPP__
 
+#include <glog/logging.h>
+
 #include <boost/shared_ptr.hpp>
 
+#include <stout/owned.hpp>
+
+#include <process/future.hpp>
+
 namespace process {
 
 // Represents a shared pointer and therefore enforces 'const' access.
 template <typename T>
-class Shared : public boost::shared_ptr<const T>
+class Shared
 {
 public:
-  Shared(T* t) : boost::shared_ptr<const T>(t) {}
+  Shared();
+  explicit Shared(T* t);
+
+  bool operator == (const Shared<T>& that) const;
+  bool operator < (const Shared<T>& that) const;
+
+  // Enforces const access semantics.
+  const T& operator * () const;
+  const T* operator -> () const;
+  const T* get() const;
+
+  bool unique() const;
+
+  void reset();
+  void reset(T* t);
+  void swap(Shared<T>& that);
+
+  // Upgrading from a shared pointer to an owned pointer. This shared
+  // pointer will be reset after this function is invoked. If two
+  // shared pointers pointing to the same object both want to be
+  // upgraded, only one of them may succeed and the other one will get
+  // a failed future.
+  Future<Owned<T> > upgrade();
+
+private:
+  struct Data
+  {
+    Data(T* _t);
+    ~Data();
 
-  // TODO(jieyu): Support upgrading from a shared pointer to an owned
-  // pointer. The interface is like this: Future<Owned<T> > upgrade().
+    T* t;
+    volatile bool upgraded;
+    Promise<Owned<T> > promise;
+  };
+
+  boost::shared_ptr<Data> data;
 };
 
+
+template <typename T>
+Shared<T>::Shared() {}
+
+
+template <typename T>
+Shared<T>::Shared(T* t)
+{
+  if (t != NULL) {
+    data.reset(new Data(t));
+  }
+}
+
+
+template <typename T>
+bool Shared<T>::operator == (const Shared<T>& that) const
+{
+  return data == that.data;
+}
+
+
+template <typename T>
+bool Shared<T>::operator < (const Shared<T>& that) const
+{
+  return data < that.data;
+}
+
+
+template <typename T>
+const T& Shared<T>::operator * () const
+{
+  return *CHECK_NOTNULL(get());
+}
+
+
+template <typename T>
+const T* Shared<T>::operator -> () const
+{
+  return CHECK_NOTNULL(get());
+}
+
+
+template <typename T>
+const T* Shared<T>::get() const
+{
+  if (data.get() == NULL) {
+    return NULL;
+  } else {
+    return data->t;
+  }
+}
+
+
+template <typename T>
+bool Shared<T>::unique() const
+{
+  return data.unique();
+}
+
+
+template <typename T>
+void Shared<T>::reset()
+{
+  data.reset();
+}
+
+
+template <typename T>
+void Shared<T>::reset(T* t)
+{
+  if (t == NULL) {
+    data.reset();
+  } else {
+    data.reset(new Data(t));
+  }
+}
+
+
+template <typename T>
+void Shared<T>::swap(Shared<T>& that)
+{
+  data.swap(that.data);
+}
+
+
+template <typename T>
+Future<Owned<T> > Shared<T>::upgrade()
+{
+  // If two threads simultaneously access this object and at least one
+  // of them is a write, the behavior is undefined. This is similar to
+  // boost::shared_ptr. For more details, please refer to the boost
+  // shared_ptr document (section "Thread Safety").
+  if (data.get() == NULL) {
+    return Owned<T>(NULL);
+  }
+
+  if (!__sync_bool_compare_and_swap(&data->upgraded, false, true)) {
+    return Future<Owned<T> >::failed("An upgrade is already being performed");
+  }
+
+  Future<Owned<T> > future = data->promise.future();
+  data.reset();
+  return future;
+}
+
+
+template <typename T>
+Shared<T>::Data::Data(T* _t)
+  : t(CHECK_NOTNULL(_t)), upgraded(false) {}
+
+
+template <typename T>
+Shared<T>::Data::~Data()
+{
+  if (upgraded) {
+    promise.set(Owned<T>(t));
+  } else {
+    delete t;
+  }
+}
+
 } // namespace process {
 
 #endif // __PROCESS_SHARED_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/09df8039/3rdparty/libprocess/src/tests/shared_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/shared_tests.cpp b/3rdparty/libprocess/src/tests/shared_tests.cpp
index e520b4f..860a9aa 100644
--- a/3rdparty/libprocess/src/tests/shared_tests.cpp
+++ b/3rdparty/libprocess/src/tests/shared_tests.cpp
@@ -1,5 +1,6 @@
 #include <gmock/gmock.h>
 
+#include <process/gtest.hpp>
 #include <process/shared.hpp>
 
 using namespace process;
@@ -20,10 +21,88 @@ TEST(Shared, ConstAccess)
   Foo* foo = new Foo();
   foo->set(10);
 
-  Shared<Foo> sp(foo);
+  Shared<Foo> shared(foo);
 
-  EXPECT_EQ(10, sp->get());
+  EXPECT_EQ(10, shared->get());
 
   // The following won't compile.
-  // sp->set(20);
+  // shared->set(20);
+}
+
+
+TEST(Shared, Null)
+{
+  Shared<Foo> shared(NULL);
+  Shared<Foo> shared2(shared);
+
+  EXPECT_TRUE(shared.get() == NULL);
+  EXPECT_TRUE(shared2.get() == NULL);
+}
+
+
+TEST(Shared, Reset)
+{
+  Foo* foo = new Foo();
+  foo->set(42);
+
+  Shared<Foo> shared(foo);
+  Shared<Foo> shared2(shared);
+
+  EXPECT_FALSE(shared.unique());
+  EXPECT_FALSE(shared2.unique());
+  EXPECT_EQ(42, shared->get());
+  EXPECT_EQ(42, shared2->get());
+
+  shared.reset();
+
+  EXPECT_FALSE(shared.unique());
+  EXPECT_TRUE(shared.get() == NULL);
+
+  EXPECT_TRUE(shared2.unique());
+  EXPECT_EQ(42, shared2->get());
+}
+
+
+TEST(Shared, Upgrade)
+{
+  Foo* foo = new Foo();
+  foo->set(42);
+
+  Shared<Foo> shared(foo);
+
+  EXPECT_EQ(42, shared->get());
+  EXPECT_EQ(42, (*shared).get());
+  EXPECT_EQ(42, shared.get()->get());
+  EXPECT_TRUE(shared.unique());
+
+  Future<Owned<Foo> > future;
+
+  {
+    Shared<Foo> shared2(shared);
+
+    EXPECT_EQ(42, shared2->get());
+    EXPECT_EQ(42, (*shared2).get());
+    EXPECT_EQ(42, shared2.get()->get());
+    EXPECT_FALSE(shared2.unique());
+    EXPECT_FALSE(shared.unique());
+
+    future = shared2.upgrade();
+
+    // A shared pointer will be reset after it called upgrade.
+    EXPECT_TRUE(shared2.get() == NULL);
+
+    // Only one upgrade is allowed.
+    AWAIT_FAILED(shared.upgrade());
+
+    // Upgrade is not done yet as 'shared' is still holding the reference.
+    EXPECT_TRUE(future.isPending());
+  }
+
+  shared.reset();
+  AWAIT_READY(future);
+
+  Owned<Foo> owned = future.get();
+  EXPECT_EQ(42, owned->get());
+  EXPECT_EQ(42, (*owned).get());
+  EXPECT_EQ(42, owned.get()->get());
 }