You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/03 01:23:24 UTC
svn commit: r1368752 - in /incubator/mesos/trunk/src: messages/state.proto
state/leveldb.cpp state/leveldb.hpp state/serializer.hpp state/state.hpp
state/zookeeper.cpp state/zookeeper.hpp tests/state_tests.cpp
Author: benh
Date: Thu Aug 2 23:23:24 2012
New Revision: 1368752
URL: http://svn.apache.org/viewvc?rev=1368752&view=rev
Log:
Factored out serializer/deserializer from state abstraction
(https://reviews.apache.org/r/6076).
Added:
incubator/mesos/trunk/src/state/leveldb.hpp
incubator/mesos/trunk/src/state/serializer.hpp
incubator/mesos/trunk/src/state/zookeeper.hpp
Modified:
incubator/mesos/trunk/src/messages/state.proto
incubator/mesos/trunk/src/state/leveldb.cpp
incubator/mesos/trunk/src/state/state.hpp
incubator/mesos/trunk/src/state/zookeeper.cpp
incubator/mesos/trunk/src/tests/state_tests.cpp
Modified: incubator/mesos/trunk/src/messages/state.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/state.proto?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/state.proto (original)
+++ incubator/mesos/trunk/src/messages/state.proto Thu Aug 2 23:23:24 2012
@@ -18,13 +18,9 @@
package mesos.internal.state;
-// Describes a state entry, effectively a key/value pair. We include
-// the protocol buffer descriptor so that we can check usage.
+// Describes a state entry, a versioned (via a UUID) key/value pair.
message Entry {
required string name = 1;
required bytes uuid = 2;
- required string type = 3;
- // TODO(benh): Consider saving more information about the type.
- // required google.protobuf.DescriptorProto message_type = 3;
- required bytes value = 4;
+ required bytes value = 3;
}
Modified: incubator/mesos/trunk/src/state/leveldb.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/leveldb.cpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/leveldb.cpp (original)
+++ incubator/mesos/trunk/src/state/leveldb.cpp Thu Aug 2 23:23:24 2012
@@ -16,42 +16,17 @@
#include "messages/state.hpp"
+#include "state/leveldb.hpp"
#include "state/state.hpp"
using namespace process;
-using process::wait; // Necessary on some OS's to disambiguate.
-
using std::string;
namespace mesos {
namespace internal {
namespace state {
-class LevelDBStateProcess : public Process<LevelDBStateProcess>
-{
-public:
- LevelDBStateProcess(const string& path);
- virtual ~LevelDBStateProcess();
-
- virtual void initialize();
-
- // State implementation.
- Future<Option<Entry> > fetch(const string& name);
- Future<bool> swap(const Entry& entry, const UUID& uuid);
-
-private:
- // Helpers for interacting with leveldb.
- Try<Option<Entry> > get(const string& name);
- Try<bool> put(const Entry& entry);
-
- const string path;
- leveldb::DB* db;
-
- Option<string> error;
-};
-
-
LevelDBStateProcess::LevelDBStateProcess(const string& _path)
: path(_path), db(NULL) {}
@@ -180,33 +155,6 @@ Try<bool> LevelDBStateProcess::put(const
return true;
}
-
-LevelDBState::LevelDBState(const std::string& path)
-{
- process = new LevelDBStateProcess(path);
- spawn(process);
-}
-
-
-LevelDBState::~LevelDBState()
-{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-Future<Option<Entry> > LevelDBState::fetch(const string& name)
-{
- return dispatch(process, &LevelDBStateProcess::fetch, name);
-}
-
-
-Future<bool> LevelDBState::swap(const Entry& entry, const UUID& uuid)
-{
- return dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
-}
-
} // namespace state {
} // namespace internal {
} // namespace mesos {
Added: incubator/mesos/trunk/src/state/leveldb.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/leveldb.hpp?rev=1368752&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/leveldb.hpp (added)
+++ incubator/mesos/trunk/src/state/leveldb.hpp Thu Aug 2 23:23:24 2012
@@ -0,0 +1,107 @@
+#ifndef __STATE_LEVELDB_HPP__
+#define __STATE_LEVELDB_HPP__
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+#include "state/serializer.hpp"
+#include "state/state.hpp"
+
+// Forward declarations.
+namespace leveldb { class DB; }
+
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// More forward declarations.
+class LevelDBStateProcess;
+
+
+template <typename Serializer = StringSerializer>
+class LevelDBState : public State<Serializer>
+{
+public:
+ LevelDBState(const std::string& path);
+ virtual ~LevelDBState();
+
+protected:
+ // State implementation.
+ virtual process::Future<Option<Entry> > fetch(const std::string& name);
+ virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+ LevelDBStateProcess* process;
+};
+
+
+class LevelDBStateProcess : public process::Process<LevelDBStateProcess>
+{
+public:
+ LevelDBStateProcess(const std::string& path);
+ virtual ~LevelDBStateProcess();
+
+ virtual void initialize();
+
+ // State implementation.
+ process::Future<Option<Entry> > fetch(const std::string& name);
+ process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+ // Helpers for interacting with leveldb.
+ Try<Option<Entry> > get(const std::string& name);
+ Try<bool> put(const Entry& entry);
+
+ const std::string path;
+ leveldb::DB* db;
+
+ Option<std::string> error;
+};
+
+
+template <typename Serializer>
+LevelDBState<Serializer>::LevelDBState(const std::string& path)
+{
+ process = new LevelDBStateProcess(path);
+ process::spawn(process);
+}
+
+
+template <typename Serializer>
+LevelDBState<Serializer>::~LevelDBState()
+{
+ process::terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+template <typename Serializer>
+process::Future<Option<Entry> > LevelDBState<Serializer>::fetch(
+ const std::string& name)
+{
+ return process::dispatch(process, &LevelDBStateProcess::fetch, name);
+}
+
+
+template <typename Serializer>
+process::Future<bool> LevelDBState<Serializer>::swap(
+ const Entry& entry,
+ const UUID& uuid)
+{
+ return process::dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_LEVELDB_HPP__
Added: incubator/mesos/trunk/src/state/serializer.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/serializer.hpp?rev=1368752&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/serializer.hpp (added)
+++ incubator/mesos/trunk/src/state/serializer.hpp Thu Aug 2 23:23:24 2012
@@ -0,0 +1,66 @@
+#ifndef __STATE_SERIALIZER_HPP__
+#define __STATE_SERIALIZER_HPP__
+
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <string>
+
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+struct StringSerializer
+{
+ template <typename T>
+ static Try<std::string> deserialize(const std::string& value)
+ {
+ return value;
+ }
+
+ template <typename T>
+ static Try<std::string> serialize(const std::string& value)
+ {
+ return value;
+ }
+};
+
+
+struct ProtobufSerializer
+{
+ template <typename T>
+ static Try<T> deserialize(const std::string& value)
+ {
+ T t;
+ const google::protobuf::Message* message = &t; // Check T is a protobuf.
+
+ google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+ if (!t.ParseFromZeroCopyStream(&stream)) {
+ return Try<T>::error(
+ "Failed to deserialize " + t.GetDescriptor()->full_name());
+ }
+ return t;
+ }
+
+ template <typename T>
+ static Try<std::string> serialize(const T& t)
+ {
+ // TODO(benh): Actually store the descriptor so that we can verify
+ // type information (and compatibility) when we deserialize.
+ std::string value;
+ if (!t.SerializeToString(&value)) {
+ return Try<std::string>::error(
+ "Failed to serialize " + t.GetDescriptor()->full_name());
+ }
+ return value;
+ }
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_SERIALIZER_HPP__
Modified: incubator/mesos/trunk/src/state/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/state.hpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/state.hpp (original)
+++ incubator/mesos/trunk/src/state/state.hpp Thu Aug 2 23:23:24 2012
@@ -16,39 +16,30 @@
* limitations under the License.
*/
-#ifndef __STATE_HPP__
-#define __STATE_HPP__
-
-#include <google/protobuf/message.h>
-
-#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+#ifndef __STATE_STATE_HPP__
+#define __STATE_STATE_HPP__
#include <string>
#include <process/future.hpp>
#include <stout/option.hpp>
-#include <stout/time.hpp>
+#include <stout/try.hpp>
#include <stout/uuid.hpp>
#include "logging/logging.hpp"
#include "messages/state.hpp"
-#include "zookeeper/authentication.hpp"
+#include "state/serializer.hpp"
namespace mesos {
namespace internal {
namespace state {
-// Forward declarations.
-class LevelDBStateProcess;
-class ZooKeeperStateProcess;
-
// An abstraction of "state" (possibly between multiple distributed
// components) represented by "variables" (effectively key/value
-// pairs). The value of a variable in the state must be a protocol
-// buffer. Variables are versioned such that setting a variable in the
+// pairs). Variables are versioned such that setting a variable in the
// state will only succeed if the variable has not changed since last
// fetched. Varying implementations of state provide varying
// replicated guarantees.
@@ -59,37 +50,50 @@ class ZooKeeperStateProcess;
// performed on the variable since your get.
// Example:
-// State* state = new ZooKeeperState();
-// Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
-// State::Variable<Slaves> slaves = variable.get();
+
+// State<ProtobufSerializer>* state = new ZooKeeperState<ProtobufSerializer>();
+// Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
+// Variable<Slaves> slaves = variable.get();
// slaves->add_infos()->MergeFrom(info);
// Future<bool> set = state->set(&slaves);
-class State
+// Forward declarations.
+template <typename Serializer>
+class State;
+class ZooKeeperStateProcess;
+
+
+template <typename T>
+class Variable
{
public:
- template <typename T>
- class Variable
+ T* operator -> ()
{
- public:
- T* operator -> ()
- {
- return &t;
- }
+ return &t;
+ }
- private:
- friend class State; // Creates and manages variables.
+ void mutate(const T& _t)
+ {
+ t = _t;
+ }
- Variable(const Entry& _entry, const T& _t)
- : entry(_entry), t(_t)
- {
- const google::protobuf::Message* message = &t; // Check T is a protobuf.
- }
+private:
+ template <typename Serializer>
+ friend class State; // Creates and manages variables.
+
+ Variable(const Entry& _entry, const T& _t)
+ : entry(_entry), t(_t)
+ {}
+
+ Entry entry; // Not const so Variable is copyable.
+ T t;
+};
- Entry entry; // Not const so Variable is copyable.
- T t;
- };
+template <typename Serializer = StringSerializer>
+class State
+{
+public:
State() {}
virtual ~State() {}
@@ -111,77 +115,77 @@ protected:
virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid) = 0;
private:
- // Helper to convert an Entry to a Variable<T>. We make this a
+ // Helper to convert an Entry into some Variable<T> (or create a
+ // default Entry in the event no Entry was found). We make this a
// static member of State for friend access to Variable's
// constructor.
template <typename T>
- static process::Future<State::Variable<T> > convert(
+ static process::Future<Variable<T> > convert(
const std::string& name,
const Option<Entry>& option);
+
};
-// Helper for converting an Entry into a Variable<T>.
+template <typename Serializer>
template <typename T>
-process::Future<State::Variable<T> > State::convert(
+process::Future<Variable<T> > State<Serializer>::convert(
const std::string& name,
const Option<Entry>& option)
{
- T t;
-
if (option.isSome()) {
const Entry& entry = option.get();
- // TODO(benh): Check _compatibility_ versus equivalance.
- CHECK(t.GetDescriptor()->full_name() == entry.type());
+ Try<T> t = Serializer::template deserialize<T>(entry.value());
- const std::string& value = entry.value();
- google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
- if (!t.ParseFromZeroCopyStream(&stream)) {
- return process::Future<State::Variable<T> >::failed(
- "Failed to deserialize " + t.GetDescriptor()->full_name());
+ if (t.isError()) {
+ return process::Future<Variable<T> >::failed(t.error());
}
- return State::Variable<T>(entry, t);
+ return Variable<T>(entry, t.get());
}
// Otherwise, construct a Variable out of a new Entry with a default
// value for T (and a random UUID to start).
- std::string value;
+ T t;
- if (!t.SerializeToString(&value)) {
- return process::Future<State::Variable<T> >::failed(
- "Failed to serialize " + t.GetDescriptor()->full_name());
+ Try<std::string> value = Serializer::template serialize<T>(t);
+
+ if (value.isError()) {
+ return process::Future<Variable<T> >::failed(value.error());
}
Entry entry;
entry.set_name(name);
entry.set_uuid(UUID::random().toBytes());
- entry.set_type(t.GetDescriptor()->full_name());
- entry.set_value(value);
+ entry.set_value(value.get());
- return State::Variable<T>(entry, t);
+ return Variable<T>(entry, t);
}
+template <typename Serializer>
template <typename T>
-process::Future<State::Variable<T> > State::get(const std::string& name)
+process::Future<Variable<T> > State<Serializer>::get(const std::string& name)
{
std::tr1::function<
- process::Future<State::Variable<T> >(const Option<Entry>&)> convert =
- std::tr1::bind(&State::convert<T>, name, std::tr1::placeholders::_1);
+ process::Future<Variable<T> >(const Option<Entry>&)> convert =
+ std::tr1::bind(&State<Serializer>::template convert<T>,
+ name,
+ std::tr1::placeholders::_1);
return fetch(name).then(convert);
}
+template <typename Serializer>
template <typename T>
-process::Future<bool> State::set(State::Variable<T>* variable)
+process::Future<bool> State<Serializer>::set(Variable<T>* variable)
{
- std::string value;
- if (!variable->t.SerializeToString(&value)) {
- return process::Future<bool>::failed(
- "Failed to serialize " + variable->entry.type());
+ Try<std::string> value = Serializer::template serialize<T>(variable->t);
+
+ if (value.isError()) {
+ return process::Future<bool>::failed(value.error());
}
// Note that we try and swap an entry even if the value didn't change!
@@ -189,65 +193,13 @@ process::Future<bool> State::set(State::
// Update the UUID and value of the entry.
variable->entry.set_uuid(UUID::random().toBytes());
- variable->entry.set_value(value);
+ variable->entry.set_value(value.get());
return swap(variable->entry, uuid);
}
-
-class LevelDBState : public State
-{
-public:
- LevelDBState(const std::string& path);
- virtual ~LevelDBState();
-
-protected:
- // State implementation.
- virtual process::Future<Option<Entry> > fetch(const std::string& name);
- virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
-private:
- LevelDBStateProcess* process;
-};
-
-
-class ZooKeeperState : public State
-{
-public:
- // TODO(benh): Just take a zookeeper::URL.
- ZooKeeperState(
- const std::string& servers,
- const seconds& timeout,
- const std::string& znode,
- const Option<zookeeper::Authentication>& auth =
- Option<zookeeper::Authentication>());
- virtual ~ZooKeeperState();
-
-protected:
- // State implementation.
- virtual process::Future<Option<Entry> > fetch(const std::string& name);
- virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
-private:
- ZooKeeperStateProcess* process;
-};
-
} // namespace state {
} // namespace internal {
} // namespace mesos {
-#endif // __STATE_HPP__
-
-
-
-
-// need a Future::operator -> (), plus a test
-
-
-// have a way to "watch" a Variable and get a future that signifies when/if it has changed?
-
-// need a master work directory for local leveldb version of State!
-
-// use leveldb for non-ha version of master (no reading/writing files)
-
-// need to set the location of master detector zk znode, and set this znode location
+#endif // __STATE_STATE_HPP__
Modified: incubator/mesos/trunk/src/state/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/zookeeper.cpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/state/zookeeper.cpp Thu Aug 2 23:23:24 2012
@@ -19,6 +19,7 @@
#include "messages/state.hpp"
#include "state/state.hpp"
+#include "state/zookeeper.hpp"
#include "zookeeper/authentication.hpp"
#include "zookeeper/watcher.hpp"
@@ -26,8 +27,6 @@
using namespace process;
-using process::wait; // Necessary on some OS's to disambiguate.
-
using std::queue;
using std::string;
@@ -37,78 +36,6 @@ namespace mesos {
namespace internal {
namespace state {
-class ZooKeeperStateProcess : public Process<ZooKeeperStateProcess>
-{
-public:
- ZooKeeperStateProcess(
- const string& servers,
- const seconds& timeout,
- const string& znode,
- const Option<Authentication>& auth);
- virtual ~ZooKeeperStateProcess();
-
- virtual void initialize();
-
- // State implementation.
- Future<Option<Entry> > fetch(const string& name);
- Future<bool> swap(const Entry& entry, const UUID& uuid);
-
- // ZooKeeper events.
- void connected(bool reconnect);
- void reconnecting();
- void expired();
- void updated(const string& path);
- void created(const string& path);
- void deleted(const string& path);
-
-private:
- // Helpers for fetching and swapping.
- Result<Option<Entry> > doFetch(const string& name);
- Result<bool> doSwap(const Entry& entry, const UUID& uuid);
-
- const string servers;
- const seconds timeout;
- const string znode;
-
- Option<Authentication> auth; // ZooKeeper authentication.
-
- const ACL_vector acl; // Default ACL to use.
-
- Watcher* watcher;
- ZooKeeper* zk;
-
- enum State { // ZooKeeper connection state.
- DISCONNECTED,
- CONNECTING,
- CONNECTED,
- } state;
-
- struct Fetch
- {
- Fetch(const string& _name)
- : name(_name) {}
- string name;
- Promise<Option<Entry> > promise;
- };
-
- struct Swap
- {
- Swap(const Entry& _entry, const UUID& _uuid)
- : entry(_entry), uuid(_uuid) {}
- Entry entry;
- UUID uuid;
- Promise<bool> promise;
- };
-
- struct {
- queue<Fetch*> fetches;
- queue<Swap*> swaps;
- } pending;
-
- Option<string> error;
-};
-
-
// Helper for failing a queue of promises.
template <typename T>
void fail(queue<T*>* queue, const string& message)
@@ -425,37 +352,6 @@ Result<bool> ZooKeeperStateProcess::doSw
return true;
}
-
-ZooKeeperState::ZooKeeperState(
- const string& servers,
- const seconds& timeout,
- const string& znode,
- const Option<Authentication>& auth)
-{
- process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
- spawn(process);
-}
-
-
-ZooKeeperState::~ZooKeeperState()
-{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-Future<Option<Entry> > ZooKeeperState::fetch(const string& name)
-{
- return dispatch(process, &ZooKeeperStateProcess::fetch, name);
-}
-
-
-Future<bool> ZooKeeperState::swap(const Entry& entry, const UUID& uuid)
-{
- return dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
-}
-
} // namespace state {
} // namespace internal {
} // namespace mesos {
Added: incubator/mesos/trunk/src/state/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/zookeeper.hpp?rev=1368752&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/zookeeper.hpp (added)
+++ incubator/mesos/trunk/src/state/zookeeper.hpp Thu Aug 2 23:23:24 2012
@@ -0,0 +1,170 @@
+#ifndef __STATE_ZOOKEEPER_HPP__
+#define __STATE_ZOOKEEPER_HPP__
+
+#include <queue>
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+#include <stout/time.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+#include "state/serializer.hpp"
+#include "state/state.hpp"
+
+#include "zookeeper/authentication.hpp"
+#include "zookeeper/watcher.hpp"
+#include "zookeeper/zookeeper.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Forward declarations.
+class ZooKeeperStateProcess;
+
+
+template <typename Serializer = StringSerializer>
+class ZooKeeperState : public State<Serializer>
+{
+public:
+ // TODO(benh): Just take a zookeeper::URL.
+ ZooKeeperState(
+ const std::string& servers,
+ const seconds& timeout,
+ const std::string& znode,
+ const Option<zookeeper::Authentication>& auth =
+ Option<zookeeper::Authentication>());
+ virtual ~ZooKeeperState();
+
+protected:
+ // State implementation.
+ virtual process::Future<Option<Entry> > fetch(const std::string& name);
+ virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+ ZooKeeperStateProcess* process;
+};
+
+
+class ZooKeeperStateProcess : public process::Process<ZooKeeperStateProcess>
+{
+public:
+ ZooKeeperStateProcess(
+ const std::string& servers,
+ const seconds& timeout,
+ const std::string& znode,
+ const Option<zookeeper::Authentication>& auth);
+ virtual ~ZooKeeperStateProcess();
+
+ virtual void initialize();
+
+ // State implementation.
+ process::Future<Option<Entry> > fetch(const std::string& name);
+ process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+ // ZooKeeper events.
+ void connected(bool reconnect);
+ void reconnecting();
+ void expired();
+ void updated(const std::string& path);
+ void created(const std::string& path);
+ void deleted(const std::string& path);
+
+private:
+ // Helpers for fetching and swapping.
+ Result<Option<Entry> > doFetch(const std::string& name);
+ Result<bool> doSwap(const Entry& entry, const UUID& uuid);
+
+ const std::string servers;
+ const seconds timeout;
+ const std::string znode;
+
+ Option<zookeeper::Authentication> auth; // ZooKeeper authentication.
+
+ const ACL_vector acl; // Default ACL to use.
+
+ Watcher* watcher;
+ ZooKeeper* zk;
+
+ enum State { // ZooKeeper connection state.
+ DISCONNECTED,
+ CONNECTING,
+ CONNECTED,
+ } state;
+
+ struct Fetch
+ {
+ Fetch(const std::string& _name)
+ : name(_name) {}
+ std::string name;
+ process::Promise<Option<Entry> > promise;
+ };
+
+ struct Swap
+ {
+ Swap(const Entry& _entry, const UUID& _uuid)
+ : entry(_entry), uuid(_uuid) {}
+ Entry entry;
+ UUID uuid;
+ process::Promise<bool> promise;
+ };
+
+ struct {
+ std::queue<Fetch*> fetches;
+ std::queue<Swap*> swaps;
+ } pending;
+
+ Option<std::string> error;
+};
+
+
+template <typename Serializer>
+ZooKeeperState<Serializer>::ZooKeeperState(
+ const std::string& servers,
+ const seconds& timeout,
+ const std::string& znode,
+ const Option<zookeeper::Authentication>& auth)
+{
+ process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
+ process::spawn(process);
+}
+
+
+template <typename Serializer>
+ZooKeeperState<Serializer>::~ZooKeeperState()
+{
+ process::terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+template <typename Serializer>
+process::Future<Option<Entry> > ZooKeeperState<Serializer>::fetch(
+ const std::string& name)
+{
+ return process::dispatch(process, &ZooKeeperStateProcess::fetch, name);
+}
+
+
+template <typename Serializer>
+process::Future<bool> ZooKeeperState<Serializer>::swap(
+ const Entry& entry,
+ const UUID& uuid)
+{
+ return process::dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_ZOOKEEPER_HPP__
Modified: incubator/mesos/trunk/src/tests/state_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/state_tests.cpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/state_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/state_tests.cpp Thu Aug 2 23:23:24 2012
@@ -33,7 +33,10 @@
#include "messages/messages.hpp"
+#include "state/leveldb.hpp"
+#include "state/serializer.hpp"
#include "state/state.hpp"
+#include "state/zookeeper.hpp"
#ifdef MESOS_HAS_JAVA
#include "tests/base_zookeeper_test.hpp"
@@ -48,15 +51,15 @@ using namespace mesos::internal::test;
using namespace process;
-void GetSetGet(State* state)
+void GetSetGet(State<ProtobufSerializer>* state)
{
- Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+ Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
variable.await();
ASSERT_TRUE(variable.isReady());
- State::Variable<Slaves> slaves1 = variable.get();
+ Variable<Slaves> slaves1 = variable.get();
EXPECT_TRUE(slaves1->infos().size() == 0);
@@ -79,7 +82,7 @@ void GetSetGet(State* state)
ASSERT_TRUE(variable.isReady());
- State::Variable<Slaves> slaves2 = variable.get();
+ Variable<Slaves> slaves2 = variable.get();
ASSERT_TRUE(slaves2->infos().size() == 1);
EXPECT_EQ("localhost", slaves2->infos(0).hostname());
@@ -87,15 +90,15 @@ void GetSetGet(State* state)
}
-void GetSetSetGet(State* state)
+void GetSetSetGet(State<ProtobufSerializer>* state)
{
- Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+ Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
variable.await();
ASSERT_TRUE(variable.isReady());
- State::Variable<Slaves> slaves1 = variable.get();
+ Variable<Slaves> slaves1 = variable.get();
EXPECT_TRUE(slaves1->infos().size() == 0);
@@ -125,7 +128,7 @@ void GetSetSetGet(State* state)
ASSERT_TRUE(variable.isReady());
- State::Variable<Slaves> slaves2 = variable.get();
+ Variable<Slaves> slaves2 = variable.get();
ASSERT_TRUE(slaves2->infos().size() == 1);
EXPECT_EQ("localhost", slaves2->infos(0).hostname());
@@ -133,15 +136,15 @@ void GetSetSetGet(State* state)
}
-void GetGetSetSetGet(State* state)
+void GetGetSetSetGet(State<ProtobufSerializer>* state)
{
- Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+ Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
variable.await();
ASSERT_TRUE(variable.isReady());
- State::Variable<Slaves> slaves1 = variable.get();
+ Variable<Slaves> slaves1 = variable.get();
EXPECT_TRUE(slaves1->infos().size() == 0);
@@ -151,7 +154,7 @@ void GetGetSetSetGet(State* state)
ASSERT_TRUE(variable.isReady());
- State::Variable<Slaves> slaves2 = variable.get();
+ Variable<Slaves> slaves2 = variable.get();
EXPECT_TRUE(slaves2->infos().size() == 0);
@@ -205,7 +208,7 @@ protected:
virtual void SetUp()
{
os::rmdir(path);
- state = new LevelDBState(path);
+ state = new LevelDBState<ProtobufSerializer>(path);
}
virtual void TearDown()
@@ -214,7 +217,7 @@ protected:
os::rmdir(path);
}
- State* state;
+ State<ProtobufSerializer>* state;
private:
const std::string path;
@@ -250,7 +253,10 @@ protected:
virtual void SetUp()
{
BaseZooKeeperTest::SetUp();
- state = new ZooKeeperState(zks->connectString(), NO_TIMEOUT, "/state/");
+ state = new ZooKeeperState<ProtobufSerializer>(
+ zks->connectString(),
+ NO_TIMEOUT,
+ "/state/");
}
virtual void TearDown()
@@ -259,7 +265,7 @@ protected:
BaseZooKeeperTest::TearDown();
}
- State* state;
+ State<ProtobufSerializer>* state;
};