You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/05/14 22:16:53 UTC

svn commit: r1338382 - in /incubator/mesos/trunk/src: Makefile.am messages/state.hpp messages/state.proto state/ state/leveldb.cpp state/state.hpp state/zookeeper.cpp tests/base_zookeeper_test.hpp tests/state_tests.cpp zookeeper/group.cpp

Author: benh
Date: Mon May 14 20:16:52 2012
New Revision: 1338382

URL: http://svn.apache.org/viewvc?rev=1338382&view=rev
Log:
Added "state" abstraction, including implementations using leveldb and ZooKeeper.

Added:
    incubator/mesos/trunk/src/messages/state.hpp
    incubator/mesos/trunk/src/messages/state.proto
    incubator/mesos/trunk/src/state/
    incubator/mesos/trunk/src/state/leveldb.cpp
    incubator/mesos/trunk/src/state/state.hpp
    incubator/mesos/trunk/src/state/zookeeper.cpp
    incubator/mesos/trunk/src/tests/state_tests.cpp
Modified:
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp
    incubator/mesos/trunk/src/zookeeper/group.cpp

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1338382&r1=1338381&r2=1338382&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Mon May 14 20:16:52 2012
@@ -119,6 +119,11 @@ LOG_PROTOS = messages/log.pb.cc messages
 BUILT_SOURCES += $(LOG_PROTOS)
 CLEANFILES += $(LOG_PROTOS)
 
+STATE_PROTOS = messages/state.pb.cc messages/state.pb.h
+
+BUILT_SOURCES += $(STATE_PROTOS)
+CLEANFILES += $(STATE_PROTOS)
+
 
 # Targets for generating protocol buffer code.
 %.pb.cc %.pb.h: $(top_srcdir)/include/mesos/%.proto
@@ -157,7 +162,7 @@ libmesos_no_third_party_la_SOURCES = sch
 	common/utils.cpp common/date_utils.cpp common/resources.cpp	\
 	common/attributes.cpp common/values.cpp				\
 	zookeeper/zookeeper.cpp zookeeper/authentication.cpp		\
-	zookeeper/group.cpp messages/log.proto messages/messages.proto
+	zookeeper/group.cpp messages/messages.proto
 
 pkginclude_HEADERS = $(top_srcdir)/include/mesos/executor.hpp	\
 		     $(top_srcdir)/include/mesos/scheduler.hpp	\
@@ -185,12 +190,11 @@ libmesos_no_third_party_la_SOURCES += co
 	common/strings.hpp common/values.hpp				\
 	configurator/configuration.hpp configurator/configurator.hpp	\
 	configurator/option.hpp detector/detector.hpp			\
-	launcher/launcher.hpp local/local.hpp log/coordinator.hpp	\
-	log/replica.hpp log/log.hpp log/network.hpp			\
-	master/allocator.hpp master/allocator_factory.hpp		\
-	master/constants.hpp master/frameworks_manager.hpp		\
-	master/http.hpp master/master.hpp master/simple_allocator.hpp	\
-	master/slaves_manager.hpp master/webui.hpp messages/log.hpp	\
+	launcher/launcher.hpp local/local.hpp master/allocator.hpp	\
+	master/allocator_factory.hpp master/constants.hpp		\
+	master/frameworks_manager.hpp master/http.hpp			\
+	master/master.hpp master/simple_allocator.hpp			\
+	master/slaves_manager.hpp master/webui.hpp			\
 	messages/messages.hpp slave/constants.hpp slave/http.hpp	\
 	slave/isolation_module.hpp slave/isolation_module_factory.hpp	\
 	slave/lxc_isolation_module.hpp					\
@@ -229,12 +233,26 @@ libmesos_no_third_party_la_LIBADD += lib
 # include the leveldb headers.
 noinst_LTLIBRARIES += liblog.la
 liblog_la_SOURCES = log/coordinator.cpp log/replica.cpp
+liblog_la_SOURCES += log/coordinator.hpp log/replica.hpp log/log.hpp	\
+  log/network.hpp messages/log.hpp messages/log.proto
 nodist_liblog_la_SOURCES = $(LOG_PROTOS)
 liblog_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)
 
 libmesos_no_third_party_la_LIBADD += liblog.la
 
 
+# Convenience library for building "state" abstraction in order to
+# include the leveldb headers.
+noinst_LTLIBRARIES += libstate.la
+libstate_la_SOURCES = state/leveldb.cpp state/zookeeper.cpp
+libstate_la_SOURCES += state/state.hpp messages/state.hpp	\
+  messages/state.proto
+nodist_libstate_la_SOURCES = $(STATE_PROTOS)
+libstate_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)
+
+libmesos_no_third_party_la_LIBADD += libstate.la
+
+
 # Convenience library for the webui to include Python specific flags.
 if WEBUI
 noinst_LTLIBRARIES += libwebui.la
@@ -632,7 +650,7 @@ no_executor_framework_LDADD = libmesos.l
 check_PROGRAMS += mesos-tests
 
 mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp			\
-	              tests/master_tests.cpp				\
+	              tests/master_tests.cpp tests/state_tests.cpp	\
 	              tests/resource_offers_tests.cpp			\
 	              tests/fault_tolerance_tests.cpp			\
 	              tests/log_tests.cpp tests/resources_tests.cpp	\

