You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/11/01 00:12:27 UTC
git commit: Supported upgrading from Shared to Owned.
Updated Branches:
refs/heads/master 270b7594c -> 09df8039a
Supported upgrading from Shared to Owned.
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/15112
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/09df8039
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/09df8039
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/09df8039
Branch: refs/heads/master
Commit: 09df8039adbf43f54ea5e9ec92275fd2d573bcb2
Parents: 270b759
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Oct 31 13:11:55 2013 -1000
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Oct 31 13:11:55 2013 -1000
----------------------------------------------------------------------
3rdparty/libprocess/include/process/shared.hpp | 167 +++++++++++++++++++-
3rdparty/libprocess/src/tests/shared_tests.cpp | 85 +++++++++-
2 files changed, 245 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/09df8039/3rdparty/libprocess/include/process/shared.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/shared.hpp b/3rdparty/libprocess/include/process/shared.hpp
index faa11cc..e0c7991 100644
--- a/3rdparty/libprocess/include/process/shared.hpp
+++ b/3rdparty/libprocess/include/process/shared.hpp
@@ -1,21 +1,180 @@
#ifndef __PROCESS_SHARED_HPP__
#define __PROCESS_SHARED_HPP__
+#include <glog/logging.h>
+
#include <boost/shared_ptr.hpp>
+#include <stout/owned.hpp>
+
+#include <process/future.hpp>
+
namespace process {
// Represents a shared pointer and therefore enforces 'const' access.
template <typename T>
-class Shared : public boost::shared_ptr<const T>
+class Shared
{
public:
- Shared(T* t) : boost::shared_ptr<const T>(t) {}
+ Shared();
+ explicit Shared(T* t);
+
+ bool operator == (const Shared<T>& that) const;
+ bool operator < (const Shared<T>& that) const;
+
+ // Enforces const access semantics.
+ const T& operator * () const;
+ const T* operator -> () const;
+ const T* get() const;
+
+ bool unique() const;
+
+ void reset();
+ void reset(T* t);
+ void swap(Shared<T>& that);
+
+ // Upgrading from a shared pointer to an owned pointer. This shared
+ // pointer will be reset after this function is invoked. If two
+ // shared pointers pointing to the same object both want to be
+ // upgraded, only one of them may succeed and the other one will get
+ // a failed future.
+ Future<Owned<T> > upgrade();
+
+private:
+ struct Data
+ {
+ Data(T* _t);
+ ~Data();
- // TODO(jieyu): Support upgrading from a shared pointer to an owned
- // pointer. The interface is like this: Future<Owned<T> > upgrade().
+ T* t;
+ volatile bool upgraded;
+ Promise<Owned<T> > promise;
+ };
+
+ boost::shared_ptr<Data> data;
};
+
+template <typename T>
+Shared<T>::Shared() {}
+
+
+template <typename T>
+Shared<T>::Shared(T* t)
+{
+ if (t != NULL) {
+ data.reset(new Data(t));
+ }
+}
+
+
+template <typename T>
+bool Shared<T>::operator == (const Shared<T>& that) const
+{
+ return data == that.data;
+}
+
+
+template <typename T>
+bool Shared<T>::operator < (const Shared<T>& that) const
+{
+ return data < that.data;
+}
+
+
+template <typename T>
+const T& Shared<T>::operator * () const
+{
+ return *CHECK_NOTNULL(get());
+}
+
+
+template <typename T>
+const T* Shared<T>::operator -> () const
+{
+ return CHECK_NOTNULL(get());
+}
+
+
+template <typename T>
+const T* Shared<T>::get() const
+{
+ if (data.get() == NULL) {
+ return NULL;
+ } else {
+ return data->t;
+ }
+}
+
+
+template <typename T>
+bool Shared<T>::unique() const
+{
+ return data.unique();
+}
+
+
+template <typename T>
+void Shared<T>::reset()
+{
+ data.reset();
+}
+
+
+template <typename T>
+void Shared<T>::reset(T* t)
+{
+ if (t == NULL) {
+ data.reset();
+ } else {
+ data.reset(new Data(t));
+ }
+}
+
+
+template <typename T>
+void Shared<T>::swap(Shared<T>& that)
+{
+ data.swap(that.data);
+}
+
+
+template <typename T>
+Future<Owned<T> > Shared<T>::upgrade()
+{
+ // If two threads simultaneously access this object and at least one
+ // of them is a write, the behavior is undefined. This is similar to
+ // boost::shared_ptr. For more details, please refer to the boost
+ // shared_ptr document (section "Thread Safety").
+ if (data.get() == NULL) {
+ return Owned<T>(NULL);
+ }
+
+ if (!__sync_bool_compare_and_swap(&data->upgraded, false, true)) {
+ return Future<Owned<T> >::failed("An upgrade is already being performed");
+ }
+
+ Future<Owned<T> > future = data->promise.future();
+ data.reset();
+ return future;
+}
+
+
+template <typename T>
+Shared<T>::Data::Data(T* _t)
+ : t(CHECK_NOTNULL(_t)), upgraded(false) {}
+
+
+template <typename T>
+Shared<T>::Data::~Data()
+{
+ if (upgraded) {
+ promise.set(Owned<T>(t));
+ } else {
+ delete t;
+ }
+}
+
} // namespace process {
#endif // __PROCESS_SHARED_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/09df8039/3rdparty/libprocess/src/tests/shared_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/shared_tests.cpp b/3rdparty/libprocess/src/tests/shared_tests.cpp
index e520b4f..860a9aa 100644
--- a/3rdparty/libprocess/src/tests/shared_tests.cpp
+++ b/3rdparty/libprocess/src/tests/shared_tests.cpp
@@ -1,5 +1,6 @@
#include <gmock/gmock.h>
+#include <process/gtest.hpp>
#include <process/shared.hpp>
using namespace process;
@@ -20,10 +21,88 @@ TEST(Shared, ConstAccess)
Foo* foo = new Foo();
foo->set(10);
- Shared<Foo> sp(foo);
+ Shared<Foo> shared(foo);
- EXPECT_EQ(10, sp->get());
+ EXPECT_EQ(10, shared->get());
// The following won't compile.
- // sp->set(20);
+ // shared->set(20);
+}
+
+
+TEST(Shared, Null)
+{
+ Shared<Foo> shared(NULL);
+ Shared<Foo> shared2(shared);
+
+ EXPECT_TRUE(shared.get() == NULL);
+ EXPECT_TRUE(shared2.get() == NULL);
+}
+
+
+TEST(Shared, Reset)
+{
+ Foo* foo = new Foo();
+ foo->set(42);
+
+ Shared<Foo> shared(foo);
+ Shared<Foo> shared2(shared);
+
+ EXPECT_FALSE(shared.unique());
+ EXPECT_FALSE(shared2.unique());
+ EXPECT_EQ(42, shared->get());
+ EXPECT_EQ(42, shared2->get());
+
+ shared.reset();
+
+ EXPECT_FALSE(shared.unique());
+ EXPECT_TRUE(shared.get() == NULL);
+
+ EXPECT_TRUE(shared2.unique());
+ EXPECT_EQ(42, shared2->get());
+}
+
+
+TEST(Shared, Upgrade)
+{
+ Foo* foo = new Foo();
+ foo->set(42);
+
+ Shared<Foo> shared(foo);
+
+ EXPECT_EQ(42, shared->get());
+ EXPECT_EQ(42, (*shared).get());
+ EXPECT_EQ(42, shared.get()->get());
+ EXPECT_TRUE(shared.unique());
+
+ Future<Owned<Foo> > future;
+
+ {
+ Shared<Foo> shared2(shared);
+
+ EXPECT_EQ(42, shared2->get());
+ EXPECT_EQ(42, (*shared2).get());
+ EXPECT_EQ(42, shared2.get()->get());
+ EXPECT_FALSE(shared2.unique());
+ EXPECT_FALSE(shared.unique());
+
+ future = shared2.upgrade();
+
+ // A shared pointer will be reset after it called upgrade.
+ EXPECT_TRUE(shared2.get() == NULL);
+
+ // Only one upgrade is allowed.
+ AWAIT_FAILED(shared.upgrade());
+
+ // Upgrade is not done yet as 'shared' is still holding the reference.
+ EXPECT_TRUE(future.isPending());
+ }
+
+ shared.reset();
+ AWAIT_READY(future);
+
+ Owned<Foo> owned = future.get();
+ EXPECT_EQ(42, owned->get());
+ EXPECT_EQ(42, (*owned).get());
+ EXPECT_EQ(42, owned.get()->get());
}