You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/03 01:23:24 UTC

svn commit: r1368752 - in /incubator/mesos/trunk/src: messages/state.proto state/leveldb.cpp state/leveldb.hpp state/serializer.hpp state/state.hpp state/zookeeper.cpp state/zookeeper.hpp tests/state_tests.cpp

Author: benh
Date: Thu Aug  2 23:23:24 2012
New Revision: 1368752

URL: http://svn.apache.org/viewvc?rev=1368752&view=rev
Log:
Factored out serializer/deserializer from state abstraction
(https://reviews.apache.org/r/6076).

Added:
    incubator/mesos/trunk/src/state/leveldb.hpp
    incubator/mesos/trunk/src/state/serializer.hpp
    incubator/mesos/trunk/src/state/zookeeper.hpp
Modified:
    incubator/mesos/trunk/src/messages/state.proto
    incubator/mesos/trunk/src/state/leveldb.cpp
    incubator/mesos/trunk/src/state/state.hpp
    incubator/mesos/trunk/src/state/zookeeper.cpp
    incubator/mesos/trunk/src/tests/state_tests.cpp

Modified: incubator/mesos/trunk/src/messages/state.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/state.proto?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages/state.proto (original)
+++ incubator/mesos/trunk/src/messages/state.proto Thu Aug  2 23:23:24 2012
@@ -18,13 +18,9 @@
 
 package mesos.internal.state;
 
-// Describes a state entry, effectively a key/value pair. We include
-// the protocol buffer descriptor so that we can check usage.
+// Describes a state entry, a versioned (via a UUID) key/value pair.
 message Entry {
   required string name = 1;
   required bytes uuid = 2;
-  required string type = 3;
-  // TODO(benh): Consider saving more information about the type.
-  // required google.protobuf.DescriptorProto message_type = 3;
-  required bytes value = 4;
+  required bytes value = 3;
 }

Modified: incubator/mesos/trunk/src/state/leveldb.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/leveldb.cpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/leveldb.cpp (original)
+++ incubator/mesos/trunk/src/state/leveldb.cpp Thu Aug  2 23:23:24 2012
@@ -16,42 +16,17 @@
 
 #include "messages/state.hpp"
 
+#include "state/leveldb.hpp"
 #include "state/state.hpp"
 
 using namespace process;
 
-using process::wait; // Necessary on some OS's to disambiguate.
-
 using std::string;
 
 namespace mesos {
 namespace internal {
 namespace state {
 
-class LevelDBStateProcess : public Process<LevelDBStateProcess>
-{
-public:
-  LevelDBStateProcess(const string& path);
-  virtual ~LevelDBStateProcess();
-
-  virtual void initialize();
-
-  // State implementation.
-  Future<Option<Entry> > fetch(const string& name);
-  Future<bool> swap(const Entry& entry, const UUID& uuid);
-
-private:
-  // Helpers for interacting with leveldb.
-  Try<Option<Entry> > get(const string& name);
-  Try<bool> put(const Entry& entry);
-
-  const string path;
-  leveldb::DB* db;
-
-  Option<string> error;
-};
-
-
 LevelDBStateProcess::LevelDBStateProcess(const string& _path)
   : path(_path), db(NULL) {}
 
@@ -180,33 +155,6 @@ Try<bool> LevelDBStateProcess::put(const
   return true;
 }
 
-
-LevelDBState::LevelDBState(const std::string& path)
-{
-  process = new LevelDBStateProcess(path);
-  spawn(process);
-}
-
-
-LevelDBState::~LevelDBState()
-{
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-Future<Option<Entry> > LevelDBState::fetch(const string& name)
-{
-  return dispatch(process, &LevelDBStateProcess::fetch, name);
-}
-
-
-Future<bool> LevelDBState::swap(const Entry& entry, const UUID& uuid)
-{
-  return dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
-}
-
 } // namespace state {
 } // namespace internal {
 } // namespace mesos {

Added: incubator/mesos/trunk/src/state/leveldb.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/leveldb.hpp?rev=1368752&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/leveldb.hpp (added)
+++ incubator/mesos/trunk/src/state/leveldb.hpp Thu Aug  2 23:23:24 2012
@@ -0,0 +1,107 @@
+#ifndef __STATE_LEVELDB_HPP__
+#define __STATE_LEVELDB_HPP__
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+#include "state/serializer.hpp"
+#include "state/state.hpp"
+
+// Forward declarations.
+namespace leveldb { class DB; }
+
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// More forward declarations.
+class LevelDBStateProcess;
+
+
+template <typename Serializer = StringSerializer>
+class LevelDBState : public State<Serializer>
+{
+public:
+  LevelDBState(const std::string& path);
+  virtual ~LevelDBState();
+
+protected:
+  // State implementation.
+  virtual process::Future<Option<Entry> > fetch(const std::string& name);
+  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+  LevelDBStateProcess* process;
+};
+
+
+class LevelDBStateProcess : public process::Process<LevelDBStateProcess>
+{
+public:
+  LevelDBStateProcess(const std::string& path);
+  virtual ~LevelDBStateProcess();
+
+  virtual void initialize();
+
+  // State implementation.
+  process::Future<Option<Entry> > fetch(const std::string& name);
+  process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+  // Helpers for interacting with leveldb.
+  Try<Option<Entry> > get(const std::string& name);
+  Try<bool> put(const Entry& entry);
+
+  const std::string path;
+  leveldb::DB* db;
+
+  Option<std::string> error;
+};
+
+
+template <typename Serializer>
+LevelDBState<Serializer>::LevelDBState(const std::string& path)
+{
+  process = new LevelDBStateProcess(path);
+  process::spawn(process);
+}
+
+
+template <typename Serializer>
+LevelDBState<Serializer>::~LevelDBState()
+{
+  process::terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+template <typename Serializer>
+process::Future<Option<Entry> > LevelDBState<Serializer>::fetch(
+    const std::string& name)
+{
+  return process::dispatch(process, &LevelDBStateProcess::fetch, name);
+}
+
+
+template <typename Serializer>
+process::Future<bool> LevelDBState<Serializer>::swap(
+    const Entry& entry,
+    const UUID& uuid)
+{
+  return process::dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_LEVELDB_HPP__

Added: incubator/mesos/trunk/src/state/serializer.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/serializer.hpp?rev=1368752&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/serializer.hpp (added)
+++ incubator/mesos/trunk/src/state/serializer.hpp Thu Aug  2 23:23:24 2012
@@ -0,0 +1,66 @@
+#ifndef __STATE_SERIALIZER_HPP__
+#define __STATE_SERIALIZER_HPP__
+
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <string>
+
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+struct StringSerializer
+{
+  template <typename T>
+  static Try<std::string> deserialize(const std::string& value)
+  {
+    return value;
+  }
+
+  template <typename T>
+  static Try<std::string> serialize(const std::string& value)
+  {
+    return value;
+  }
+};
+
+
+struct ProtobufSerializer
+{
+  template <typename T>
+  static Try<T> deserialize(const std::string& value)
+  {
+    T t;
+    const google::protobuf::Message* message = &t; // Check T is a protobuf.
+
+    google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+    if (!t.ParseFromZeroCopyStream(&stream)) {
+      return Try<T>::error(
+          "Failed to deserialize " + t.GetDescriptor()->full_name());
+    }
+    return t;
+  }
+
+  template <typename T>
+  static Try<std::string> serialize(const T& t)
+  {
+    // TODO(benh): Actually store the descriptor so that we can verify
+    // type information (and compatibility) when we deserialize.
+    std::string value;
+    if (!t.SerializeToString(&value)) {
+      return Try<std::string>::error(
+          "Failed to serialize " + t.GetDescriptor()->full_name());
+    }
+    return value;
+  }
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_SERIALIZER_HPP__

Modified: incubator/mesos/trunk/src/state/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/state.hpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/state.hpp (original)
+++ incubator/mesos/trunk/src/state/state.hpp Thu Aug  2 23:23:24 2012
@@ -16,39 +16,30 @@
  * limitations under the License.
  */
 
-#ifndef __STATE_HPP__
-#define __STATE_HPP__
-
-#include <google/protobuf/message.h>
-
-#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+#ifndef __STATE_STATE_HPP__
+#define __STATE_STATE_HPP__
 
 #include <string>
 
 #include <process/future.hpp>
 
 #include <stout/option.hpp>
-#include <stout/time.hpp>
+#include <stout/try.hpp>
 #include <stout/uuid.hpp>
 
 #include "logging/logging.hpp"
 
 #include "messages/state.hpp"
 
-#include "zookeeper/authentication.hpp"
+#include "state/serializer.hpp"
 
 namespace mesos {
 namespace internal {
 namespace state {
 
-// Forward declarations.
-class LevelDBStateProcess;
-class ZooKeeperStateProcess;
-
 // An abstraction of "state" (possibly between multiple distributed
 // components) represented by "variables" (effectively key/value
-// pairs). The value of a variable in the state must be a protocol
-// buffer. Variables are versioned such that setting a variable in the
+// pairs). Variables are versioned such that setting a variable in the
 // state will only succeed if the variable has not changed since last
 // fetched. Varying implementations of state provide varying
 // replicated guarantees.
@@ -59,37 +50,50 @@ class ZooKeeperStateProcess;
 // performed on the variable since your get.
 
 // Example:
-//   State* state = new ZooKeeperState();
-//   Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
-//   State::Variable<Slaves> slaves = variable.get();
+
+//   State<ProtobufSerializer>* state = new ZooKeeperState<ProtobufSerializer>();
+//   Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
+//   Variable<Slaves> slaves = variable.get();
 //   slaves->add_infos()->MergeFrom(info);
 //   Future<bool> set = state->set(&slaves);
 
-class State
+// Forward declarations.
+template <typename Serializer>
+class State;
+class ZooKeeperStateProcess;
+
+
+template <typename T>
+class Variable
 {
 public:
-  template <typename T>
-  class Variable
+  T* operator -> ()
   {
-  public:
-    T* operator -> ()
-    {
-      return &t;
-    }
+    return &t;
+  }
 
-  private:
-    friend class State; // Creates and manages variables.
+  void mutate(const T& _t)
+  {
+    t = _t;
+  }
 
-    Variable(const Entry& _entry, const T& _t)
-      : entry(_entry), t(_t)
-    {
-      const google::protobuf::Message* message = &t; // Check T is a protobuf.
-    }
+private:
+  template <typename Serializer>
+  friend class State; // Creates and manages variables.
+
+  Variable(const Entry& _entry, const T& _t)
+    : entry(_entry), t(_t)
+  {}
+
+  Entry entry; // Not const so Variable is copyable.
+  T t;
+};
 
-    Entry entry; // Not const so Variable is copyable.
-    T t;
-  };
 
+template <typename Serializer = StringSerializer>
+class State
+{
+public:
   State() {}
   virtual ~State() {}
 
@@ -111,77 +115,77 @@ protected:
   virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid) = 0;
 
 private:
-  // Helper to convert an Entry to a Variable<T>. We make this a
+  // Helper to convert an Entry into some Variable<T> (or create a
+  // default Entry in the event no Entry was found). We make this a
   // static member of State for friend access to Variable's
   // constructor.
   template <typename T>
-  static process::Future<State::Variable<T> > convert(
+  static process::Future<Variable<T> > convert(
       const std::string& name,
       const Option<Entry>& option);
+
 };
 
 
-// Helper for converting an Entry into a Variable<T>.
+template <typename Serializer>
 template <typename T>
-process::Future<State::Variable<T> > State::convert(
+process::Future<Variable<T> > State<Serializer>::convert(
     const std::string& name,
     const Option<Entry>& option)
 {
-  T t;
-
   if (option.isSome()) {
     const Entry& entry = option.get();
 
-    // TODO(benh): Check _compatibility_ versus equivalance.
-    CHECK(t.GetDescriptor()->full_name() == entry.type());
+    Try<T> t = Serializer::template deserialize<T>(entry.value());
 
-    const std::string& value = entry.value();
-    google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
-    if (!t.ParseFromZeroCopyStream(&stream)) {
-      return process::Future<State::Variable<T> >::failed(
-          "Failed to deserialize " + t.GetDescriptor()->full_name());
+    if (t.isError()) {
+      return process::Future<Variable<T> >::failed(t.error());
     }
 
-    return State::Variable<T>(entry, t);
+    return Variable<T>(entry, t.get());
   }
 
   // Otherwise, construct a Variable out of a new Entry with a default
   // value for T (and a random UUID to start).
-  std::string value;
+  T t;
 
-  if (!t.SerializeToString(&value)) {
-    return process::Future<State::Variable<T> >::failed(
-        "Failed to serialize " + t.GetDescriptor()->full_name());
+  Try<std::string> value = Serializer::template serialize<T>(t);
+
+  if (value.isError()) {
+    return process::Future<Variable<T> >::failed(value.error());
   }
 
   Entry entry;
   entry.set_name(name);
   entry.set_uuid(UUID::random().toBytes());
-  entry.set_type(t.GetDescriptor()->full_name());
-  entry.set_value(value);
+  entry.set_value(value.get());
 
-  return State::Variable<T>(entry, t);
+  return Variable<T>(entry, t);
 }
 
 
+template <typename Serializer>
 template <typename T>
-process::Future<State::Variable<T> > State::get(const std::string& name)
+process::Future<Variable<T> > State<Serializer>::get(const std::string& name)
 {
   std::tr1::function<
-  process::Future<State::Variable<T> >(const Option<Entry>&)> convert =
-    std::tr1::bind(&State::convert<T>, name, std::tr1::placeholders::_1);
+  process::Future<Variable<T> >(const Option<Entry>&)> convert =
+    std::tr1::bind(&State<Serializer>::template convert<T>,
+                   name,
+                   std::tr1::placeholders::_1);
 
   return fetch(name).then(convert);
 }
 
 
+template <typename Serializer>
 template <typename T>
-process::Future<bool> State::set(State::Variable<T>* variable)
+process::Future<bool> State<Serializer>::set(Variable<T>* variable)
 {
-  std::string value;
-  if (!variable->t.SerializeToString(&value)) {
-    return process::Future<bool>::failed(
-        "Failed to serialize " + variable->entry.type());
+  Try<std::string> value = Serializer::template serialize<T>(variable->t);
+
+  if (value.isError()) {
+    return process::Future<bool>::failed(value.error());
   }
 
   // Note that we try and swap an entry even if the value didn't change!
@@ -189,65 +193,13 @@ process::Future<bool> State::set(State::
 
   // Update the UUID and value of the entry.
   variable->entry.set_uuid(UUID::random().toBytes());
-  variable->entry.set_value(value);
+  variable->entry.set_value(value.get());
 
   return swap(variable->entry, uuid);
 }
 
-
-class LevelDBState : public State
-{
-public:
-  LevelDBState(const std::string& path);
-  virtual ~LevelDBState();
-
-protected:
-  // State implementation.
-  virtual process::Future<Option<Entry> > fetch(const std::string& name);
-  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
-private:
-  LevelDBStateProcess* process;
-};
-
-
-class ZooKeeperState : public State
-{
-public:
-  // TODO(benh): Just take a zookeeper::URL.
-  ZooKeeperState(
-      const std::string& servers,
-      const seconds& timeout,
-      const std::string& znode,
-      const Option<zookeeper::Authentication>& auth =
-      Option<zookeeper::Authentication>());
-  virtual ~ZooKeeperState();
-
-protected:
-  // State implementation.
-  virtual process::Future<Option<Entry> > fetch(const std::string& name);
-  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
-private:
-  ZooKeeperStateProcess* process;
-};
-
 } // namespace state {
 } // namespace internal {
 } // namespace mesos {
 
-#endif // __STATE_HPP__
-
-
-
-
-// need a Future::operator -> (), plus a test
-
-
-// have a way to "watch" a Variable and get a future that signifies when/if it has changed?
-
-// need a master work directory for local leveldb version of State!
-
-// use leveldb for non-ha version of master (no reading/writing files)
-
-// need to set the location of master detector zk znode, and set this znode location
+#endif // __STATE_STATE_HPP__

Modified: incubator/mesos/trunk/src/state/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/zookeeper.cpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/state/zookeeper.cpp Thu Aug  2 23:23:24 2012
@@ -19,6 +19,7 @@
 #include "messages/state.hpp"
 
 #include "state/state.hpp"
+#include "state/zookeeper.hpp"
 
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/watcher.hpp"
@@ -26,8 +27,6 @@
 
 using namespace process;
 
-using process::wait; // Necessary on some OS's to disambiguate.
-
 using std::queue;
 using std::string;
 
@@ -37,78 +36,6 @@ namespace mesos {
 namespace internal {
 namespace state {
 
-class ZooKeeperStateProcess : public Process<ZooKeeperStateProcess>
-{
-public:
-  ZooKeeperStateProcess(
-      const string& servers,
-      const seconds& timeout,
-      const string& znode,
-      const Option<Authentication>& auth);
-  virtual ~ZooKeeperStateProcess();
-
-  virtual void initialize();
-
-  // State implementation.
-  Future<Option<Entry> > fetch(const string& name);
-  Future<bool> swap(const Entry& entry, const UUID& uuid);
-
-  // ZooKeeper events.
-  void connected(bool reconnect);
-  void reconnecting();
-  void expired();
-  void updated(const string& path);
-  void created(const string& path);
-  void deleted(const string& path);
-
-private:
-  // Helpers for fetching and swapping.
-  Result<Option<Entry> > doFetch(const string& name);
-  Result<bool> doSwap(const Entry& entry, const UUID& uuid);
-
-  const string servers;
-  const seconds timeout;
-  const string znode;
-
-  Option<Authentication> auth; // ZooKeeper authentication.
-
-  const ACL_vector acl; // Default ACL to use.
-
-  Watcher* watcher;
-  ZooKeeper* zk;
-
-  enum State { // ZooKeeper connection state.
-    DISCONNECTED,
-    CONNECTING,
-    CONNECTED,
-  } state;
-
-  struct Fetch
-  {
-    Fetch(const string& _name)
-      : name(_name) {}
-    string name;
-    Promise<Option<Entry> > promise;
-  };
-
-  struct Swap
-  {
-    Swap(const Entry& _entry, const UUID& _uuid)
-      : entry(_entry), uuid(_uuid) {}
-    Entry entry;
-    UUID uuid;
-    Promise<bool> promise;
-  };
-
-  struct {
-    queue<Fetch*> fetches;
-    queue<Swap*> swaps;
-  } pending;
-
-  Option<string> error;
-};
-
-
 // Helper for failing a queue of promises.
 template <typename T>
 void fail(queue<T*>* queue, const string& message)
@@ -425,37 +352,6 @@ Result<bool> ZooKeeperStateProcess::doSw
   return true;
 }
 
-
-ZooKeeperState::ZooKeeperState(
-    const string& servers,
-    const seconds& timeout,
-    const string& znode,
-    const Option<Authentication>& auth)
-{
-  process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
-  spawn(process);
-}
-
-
-ZooKeeperState::~ZooKeeperState()
-{
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-Future<Option<Entry> > ZooKeeperState::fetch(const string& name)
-{
-  return dispatch(process, &ZooKeeperStateProcess::fetch, name);
-}
-
-
-Future<bool> ZooKeeperState::swap(const Entry& entry, const UUID& uuid)
-{
-  return dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
-}
-
 } // namespace state {
 } // namespace internal {
 } // namespace mesos {

Added: incubator/mesos/trunk/src/state/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/zookeeper.hpp?rev=1368752&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/zookeeper.hpp (added)
+++ incubator/mesos/trunk/src/state/zookeeper.hpp Thu Aug  2 23:23:24 2012
@@ -0,0 +1,170 @@
+#ifndef __STATE_ZOOKEEPER_HPP__
+#define __STATE_ZOOKEEPER_HPP__
+
+#include <queue>
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+#include <stout/time.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+#include "state/serializer.hpp"
+#include "state/state.hpp"
+
+#include "zookeeper/authentication.hpp"
+#include "zookeeper/watcher.hpp"
+#include "zookeeper/zookeeper.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Forward declarations.
+class ZooKeeperStateProcess;
+
+
+template <typename Serializer = StringSerializer>
+class ZooKeeperState : public State<Serializer>
+{
+public:
+  // TODO(benh): Just take a zookeeper::URL.
+  ZooKeeperState(
+      const std::string& servers,
+      const seconds& timeout,
+      const std::string& znode,
+      const Option<zookeeper::Authentication>& auth =
+      Option<zookeeper::Authentication>());
+  virtual ~ZooKeeperState();
+
+protected:
+  // State implementation.
+  virtual process::Future<Option<Entry> > fetch(const std::string& name);
+  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+  ZooKeeperStateProcess* process;
+};
+
+
+class ZooKeeperStateProcess : public process::Process<ZooKeeperStateProcess>
+{
+public:
+  ZooKeeperStateProcess(
+      const std::string& servers,
+      const seconds& timeout,
+      const std::string& znode,
+      const Option<zookeeper::Authentication>& auth);
+  virtual ~ZooKeeperStateProcess();
+
+  virtual void initialize();
+
+  // State implementation.
+  process::Future<Option<Entry> > fetch(const std::string& name);
+  process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+  // ZooKeeper events.
+  void connected(bool reconnect);
+  void reconnecting();
+  void expired();
+  void updated(const std::string& path);
+  void created(const std::string& path);
+  void deleted(const std::string& path);
+
+private:
+  // Helpers for fetching and swapping.
+  Result<Option<Entry> > doFetch(const std::string& name);
+  Result<bool> doSwap(const Entry& entry, const UUID& uuid);
+
+  const std::string servers;
+  const seconds timeout;
+  const std::string znode;
+
+  Option<zookeeper::Authentication> auth; // ZooKeeper authentication.
+
+  const ACL_vector acl; // Default ACL to use.
+
+  Watcher* watcher;
+  ZooKeeper* zk;
+
+  enum State { // ZooKeeper connection state.
+    DISCONNECTED,
+    CONNECTING,
+    CONNECTED,
+  } state;
+
+  struct Fetch
+  {
+    Fetch(const std::string& _name)
+      : name(_name) {}
+    std::string name;
+    process::Promise<Option<Entry> > promise;
+  };
+
+  struct Swap
+  {
+    Swap(const Entry& _entry, const UUID& _uuid)
+      : entry(_entry), uuid(_uuid) {}
+    Entry entry;
+    UUID uuid;
+    process::Promise<bool> promise;
+  };
+
+  struct {
+    std::queue<Fetch*> fetches;
+    std::queue<Swap*> swaps;
+  } pending;
+
+  Option<std::string> error;
+};
+
+
+template <typename Serializer>
+ZooKeeperState<Serializer>::ZooKeeperState(
+    const std::string& servers,
+    const seconds& timeout,
+    const std::string& znode,
+    const Option<zookeeper::Authentication>& auth)
+{
+  process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
+  process::spawn(process);
+}
+
+
+template <typename Serializer>
+ZooKeeperState<Serializer>::~ZooKeeperState()
+{
+  process::terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+template <typename Serializer>
+process::Future<Option<Entry> > ZooKeeperState<Serializer>::fetch(
+    const std::string& name)
+{
+  return process::dispatch(process, &ZooKeeperStateProcess::fetch, name);
+}
+
+
+template <typename Serializer>
+process::Future<bool> ZooKeeperState<Serializer>::swap(
+    const Entry& entry,
+    const UUID& uuid)
+{
+  return process::dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_ZOOKEEPER_HPP__

Modified: incubator/mesos/trunk/src/tests/state_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/state_tests.cpp?rev=1368752&r1=1368751&r2=1368752&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/state_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/state_tests.cpp Thu Aug  2 23:23:24 2012
@@ -33,7 +33,10 @@
 
 #include "messages/messages.hpp"
 
+#include "state/leveldb.hpp"
+#include "state/serializer.hpp"
 #include "state/state.hpp"
+#include "state/zookeeper.hpp"
 
 #ifdef MESOS_HAS_JAVA
 #include "tests/base_zookeeper_test.hpp"
@@ -48,15 +51,15 @@ using namespace mesos::internal::test;
 using namespace process;
 
 
-void GetSetGet(State* state)
+void GetSetGet(State<ProtobufSerializer>* state)
 {
-  Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+  Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
 
   variable.await();
 
   ASSERT_TRUE(variable.isReady());
 
-  State::Variable<Slaves> slaves1 = variable.get();
+  Variable<Slaves> slaves1 = variable.get();
 
   EXPECT_TRUE(slaves1->infos().size() == 0);
 
@@ -79,7 +82,7 @@ void GetSetGet(State* state)
 
   ASSERT_TRUE(variable.isReady());
 
-  State::Variable<Slaves> slaves2 = variable.get();
+  Variable<Slaves> slaves2 = variable.get();
 
   ASSERT_TRUE(slaves2->infos().size() == 1);
   EXPECT_EQ("localhost", slaves2->infos(0).hostname());
@@ -87,15 +90,15 @@ void GetSetGet(State* state)
 }
 
 
-void GetSetSetGet(State* state)
+void GetSetSetGet(State<ProtobufSerializer>* state)
 {
-  Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+  Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
 
   variable.await();
 
   ASSERT_TRUE(variable.isReady());
 
-  State::Variable<Slaves> slaves1 = variable.get();
+  Variable<Slaves> slaves1 = variable.get();
 
   EXPECT_TRUE(slaves1->infos().size() == 0);
 
@@ -125,7 +128,7 @@ void GetSetSetGet(State* state)
 
   ASSERT_TRUE(variable.isReady());
 
-  State::Variable<Slaves> slaves2 = variable.get();
+  Variable<Slaves> slaves2 = variable.get();
 
   ASSERT_TRUE(slaves2->infos().size() == 1);
   EXPECT_EQ("localhost", slaves2->infos(0).hostname());
@@ -133,15 +136,15 @@ void GetSetSetGet(State* state)
 }
 
 
-void GetGetSetSetGet(State* state)
+void GetGetSetSetGet(State<ProtobufSerializer>* state)
 {
-  Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+  Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
 
   variable.await();
 
   ASSERT_TRUE(variable.isReady());
 
-  State::Variable<Slaves> slaves1 = variable.get();
+  Variable<Slaves> slaves1 = variable.get();
 
   EXPECT_TRUE(slaves1->infos().size() == 0);
 
@@ -151,7 +154,7 @@ void GetGetSetSetGet(State* state)
 
   ASSERT_TRUE(variable.isReady());
 
-  State::Variable<Slaves> slaves2 = variable.get();
+  Variable<Slaves> slaves2 = variable.get();
 
   EXPECT_TRUE(slaves2->infos().size() == 0);
 
@@ -205,7 +208,7 @@ protected:
   virtual void SetUp()
   {
     os::rmdir(path);
-    state = new LevelDBState(path);
+    state = new LevelDBState<ProtobufSerializer>(path);
   }
 
   virtual void TearDown()
@@ -214,7 +217,7 @@ protected:
     os::rmdir(path);
   }
 
-  State* state;
+  State<ProtobufSerializer>* state;
 
 private:
   const std::string path;
@@ -250,7 +253,10 @@ protected:
   virtual void SetUp()
   {
     BaseZooKeeperTest::SetUp();
-    state = new ZooKeeperState(zks->connectString(), NO_TIMEOUT, "/state/");
+    state = new ZooKeeperState<ProtobufSerializer>(
+        zks->connectString(),
+        NO_TIMEOUT,
+        "/state/");
   }
 
   virtual void TearDown()
@@ -259,7 +265,7 @@ protected:
     BaseZooKeeperTest::TearDown();
   }
 
-  State* state;
+  State<ProtobufSerializer>* state;
 };