Added: incubator/mesos/trunk/src/messages/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/state.hpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/state.hpp (added)
+++ incubator/mesos/trunk/src/messages/state.hpp Mon May 14 20:16:52 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESSAGES_STATE_HPP__
+#define __MESSAGES_STATE_HPP__
+
+#include "messages/state.pb.h" // Generated from messages/state.proto.
+
+#endif // __MESSAGES_STATE_HPP__

Added: incubator/mesos/trunk/src/messages/state.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/state.proto?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/state.proto (added)
+++ incubator/mesos/trunk/src/messages/state.proto Mon May 14 20:16:52 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mesos.internal.state;
+
+// Describes a state entry, effectively a key/value pair. We include
+// the protocol buffer descriptor so that we can check usage.
+message Entry {
+  required string name = 1;
+  required bytes uuid = 2;
+  required string type = 3;
+  // TODO(benh): Consider saving more information about the type.
+  // required google.protobuf.DescriptorProto message_type = 3;
+  required bytes value = 4;
+}

Added: incubator/mesos/trunk/src/state/leveldb.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/leveldb.cpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/leveldb.cpp (added)
+++ incubator/mesos/trunk/src/state/leveldb.cpp Mon May 14 20:16:52 2012
@@ -0,0 +1,211 @@
+#include <leveldb/db.h>
+
+#include <google/protobuf/message.h>
+
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include "common/logging.hpp"
+#include "common/option.hpp"
+#include "common/try.hpp"
+#include "common/uuid.hpp"
+
+#include "messages/state.hpp"
+
+#include "state/state.hpp"
+
+using namespace process;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+class LevelDBStateProcess : public Process<LevelDBStateProcess>
+{
+public:
+  LevelDBStateProcess(const string& path);
+  virtual ~LevelDBStateProcess();
+
+  virtual void initialize();
+
+  // State implementation.
+  Future<Option<Entry> > fetch(const string& name);
+  Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+  // Helpers for interacting with leveldb.
+  Try<Option<Entry> > get(const string& name);
+  Try<bool> put(const Entry& entry);
+
+  const string path;
+  leveldb::DB* db;
+
+  Option<string> error;
+};
+
+
+LevelDBStateProcess::LevelDBStateProcess(const string& _path)
+  : path(_path), db(NULL) {}
+
+
+LevelDBStateProcess::~LevelDBStateProcess()
+{
+  delete db; // Might be null if open failed in LevelDBStateProcess::initialize.
+}
+
+
+void LevelDBStateProcess::initialize()
+{
+  leveldb::Options options;
+  options.create_if_missing = true;
+
+  leveldb::Status status = leveldb::DB::Open(options, path, &db);
+  if (!status.ok()) {
+    // TODO(benh): Consider trying to repair the DB.
+    error = Option<string>::some(status.ToString());
+    return; // No usable 'db' after a failed open, so do not compact it.
+  }
+
+  // TODO(benh): Conditionally compact to avoid long recovery times?
+  db->CompactRange(NULL, NULL);
+}
+
+
+Future<Option<Entry> > LevelDBStateProcess::fetch(const string& name)
+{
+  if (error.isSome()) {
+    return Future<Option<Entry> >::failed(error.get());
+  }
+
+  Try<Option<Entry> > option = get(name);
+
+  if (option.isError()) {
+    return Future<Option<Entry> >::failed(option.error());
+  }
+
+  return option.get();
+}
+
+
+Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
+{
+  if (error.isSome()) {
+    return Future<bool>::failed(error.get());
+  }
+
+  // We do a fetch first to make sure the version has not changed. This
+  // could be optimized in the future, for now it will probably hit
+  // the cache anyway.
+  Try<Option<Entry> > option = get(entry.name());
+
+  if (option.isError()) {
+    return Future<bool>::failed(option.error());
+  }
+
+  if (option.get().isSome()) {
+    if (UUID::fromBytes(option.get().get().uuid()) != uuid) {
+      return false;
+    }
+  }
+
+  // Note that there is no need to do the DB::Get and DB::Put
+  // "atomically" because only one db can be opened at a time, so
+  // there can not be any writes that occur concurrently.
+
+  Try<bool> result = put(entry);
+
+  if (result.isError()) {
+    return Future<bool>::failed(result.error());
+  }
+
+  return result.get();
+}
+
+
+Try<Option<Entry> > LevelDBStateProcess::get(const string& name)
+{
+  CHECK(error.isNone());
+
+  leveldb::ReadOptions options;
+
+  string value;
+
+  leveldb::Status status = db->Get(options, name, &value);
+
+  if (status.IsNotFound()) {
+    return Option<Entry>::none();
+  } else if (!status.ok()) {
+    return Try<Option<Entry> >::error(status.ToString());
+  }
+
+  google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+
+  Entry entry;
+
+  if (!entry.ParseFromZeroCopyStream(&stream)) {
+    return Try<Option<Entry> >::error("Failed to deserialize Entry");
+  }
+
+  return Option<Entry>::some(entry);
+}
+
+
+Try<bool> LevelDBStateProcess::put(const Entry& entry)
+{
+  CHECK(error.isNone());
+
+  leveldb::WriteOptions options;
+  options.sync = true;
+
+  string value;
+
+  if (!entry.SerializeToString(&value)) {
+    return Try<bool>::error("Failed to serialize Entry");
+  }
+
+  leveldb::Status status = db->Put(options, entry.name(), value);
+
+  if (!status.ok()) {
+    return Try<bool>::error(status.ToString());
+  }
+
+  return true;
+}
+
+
+LevelDBState::LevelDBState(const std::string& path)
+{
+  process = new LevelDBStateProcess(path);
+  spawn(process);
+}
+
+
+LevelDBState::~LevelDBState()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+
+Future<Option<Entry> > LevelDBState::fetch(const string& name)
+{
+  return dispatch(process, &LevelDBStateProcess::fetch, name);
+}
+
+
+Future<bool> LevelDBState::swap(const Entry& entry, const UUID& uuid)
+{
+  return dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {

Added: incubator/mesos/trunk/src/state/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/state.hpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/state.hpp (added)
+++ incubator/mesos/trunk/src/state/state.hpp Mon May 14 20:16:52 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATE_HPP__
+#define __STATE_HPP__
+
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <string>
+
+#include <process/future.hpp>
+
+#include "common/logging.hpp"
+#include "common/option.hpp"
+#include "common/seconds.hpp"
+#include "common/uuid.hpp"
+
+#include "messages/state.hpp"
+
+#include "zookeeper/authentication.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Forward declarations.
+class LevelDBStateProcess;
+class ZooKeeperStateProcess;
+
+// An abstraction of "state" (possibly between multiple distributed
+// components) represented by "variables" (effectively key/value
+// pairs). The value of a variable in the state must be a protocol
+// buffer. Variables are versioned such that setting a variable in the
+// state will only succeed if the variable has not changed since last
+// fetched. Varying implementations of state provide varying
+// replicated guarantees.
+//
+// Note that the semantics of 'get' and 'set' provide atomicity. That
+// is, you can not set a variable that has changed since you did the
+// last get. That is, if a set succeeds then no other writes have been
+// performed on the variable since your get.
+
+// Example:
+//   State* state = new ZooKeeperState();
+//   Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+//   State::Variable<Slaves> slaves = variable.get();
+//   slaves->add_infos()->MergeFrom(info);
+//   Future<bool> set = state->set(&slaves);
+
+class State
+{
+public:
+  template <typename T>
+  class Variable
+  {
+  public:
+    T* operator -> ()
+    {
+      return &t;
+    }
+
+  private:
+    friend class State; // Creates and manages variables.
+
+    Variable(const Entry& _entry, const T& _t)
+      : entry(_entry), t(_t)
+    {
+      const google::protobuf::Message* message = &t; // Check T is a protobuf.
+    }
+
+    Entry entry;
+    T t;
+  };
+
+  State() {}
+  virtual ~State() {}
+
+  // Returns a variable from the state, creating a new one if one
+  // previously did not exist (or an error if one occurs).
+  template <typename T>
+  process::Future<Variable<T> > get(const std::string& name);
+
+  // Returns true if the variable was successfully set in the state,
+  // otherwise false if the version of the variable was no longer
+  // valid (or an error if one occurs).
+  template <typename T>
+  process::Future<bool> set(Variable<T>* variable);
+
+protected:
+  // Fetch and swap state entries, factored out to allow State
+  // implementations to be agnostic of Variable which is templated.
+  virtual process::Future<Option<Entry> > fetch(const std::string& name) = 0;
+  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid) = 0;
+
+private:
+  // Helper to convert an Entry to a Variable<T>. We make this a
+  // static member of State for friend access to Variable's
+  // constructor.
+  template <typename T>
+  static process::Future<State::Variable<T> > convert(
+      const std::string& name,
+      const Option<Entry>& option);
+};
+
+
+// Helper for converting an Entry into a Variable<T>.
+template <typename T>
+process::Future<State::Variable<T> > State::convert(
+    const std::string& name,
+    const Option<Entry>& option)
+{
+  T t;
+
+  if (option.isSome()) {
+    const Entry& entry = option.get();
+
+    // TODO(benh): Check _compatibility_ versus equivalence.
+    CHECK(t.GetDescriptor()->full_name() == entry.type());
+
+    const std::string& value = entry.value();
+    google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+    if (!t.ParseFromZeroCopyStream(&stream)) {
+      return process::Future<State::Variable<T> >::failed(
+          "Failed to deserialize " + t.GetDescriptor()->full_name());
+    }
+
+    return State::Variable<T>(entry, t);
+  }
+
+  // Otherwise, construct a Variable out of a new Entry with a default
+  // value for T (and a random UUID to start).
+  std::string value;
+
+  if (!t.SerializeToString(&value)) {
+    return process::Future<State::Variable<T> >::failed(
+        "Failed to serialize " + t.GetDescriptor()->full_name());
+  }
+
+  Entry entry;
+  entry.set_name(name);
+  entry.set_uuid(UUID::random().toBytes());
+  entry.set_type(t.GetDescriptor()->full_name());
+  entry.set_value(value);
+
+  return State::Variable<T>(entry, t);
+}
+
+
+template <typename T>
+process::Future<State::Variable<T> > State::get(const std::string& name)
+{
+  std::tr1::function<
+  process::Future<State::Variable<T> >(const Option<Entry>&)> convert =
+    std::tr1::bind(&State::convert<T>, name, std::tr1::placeholders::_1);
+
+  return fetch(name).then(convert);
+}
+
+
+template <typename T>
+process::Future<bool> State::set(State::Variable<T>* variable)
+{
+  std::string value;
+  if (!variable->t.SerializeToString(&value)) {
+    return process::Future<bool>::failed(
+        "Failed to serialize " + variable->entry.type());
+  }
+
+  // Note that we try and swap an entry even if the value didn't change!
+  UUID uuid = UUID::fromBytes(variable->entry.uuid());
+
+  // Update the UUID and value of the entry.
+  variable->entry.set_uuid(UUID::random().toBytes());
+  variable->entry.set_value(value);
+
+  return swap(variable->entry, uuid);
+}
+
+
+class LevelDBState : public State
+{
+public:
+  LevelDBState(const std::string& path);
+  virtual ~LevelDBState();
+
+protected:
+  // State implementation.
+  virtual process::Future<Option<Entry> > fetch(const std::string& name);
+  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+  LevelDBStateProcess* process;
+};
+
+
+class ZooKeeperState : public State
+{
+public:
+  // TODO(benh): Just take a zookeeper::URL.
+  ZooKeeperState(
+      const std::string& servers,
+      const seconds& timeout,
+      const std::string& znode,
+      const Option<zookeeper::Authentication>& auth =
+      Option<zookeeper::Authentication>());
+  virtual ~ZooKeeperState();
+
+protected:
+  // State implementation.
+  virtual process::Future<Option<Entry> > fetch(const std::string& name);
+  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+  ZooKeeperStateProcess* process;
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_HPP__
+
+
+
+
+// need a Future::operator -> (), plus a test
+
+
+// have a way to "watch" a Variable and get a future that signifies when/if it has changed?
+
+// need a master work directory for local leveldb version of State!
+
+// use leveldb for non-ha version of master (no reading/writing files)
+
+// need to set the location of master detector zk znode, and set this znode location

Added: incubator/mesos/trunk/src/state/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/zookeeper.cpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/zookeeper.cpp (added)
+++ incubator/mesos/trunk/src/state/zookeeper.cpp Mon May 14 20:16:52 2012
@@ -0,0 +1,460 @@
+#include <google/protobuf/message.h>
+
+#include <queue>
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include "common/logging.hpp"
+#include "common/option.hpp"
+#include "common/result.hpp"
+#include "common/seconds.hpp"
+#include "common/strings.hpp"
+#include "common/try.hpp"
+#include "common/uuid.hpp"
+
+#include "messages/state.hpp"
+
+#include "state/state.hpp"
+
+#include "zookeeper/authentication.hpp"
+#include "zookeeper/watcher.hpp"
+#include "zookeeper/zookeeper.hpp"
+
+using namespace process;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::queue;
+using std::string;
+
+using zookeeper::Authentication;
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+class ZooKeeperStateProcess : public Process<ZooKeeperStateProcess>
+{
+public:
+  ZooKeeperStateProcess(
+      const string& servers,
+      const seconds& timeout,
+      const string& znode,
+      const Option<Authentication>& auth);
+  virtual ~ZooKeeperStateProcess();
+
+  virtual void initialize();
+
+  // State implementation.
+  Future<Option<Entry> > fetch(const string& name);
+  Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+  // ZooKeeper events.
+  void connected(bool reconnect);
+  void reconnecting();
+  void expired();
+  void updated(const string& path);
+  void created(const string& path);
+  void deleted(const string& path);
+
+private:
+  // Helpers for fetching and swapping.
+  Result<Option<Entry> > doFetch(const string& name);
+  Result<bool> doSwap(const Entry& entry, const UUID& uuid);
+
+  const string servers;
+  const seconds timeout;
+  const string znode;
+
+  Option<Authentication> auth; // ZooKeeper authentication.
+
+  const ACL_vector acl; // Default ACL to use.
+
+  Watcher* watcher;
+  ZooKeeper* zk;
+
+  enum State { // ZooKeeper connection state.
+    DISCONNECTED,
+    CONNECTING,
+    CONNECTED,
+  } state;
+
+  struct Fetch
+  {
+    Fetch(const string& _name)
+      : name(_name) {}
+    string name;
+    Promise<Option<Entry> > promise;
+  };
+
+  struct Swap
+  {
+    Swap(const Entry& _entry, const UUID& _uuid)
+      : entry(_entry), uuid(_uuid) {}
+    Entry entry;
+    UUID uuid;
+    Promise<bool> promise;
+  };
+
+  struct {
+    queue<Fetch*> fetches;
+    queue<Swap*> swaps;
+  } pending;
+
+  Option<string> error;
+};
+
+
+// Helper for failing a queue of promises.
+template <typename T>
+void fail(queue<T*>* queue, const string& message)
+{
+  while (!queue->empty()) {
+    T* t = queue->front();
+    queue->pop();
+    t->promise.fail(message);
+    delete t;
+  }
+}
+
+
+ZooKeeperStateProcess::ZooKeeperStateProcess(
+    const string& _servers,
+    const seconds& _timeout,
+    const string& _znode,
+    const Option<Authentication>& _auth)
+  : servers(_servers),
+    timeout(_timeout),
+    znode(strings::remove(_znode, "/", strings::SUFFIX)),
+    auth(_auth),
+    acl(_auth.isSome()
+        ? zookeeper::EVERYONE_READ_CREATOR_ALL
+        : ZOO_OPEN_ACL_UNSAFE),
+    watcher(NULL),
+    zk(NULL),
+    state(DISCONNECTED)
+{}
+
+
+ZooKeeperStateProcess::~ZooKeeperStateProcess()
+{
+  fail(&pending.fetches, "No longer managing state");
+  fail(&pending.swaps, "No longer managing state");
+
+  delete zk;
+  delete watcher;
+}
+
+
+void ZooKeeperStateProcess::initialize()
+{
+  // Doing initialization here allows to avoid the race between
+  // instantiating the ZooKeeper instance and being spawned ourself.
+  watcher = new ProcessWatcher<ZooKeeperStateProcess>(self());
+  zk = new ZooKeeper(servers, timeout, watcher);
+}
+
+
+Future<Option<Entry> > ZooKeeperStateProcess::fetch(const string& name)
+{
+  if (error.isSome()) {
+    return Future<Option<Entry> >::failed(error.get());
+  } else if (state != CONNECTED) {
+    Fetch* fetch = new Fetch(name);
+    pending.fetches.push(fetch);
+    return fetch->promise.future();
+  }
+
+  Result<Option<Entry> > result = doFetch(name);
+
+  if (result.isNone()) { // Try again later.
+    Fetch* fetch = new Fetch(name);
+    pending.fetches.push(fetch);
+    return fetch->promise.future();
+  } else if (result.isError()) {
+    return Future<Option<Entry> >::failed(result.error());
+  }
+
+  return result.get();
+}
+
+
+Future<bool> ZooKeeperStateProcess::swap(const Entry& entry, const UUID& uuid)
+{
+  if (error.isSome()) {
+    return Future<bool>::failed(error.get());
+  } else if (state != CONNECTED) {
+    Swap* swap = new Swap(entry, uuid);
+    pending.swaps.push(swap);
+    return swap->promise.future();
+  }
+
+  Result<bool> result = doSwap(entry, uuid);
+
+  if (result.isNone()) { // Try again later.
+    Swap* swap = new Swap(entry, uuid);
+    pending.swaps.push(swap);
+    return swap->promise.future();
+  } else if (result.isError()) {
+    return Future<bool>::failed(result.error());
+  }
+
+  return result.get();
+}
+
+
+void ZooKeeperStateProcess::connected(bool reconnect)
+{
+  // Authenticate if necessary (and we are connected for the first
+  // time, or after a session expiration).
+  if (!reconnect && auth.isSome()) {
+    LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+
+    int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
+
+    if (code != ZOK) { // TODO(benh): Authentication retries?
+      Try<string> message = strings::format(
+          "Failed to authenticate with ZooKeeper: %s", zk->message(code));
+      error = message.isSome()
+        ? message.get()
+        : "Failed to authenticate with ZooKeeper";
+      fail(&pending.fetches, error.get()); // Don't leave callers hanging.
+      fail(&pending.swaps, error.get());
+      return;
+    }
+  }
+
+  state = CONNECTED;
+
+  while (!pending.fetches.empty()) {
+    Fetch* fetch = pending.fetches.front();
+    Result<Option<Entry> > result = doFetch(fetch->name);
+    if (result.isNone()) {
+      return; // Try again later.
+    } else if (result.isError()) {
+      fetch->promise.fail(result.error());
+    } else {
+      fetch->promise.set(result.get());
+    }
+    pending.fetches.pop();
+    delete fetch;
+  }
+
+  while (!pending.swaps.empty()) {
+    Swap* swap = pending.swaps.front();
+    Result<bool> result = doSwap(swap->entry, swap->uuid);
+    if (result.isNone()) {
+      return; // Try again later.
+    } else if (result.isError()) {
+      swap->promise.fail(result.error());
+    } else {
+      swap->promise.set(result.get());
+    }
+    pending.swaps.pop();
+    delete swap;
+  }
+}
+
+
+void ZooKeeperStateProcess::reconnecting()
+{
+  state = CONNECTING;
+}
+
+
+void ZooKeeperStateProcess::expired()
+{
+  state = DISCONNECTED;
+
+  delete zk;
+  zk = new ZooKeeper(servers, timeout, watcher);
+
+  state = CONNECTING;
+}
+
+
+void ZooKeeperStateProcess::updated(const string& path)
+{
+  LOG(FATAL) << "Unexpected ZooKeeper event";
+}
+
+
+void ZooKeeperStateProcess::created(const string& path)
+{
+  LOG(FATAL) << "Unexpected ZooKeeper event";
+}
+
+
+void ZooKeeperStateProcess::deleted(const string& path)
+{
+  LOG(FATAL) << "Unexpected ZooKeeper event";
+}
+
+
+Result<Option<Entry> > ZooKeeperStateProcess::doFetch(const string& name)
+{
+  CHECK(error.isNone()) << ": " << error.get();
+  CHECK(state == CONNECTED);
+
+  string result;
+  Stat stat;
+
+  int code = zk->get(znode + "/" + name, false, &result, &stat);
+
+  if (code == ZNONODE) {
+    return Option<Entry>::none();
+  } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    return Result<Option<Entry> >::none();
+  } else if (code != ZOK) {
+    return Result<Option<Entry> >::error(
+        "Failed to get '" + znode + "/" + name +
+        "' in ZooKeeper: " + zk->message(code));
+  }
+
+  google::protobuf::io::ArrayInputStream stream(result.data(), result.size());
+
+  Entry entry;
+
+  if (!entry.ParseFromZeroCopyStream(&stream)) {
+    return Result<Option<Entry> >::error("Failed to deserialize Entry");
+  }
+
+  return Option<Entry>::some(entry);
+}
+
+
+Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
+{
+  CHECK(error.isNone()) << ": " << error.get();
+  CHECK(state == CONNECTED);
+
+  // Serialize to make sure we're under the 1 MB limit.
+  string data;
+
+  if (!entry.SerializeToString(&data)) {
+    return Result<bool>::error("Failed to serialize Entry");
+  }
+
+  if (data.size() > 1024 * 1024) { // 1 MB
+    // TODO(benh): Implement compression.
+    return Result<bool>::error("Serialized data is too big (> 1 MB)");
+  }
+
+  string result;
+  Stat stat;
+
+  int code = zk->get(znode + "/" + entry.name(), false, &result, &stat);
+
+  if (code == ZNONODE) {
+    // Create directory path znodes as necessary.
+    CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
+    size_t index = znode.find("/", 0);
+
+    while (index < string::npos) {
+      // Get out the prefix to create.
+      index = znode.find("/", index + 1);
+      string prefix = znode.substr(0, index);
+
+      // Create the znode (even if it already exists).
+      code = zk->create(prefix, "", acl, 0, NULL);
+
+      if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+        CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+        return Result<bool>::none();
+      } else if (code != ZOK && code != ZNODEEXISTS) {
+        return Result<bool>::error(
+            "Failed to create '" + prefix +
+            "' in ZooKeeper: " + zk->message(code));
+      }
+    }
+
+    code = zk->create(znode + "/" + entry.name(), data, acl, 0, NULL);
+
+    if (code == ZNODEEXISTS) {
+      return false; // Lost a race with someone else.
+    } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+      CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+      return Result<bool>::none();
+    } else if (code != ZOK) {
+      return Result<bool>::error(
+          "Failed to create '" + znode + "/" + entry.name() +
+          "' in ZooKeeper: " + zk->message(code));
+    }
+
+    return true;
+  } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    return Result<bool>::none();
+  } else if (code != ZOK) {
+    return Result<bool>::error(
+        "Failed to get '" + znode + "/" + entry.name() +
+        "' in ZooKeeper: " + zk->message(code));
+  }
+
+  google::protobuf::io::ArrayInputStream stream(result.data(), result.size());
+
+  Entry current;
+
+  if (!current.ParseFromZeroCopyStream(&stream)) {
+    return Result<bool>::error("Failed to deserialize Entry");
+  }
+
+  if (UUID::fromBytes(current.uuid()) != uuid) {
+    return false;
+  }
+
+  // Okay, do a set, we get atomic swap by requiring 'stat.version'.
+  code = zk->set(znode + "/" + entry.name(), data, stat.version);
+
+  if (code == ZBADVERSION) {
+    return false;
+  } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    return Result<bool>::none();
+  } else if (code != ZOK) {
+    return Result<bool>::error(
+        "Failed to set '" + znode + "/" + entry.name() +
+        "' in ZooKeeper: " + zk->message(code));
+  }
+
+  return true;
+}
+
+
+ZooKeeperState::ZooKeeperState(
+    const string& servers,
+    const seconds& timeout,
+    const string& znode,
+    const Option<Authentication>& auth)
+{
+  process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
+  spawn(process);
+}
+
+
+ZooKeeperState::~ZooKeeperState()
+{
+  terminate(process);
+  wait(process);
+  delete process;
+}
+
+
+Future<Option<Entry> > ZooKeeperState::fetch(const string& name)
+{
+  return dispatch(process, &ZooKeeperStateProcess::fetch, name);
+}
+
+
+Future<bool> ZooKeeperState::swap(const Entry& entry, const UUID& uuid)
+{
+  return dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {

Modified: incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp?rev=1338382&r1=1338381&r2=1338382&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp (original)
+++ incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp Mon May 14 20:16:52 2012
@@ -62,7 +62,7 @@ public:
     };
 
     TestWatcher();
-    ~TestWatcher();
+    virtual ~TestWatcher();
 
     virtual void process(ZooKeeper* zk, int type, int state,
         const std::string& path);

Added: incubator/mesos/trunk/src/tests/state_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/state_tests.cpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/state_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/state_tests.cpp Mon May 14 20:16:52 2012
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <set>
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+#include <process/protobuf.hpp>
+
+#include "common/option.hpp"
+#include "common/type_utils.hpp"
+#include "common/utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "state/state.hpp"
+
+#include "tests/base_zookeeper_test.hpp"
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::state;
+using namespace mesos::internal::test;
+
+using namespace process;
+
+// Scenario: get "slaves" (expected empty), add one slave, set, get again.
+void GetSetGet(State* state)
+{
+  Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+
+  variable.await();
+
+  ASSERT_TRUE(variable.isReady());
+
+  State::Variable<Slaves> slaves1 = variable.get();
+
+  EXPECT_EQ(0, slaves1->infos().size());
+
+  SlaveInfo info;
+  info.set_hostname("localhost");
+  info.set_webui_hostname("localhost");
+
+  slaves1->add_infos()->MergeFrom(info);
+
+  Future<bool> result = state->set(&slaves1);
+
+  result.await();
+
+  ASSERT_TRUE(result.isReady());
+  EXPECT_TRUE(result.get());
+  // Read the variable back; it should now hold the slave stored above.
+  variable = state->get<Slaves>("slaves");
+
+  variable.await();
+
+  ASSERT_TRUE(variable.isReady());
+
+  State::Variable<Slaves> slaves2 = variable.get();
+  // Fatal on mismatch: infos(0) below needs an element to exist.
+  ASSERT_EQ(1, slaves2->infos().size());
+  EXPECT_EQ("localhost", slaves2->infos(0).hostname());
+  EXPECT_EQ("localhost", slaves2->infos(0).webui_hostname());
+}
+
+// Scenario: like GetSetGet, but sets the same variable twice before reading.
+void GetSetSetGet(State* state)
+{
+  Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+
+  variable.await();
+
+  ASSERT_TRUE(variable.isReady());
+
+  State::Variable<Slaves> slaves1 = variable.get();
+
+  EXPECT_EQ(0, slaves1->infos().size());
+
+  SlaveInfo info;
+  info.set_hostname("localhost");
+  info.set_webui_hostname("localhost");
+
+  slaves1->add_infos()->MergeFrom(info);
+
+  Future<bool> result = state->set(&slaves1);
+
+  result.await();
+
+  ASSERT_TRUE(result.isReady());
+  EXPECT_TRUE(result.get());
+  // Setting the same variable again is expected to succeed as well.
+  result = state->set(&slaves1);
+
+  result.await();
+
+  ASSERT_TRUE(result.isReady());
+  EXPECT_TRUE(result.get());
+
+  variable = state->get<Slaves>("slaves");
+
+  variable.await();
+
+  ASSERT_TRUE(variable.isReady());
+
+  State::Variable<Slaves> slaves2 = variable.get();
+
+  ASSERT_EQ(1, slaves2->infos().size());
+  EXPECT_EQ("localhost", slaves2->infos(0).hostname());
+  EXPECT_EQ("localhost", slaves2->infos(0).webui_hostname());
+}
+
+// Scenario: two reads of the same version; only the first set may succeed.
+void GetGetSetSetGet(State* state)
+{
+  Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+
+  variable.await();
+
+  ASSERT_TRUE(variable.isReady());
+
+  State::Variable<Slaves> slaves1 = variable.get();
+
+  EXPECT_EQ(0, slaves1->infos().size());
+  // Take a second handle on the same (still empty) variable.
+  variable = state->get<Slaves>("slaves");
+
+  variable.await();
+
+  ASSERT_TRUE(variable.isReady());
+
+  State::Variable<Slaves> slaves2 = variable.get();
+
+  EXPECT_EQ(0, slaves2->infos().size());
+
+  SlaveInfo info2;
+  info2.set_hostname("localhost2");
+  info2.set_webui_hostname("localhost2");
+
+  slaves2->add_infos()->MergeFrom(info2);
+
+  Future<bool> result = state->set(&slaves2);
+
+  result.await();
+
+  ASSERT_TRUE(result.isReady());
+  EXPECT_TRUE(result.get());
+  // slaves1 predates the set above, so setting it is expected to fail.
+  SlaveInfo info1;
+  info1.set_hostname("localhost1");
+  info1.set_webui_hostname("localhost1");
+
+  slaves1->add_infos()->MergeFrom(info1);
+
+  result = state->set(&slaves1);
+
+  result.await();
+
+  ASSERT_TRUE(result.isReady());
+  EXPECT_FALSE(result.get());
+  // Only the update made through slaves2 ("localhost2") should be visible.
+  variable = state->get<Slaves>("slaves");
+
+  variable.await();
+
+  ASSERT_TRUE(variable.isReady());
+
+  slaves1 = variable.get();
+
+  ASSERT_EQ(1, slaves1->infos().size());
+  EXPECT_EQ("localhost2", slaves1->infos(0).hostname());
+  EXPECT_EQ("localhost2", slaves1->infos(0).webui_hostname());
+}
+
+// Fixture providing a LevelDBState stored at "<cwd>/.state" for each test.
+class LevelDBStateTest : public ::testing::Test
+{
+public:
+  LevelDBStateTest()
+    : state(NULL), path(utils::os::getcwd() + "/.state") {}
+
+protected:
+  virtual void SetUp()
+  {
+    utils::os::rmdir(path); // Remove 'path' before (re)creating the state.
+    state = new LevelDBState(path);
+  }
+
+  virtual void TearDown()
+  {
+    delete state;
+    utils::os::rmdir(path); // Remove 'path' again once the state is gone.
+  }
+
+  State* state;
+
+private:
+  const std::string path;
+};
+
+// Runs the GetSetGet scenario against the fixture's LevelDBState.
+TEST_F(LevelDBStateTest, GetSetGet)
+{
+  GetSetGet(state);
+}
+
+// Runs the GetSetSetGet scenario against the fixture's LevelDBState.
+TEST_F(LevelDBStateTest, GetSetSetGet)
+{
+  GetSetSetGet(state);
+}
+
+// Runs the GetGetSetSetGet scenario against the fixture's LevelDBState.
+TEST_F(LevelDBStateTest, GetGetSetSetGet)
+{
+  GetGetSetSetGet(state);
+}
+
+// Fixture providing a ZooKeeperState under znode "/state/" for each test.
+class ZooKeeperStateTest : public mesos::internal::test::BaseZooKeeperTest
+{
+public:
+  ZooKeeperStateTest()
+    : state(NULL) {}
+
+protected:
+  virtual void SetUp()
+  {
+    BaseZooKeeperTest::SetUp(); // Base set-up runs first; 'zks' is used next.
+    state = new ZooKeeperState(zks->connectString(), NO_TIMEOUT, "/state/");
+  }
+
+  virtual void TearDown()
+  {
+    delete state; // Destroy the state before the base fixture tears down.
+    BaseZooKeeperTest::TearDown();
+  }
+
+  State* state;
+};
+
+// Runs the GetSetGet scenario against the fixture's ZooKeeperState.
+TEST_F(ZooKeeperStateTest, GetSetGet)
+{
+  GetSetGet(state);
+}
+
+// Runs the GetSetSetGet scenario against the fixture's ZooKeeperState.
+TEST_F(ZooKeeperStateTest, GetSetSetGet)
+{
+  GetSetSetGet(state);
+}
+
+// Runs the GetGetSetSetGet scenario against the fixture's ZooKeeperState.
+TEST_F(ZooKeeperStateTest, GetGetSetSetGet)
+{
+  GetGetSetSetGet(state);
+}

Modified: incubator/mesos/trunk/src/zookeeper/group.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.cpp?rev=1338382&r1=1338381&r2=1338382&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.cpp Mon May 14 20:16:52 2012
@@ -42,8 +42,8 @@ public:
   GroupProcess(const string& servers,
                const seconds& timeout,
                const string& znode,
-               const Option<Authentication>& auth = Option<Authentication>());
-  ~GroupProcess();
+               const Option<Authentication>& auth);
+  virtual ~GroupProcess();
 
   virtual void initialize();
 
@@ -183,6 +183,8 @@ GroupProcess::GroupProcess(
     acl(_auth.isSome()
         ? EVERYONE_READ_CREATOR_ALL
         : ZOO_OPEN_ACL_UNSAFE),
+    watcher(NULL),
+    zk(NULL),
     state(DISCONNECTED),
     retrying(false)
 {}
@@ -213,9 +215,7 @@ void GroupProcess::initialize()
 Future<Group::Membership> GroupProcess::join(const string& data)
 {
   if (error.isSome()) {
-    Promise<Group::Membership> promise;
-    promise.fail(error.get());
-    return promise.future();
+    return Future<Group::Membership>::failed(error.get());
   } else if (state != CONNECTED) {
     Join* join = new Join(data);
     pending.joins.push(join);
@@ -241,9 +241,7 @@ Future<Group::Membership> GroupProcess::
     pending.joins.push(join);
     return join->promise.future();
   } else if (membership.isError()) {
-    Promise<Group::Membership> promise;
-    promise.fail(membership.error());
-    return promise.future();
+    return Future<Group::Membership>::failed(membership.error());
   }
 
   return membership.get();
@@ -253,9 +251,7 @@ Future<Group::Membership> GroupProcess::
 Future<bool> GroupProcess::cancel(const Group::Membership& membership)
 {
   if (error.isSome()) {
-    Promise<bool> promise;
-    promise.fail(error.get());
-    return promise.future();
+    return Future<bool>::failed(error.get());
   } else if (owned.count(membership.id()) == 0) {
     // TODO(benh): Should this be an error? Right now a user can't
     // differentiate when 'false' means they can't cancel because it's
@@ -286,9 +282,7 @@ Future<bool> GroupProcess::cancel(const 
     pending.cancels.push(cancel);
     return cancel->promise.future();
   } else if (cancellation.isError()) {
-    Promise<bool> promise;
-    promise.fail(cancellation.error());
-    return promise.future();
+    return Future<bool>::failed(cancellation.error());
   }
 
   return cancellation.get();
@@ -298,9 +292,7 @@ Future<bool> GroupProcess::cancel(const 
 Future<string> GroupProcess::data(const Group::Membership& membership)
 {
   if (error.isSome()) {
-    Promise<string> promise;
-    promise.fail(error.get());
-    return promise.future();
+    return Future<string>::failed(error.get());
   } else if (state != CONNECTED) {
     Data* data = new Data(membership);
     pending.datas.push(data);
@@ -318,9 +310,7 @@ Future<string> GroupProcess::data(const 
     pending.datas.push(data);
     return data->promise.future();
   } else if (result.isError()) {
-    Promise<string> promise;
-    promise.fail(result.error());
-    return promise.future();
+    return Future<string>::failed(result.error());
   }
 
   return result.get();
@@ -331,9 +321,7 @@ Future<set<Group::Membership> > GroupPro
     const set<Group::Membership>& expected)
 {
   if (error.isSome()) {
-    Promise<set<Group::Membership> > promise;
-    promise.fail(error.get());
-    return promise.future();
+    return Future<set<Group::Membership> >::failed(error.get());
   } else if (state != CONNECTED) {
     Watch* watch = new Watch(expected);
     pending.watches.push(watch);
@@ -388,7 +376,8 @@ Future<Option<int64_t> > GroupProcess::s
 void GroupProcess::connected(bool reconnect)
 {
   if (!reconnect) {
-    // Authenticate if necessary.
+    // Authenticate if necessary (and we are connected for the first
+    // time, or after a session expiration).
     if (auth.isSome()) {
       LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
 
@@ -405,9 +394,8 @@ void GroupProcess::connected(bool reconn
       }
     }
 
-    CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
-
     // Create directory path znodes as necessary.
+    CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
     size_t index = znode.find("/", 0);
 
     while (index < string::npos) {