You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/05/14 22:16:53 UTC
svn commit: r1338382 - in /incubator/mesos/trunk/src: Makefile.am
messages/state.hpp messages/state.proto state/ state/leveldb.cpp
state/state.hpp state/zookeeper.cpp tests/base_zookeeper_test.hpp
tests/state_tests.cpp zookeeper/group.cpp
Author: benh
Date: Mon May 14 20:16:52 2012
New Revision: 1338382
URL: http://svn.apache.org/viewvc?rev=1338382&view=rev
Log:
Added "state" abstraction, including implementations using leveldb and ZooKeeper.
Added:
incubator/mesos/trunk/src/messages/state.hpp
incubator/mesos/trunk/src/messages/state.proto
incubator/mesos/trunk/src/state/
incubator/mesos/trunk/src/state/leveldb.cpp
incubator/mesos/trunk/src/state/state.hpp
incubator/mesos/trunk/src/state/zookeeper.cpp
incubator/mesos/trunk/src/tests/state_tests.cpp
Modified:
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp
incubator/mesos/trunk/src/zookeeper/group.cpp
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1338382&r1=1338381&r2=1338382&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Mon May 14 20:16:52 2012
@@ -119,6 +119,11 @@ LOG_PROTOS = messages/log.pb.cc messages
BUILT_SOURCES += $(LOG_PROTOS)
CLEANFILES += $(LOG_PROTOS)
+STATE_PROTOS = messages/state.pb.cc messages/state.pb.h
+
+BUILT_SOURCES += $(STATE_PROTOS)
+CLEANFILES += $(STATE_PROTOS)
+
# Targets for generating protocol buffer code.
%.pb.cc %.pb.h: $(top_srcdir)/include/mesos/%.proto
@@ -157,7 +162,7 @@ libmesos_no_third_party_la_SOURCES = sch
common/utils.cpp common/date_utils.cpp common/resources.cpp \
common/attributes.cpp common/values.cpp \
zookeeper/zookeeper.cpp zookeeper/authentication.cpp \
- zookeeper/group.cpp messages/log.proto messages/messages.proto
+ zookeeper/group.cpp messages/messages.proto
pkginclude_HEADERS = $(top_srcdir)/include/mesos/executor.hpp \
$(top_srcdir)/include/mesos/scheduler.hpp \
@@ -185,12 +190,11 @@ libmesos_no_third_party_la_SOURCES += co
common/strings.hpp common/values.hpp \
configurator/configuration.hpp configurator/configurator.hpp \
configurator/option.hpp detector/detector.hpp \
- launcher/launcher.hpp local/local.hpp log/coordinator.hpp \
- log/replica.hpp log/log.hpp log/network.hpp \
- master/allocator.hpp master/allocator_factory.hpp \
- master/constants.hpp master/frameworks_manager.hpp \
- master/http.hpp master/master.hpp master/simple_allocator.hpp \
- master/slaves_manager.hpp master/webui.hpp messages/log.hpp \
+ launcher/launcher.hpp local/local.hpp master/allocator.hpp \
+ master/allocator_factory.hpp master/constants.hpp \
+ master/frameworks_manager.hpp master/http.hpp \
+ master/master.hpp master/simple_allocator.hpp \
+ master/slaves_manager.hpp master/webui.hpp \
messages/messages.hpp slave/constants.hpp slave/http.hpp \
slave/isolation_module.hpp slave/isolation_module_factory.hpp \
slave/lxc_isolation_module.hpp \
@@ -229,12 +233,26 @@ libmesos_no_third_party_la_LIBADD += lib
# include the leveldb headers.
noinst_LTLIBRARIES += liblog.la
liblog_la_SOURCES = log/coordinator.cpp log/replica.cpp
+liblog_la_SOURCES += log/coordinator.hpp log/replica.hpp log/log.hpp \
+ log/network.hpp messages/log.hpp messages/log.proto
nodist_liblog_la_SOURCES = $(LOG_PROTOS)
liblog_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)
libmesos_no_third_party_la_LIBADD += liblog.la
+# Convenience library for building "state" abstraction in order to
+# include the leveldb headers.
+noinst_LTLIBRARIES += libstate.la
+libstate_la_SOURCES = state/leveldb.cpp state/zookeeper.cpp
+libstate_la_SOURCES += state/state.hpp messages/state.hpp \
+ messages/state.proto
+nodist_libstate_la_SOURCES = $(STATE_PROTOS)
+libstate_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)
+
+libmesos_no_third_party_la_LIBADD += libstate.la
+
+
# Convenience library for the webui to include Python specific flags.
if WEBUI
noinst_LTLIBRARIES += libwebui.la
@@ -632,7 +650,7 @@ no_executor_framework_LDADD = libmesos.l
check_PROGRAMS += mesos-tests
mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp \
- tests/master_tests.cpp \
+ tests/master_tests.cpp tests/state_tests.cpp \
tests/resource_offers_tests.cpp \
tests/fault_tolerance_tests.cpp \
tests/log_tests.cpp tests/resources_tests.cpp \
Added: incubator/mesos/trunk/src/messages/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/state.hpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/state.hpp (added)
+++ incubator/mesos/trunk/src/messages/state.hpp Mon May 14 20:16:52 2012
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Convenience header wrapping the generated protocol buffer header for
+// state messages. The #ifndef and #define must name the same macro for
+// the include guard to have any effect.
+#ifndef __MESSAGES_STATE_HPP__
+#define __MESSAGES_STATE_HPP__
+
+#include "messages/state.pb.h"
+
+#endif // __MESSAGES_STATE_HPP__
Added: incubator/mesos/trunk/src/messages/state.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/state.proto?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/state.proto (added)
+++ incubator/mesos/trunk/src/messages/state.proto Mon May 14 20:16:52 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mesos.internal.state;
+
+// Describes a state entry, effectively a key/value pair. We record
+// the full name of the value's protocol buffer type (rather than the
+// whole descriptor, see the TODO below) so that usage can be checked
+// when the entry is read back.
+message Entry {
+ // Name (key) under which the entry is stored.
+ required string name = 1;
+ // Version of the entry; a swap only succeeds against a matching UUID.
+ required bytes uuid = 2;
+ // Full name of the protocol buffer message type held in 'value'.
+ required string type = 3;
+ // TODO(benh): Consider saving more information about the type.
+ // NOTE: tag 3 is already used by 'type' above, so the field sketched
+ // below would need a different tag (or replace 'type') if enabled.
+ // required google.protobuf.DescriptorProto message_type = 3;
+ // Serialized bytes of the protocol buffer message.
+ required bytes value = 4;
+}
Added: incubator/mesos/trunk/src/state/leveldb.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/leveldb.cpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/leveldb.cpp (added)
+++ incubator/mesos/trunk/src/state/leveldb.cpp Mon May 14 20:16:52 2012
@@ -0,0 +1,211 @@
+#include <leveldb/db.h>
+
+#include <google/protobuf/message.h>
+
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include "common/logging.hpp"
+#include "common/option.hpp"
+#include "common/try.hpp"
+#include "common/uuid.hpp"
+
+#include "messages/state.hpp"
+
+#include "state/state.hpp"
+
+using namespace process;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Process backing LevelDBState: it holds the leveldb database handle
+// and carries out the fetch/swap operations dispatched to it.
+class LevelDBStateProcess : public Process<LevelDBStateProcess>
+{
+public:
+ LevelDBStateProcess(const string& path);
+ virtual ~LevelDBStateProcess();
+
+ // Opens the database at 'path'; a failure is recorded in 'error'.
+ virtual void initialize();
+
+ // State implementation.
+ Future<Option<Entry> > fetch(const string& name);
+ Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+ // Helpers for interacting with leveldb.
+ Try<Option<Entry> > get(const string& name);
+ Try<bool> put(const Entry& entry);
+
+ // Location handed to leveldb::DB::Open.
+ const string path;
+ // Database handle; NULL until initialize() opens it.
+ leveldb::DB* db;
+
+ // Set when opening the database failed; fetch/swap then fail their
+ // futures with this message.
+ Option<string> error;
+};
+
+
+// Only remembers the path; the database is opened later in initialize().
+LevelDBStateProcess::LevelDBStateProcess(const string& _path)
+  : path(_path),
+    db(NULL)
+{}
+
+
+// Releases the database handle acquired in initialize().
+LevelDBStateProcess::~LevelDBStateProcess()
+{
+ delete db; // Might be null if open failed in LevelDBStateProcess::initialize.
+}
+
+
+// Opens (creating if missing) the leveldb database at 'path'. On
+// failure the status string is stored in 'error' so that subsequent
+// fetch/swap calls fail their futures instead of touching 'db'.
+void LevelDBStateProcess::initialize()
+{
+  leveldb::Options options;
+  options.create_if_missing = true;
+
+  leveldb::Status status = leveldb::DB::Open(options, path, &db);
+
+  if (!status.ok()) {
+    // TODO(benh): Consider trying to repair the DB.
+    error = Option<string>::some(status.ToString());
+
+    // There is no usable database handle when the open fails (the
+    // destructor already allows for 'db' being null), so we must not
+    // fall through to the compaction below and dereference it.
+    return;
+  }
+
+  // TODO(benh): Conditionally compact to avoid long recovery times?
+  db->CompactRange(NULL, NULL);
+}
+
+
+// Looks up 'name' in the database. The returned future is failed if
+// the database could not be opened or the lookup reports an error;
+// otherwise it holds the lookup result (none when no entry exists).
+Future<Option<Entry> > LevelDBStateProcess::fetch(const string& name)
+{
+  if (error.isSome()) {
+    return Future<Option<Entry> >::failed(error.get());
+  }
+
+  Try<Option<Entry> > lookup = get(name);
+
+  return lookup.isError()
+    ? Future<Option<Entry> >::failed(lookup.error())
+    : Future<Option<Entry> >(lookup.get());
+}
+
+
+// Stores 'entry' provided that the entry currently stored under the
+// same name (if any) still carries 'uuid'. Yields false when the
+// stored UUID differs, and a failed future on any database error.
+Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
+{
+  if (error.isSome()) {
+    return Future<bool>::failed(error.get());
+  }
+
+  // Read the stored entry first to make sure the version has not
+  // changed. This could be optimized in the future, for now it will
+  // probably hit the cache anyway.
+  Try<Option<Entry> > current = get(entry.name());
+
+  if (current.isError()) {
+    return Future<bool>::failed(current.error());
+  }
+
+  const bool stale =
+    current.get().isSome() &&
+    UUID::fromBytes(current.get().get().uuid()) != uuid;
+
+  if (stale) {
+    return false;
+  }
+
+  // Note that there is no need to do the DB::Get and DB::Put
+  // "atomically" because only one db can be opened at a time, so
+  // there can not be any writes that occur concurrently.
+  Try<bool> written = put(entry);
+
+  if (written.isError()) {
+    return Future<bool>::failed(written.error());
+  }
+
+  return written.get();
+}
+
+
+// Reads and deserializes the entry stored under 'name'. Returns none
+// if leveldb reports the key as not found, and an error if the read
+// or the deserialization fails. Must only be called when the database
+// was opened successfully.
+Try<Option<Entry> > LevelDBStateProcess::get(const string& name)
+{
+  CHECK(error.isNone());
+
+  string bytes;
+
+  leveldb::Status status = db->Get(leveldb::ReadOptions(), name, &bytes);
+
+  if (status.IsNotFound()) {
+    return Option<Entry>::none();
+  }
+
+  if (!status.ok()) {
+    return Try<Option<Entry> >::error(status.ToString());
+  }
+
+  Entry entry;
+
+  google::protobuf::io::ArrayInputStream stream(bytes.data(), bytes.size());
+
+  if (!entry.ParseFromZeroCopyStream(&stream)) {
+    return Try<Option<Entry> >::error("Failed to deserialize Entry");
+  }
+
+  return Option<Entry>::some(entry);
+}
+
+
+// Serializes 'entry' and writes it under its name using a write with
+// 'sync' enabled. Returns true on success and an error if either the
+// serialization or the write fails. Must only be called when the
+// database was opened successfully.
+Try<bool> LevelDBStateProcess::put(const Entry& entry)
+{
+  CHECK(error.isNone());
+
+  string bytes;
+
+  if (!entry.SerializeToString(&bytes)) {
+    return Try<bool>::error("Failed to serialize Entry");
+  }
+
+  leveldb::WriteOptions options;
+  options.sync = true;
+
+  leveldb::Status status = db->Put(options, entry.name(), bytes);
+
+  if (status.ok()) {
+    return true;
+  }
+
+  return Try<bool>::error(status.ToString());
+}
+
+
+// Creates and spawns the backing process for the database at 'path'.
+LevelDBState::LevelDBState(const std::string& path)
+{
+ process = new LevelDBStateProcess(path);
+ spawn(process);
+}
+
+
+// Terminates the backing process and waits for it before deleting it.
+LevelDBState::~LevelDBState()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}
+
+
+// Forwards the fetch to the backing process via dispatch.
+Future<Option<Entry> > LevelDBState::fetch(const string& name)
+{
+ return dispatch(process, &LevelDBStateProcess::fetch, name);
+}
+
+
+// Forwards the swap to the backing process via dispatch.
+Future<bool> LevelDBState::swap(const Entry& entry, const UUID& uuid)
+{
+ return dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
Added: incubator/mesos/trunk/src/state/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/state.hpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/state.hpp (added)
+++ incubator/mesos/trunk/src/state/state.hpp Mon May 14 20:16:52 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATE_HPP__
+#define __STATE_HPP__
+
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <string>
+
+#include <process/future.hpp>
+
+#include "common/logging.hpp"
+#include "common/option.hpp"
+#include "common/seconds.hpp"
+#include "common/uuid.hpp"
+
+#include "messages/state.hpp"
+
+#include "zookeeper/authentication.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Forward declarations.
+class LevelDBStateProcess;
+class ZooKeeperStateProcess;
+
+// An abstraction of "state" (possibly between multiple distributed
+// components) represented by "variables" (effectively key/value
+// pairs). The value of a variable in the state must be a protocol
+// buffer. Variables are versioned such that setting a variable in the
+// state will only succeed if the variable has not changed since last
+// fetched. Varying implementations of state provide varying
+// replicated guarantees.
+//
+// Note that the semantics of 'get' and 'set' provide atomicity: you
+// can not set a variable that has changed since you did the last get.
+// In other words, if a set succeeds then no other writes have been
+// performed on the variable since your get.
+
+// Example:
+// State* state = new ZooKeeperState();
+// Future<State::Variable<Slaves> > variable = state->get<Slaves>("slaves");
+// State::Variable<Slaves> slaves = variable.get();
+// slaves->add_infos()->MergeFrom(info);
+// Future<bool> set = state->set(&slaves);
+
+// Abstract base for state implementations. The templated get/set
+// operate on Variable<T> and are implemented in terms of the
+// non-templated fetch/swap that concrete implementations provide.
+class State
+{
+public:
+ // A typed value together with the Entry it was read from (or will
+ // be written as). Only State can construct one.
+ template <typename T>
+ class Variable
+ {
+ public:
+ // Gives access to the wrapped protocol buffer for reading/mutation.
+ T* operator -> ()
+ {
+ return &t;
+ }
+
+ private:
+ friend class State; // Creates and manages variables.
+
+ Variable(const Entry& _entry, const T& _t)
+ : entry(_entry), t(_t)
+ {
+ // The assignment below is otherwise unused; it only compiles when
+ // T derives from google::protobuf::Message.
+ const google::protobuf::Message* message = &t; // Check T is a protobuf.
+ }
+
+ Entry entry; // Stored form (name, uuid, type, serialized value).
+ T t; // Deserialized value handed out via operator ->.
+ };
+
+ State() {}
+ virtual ~State() {}
+
+ // Returns a variable from the state, creating a new one if one
+ // previously did not exist (or an error if one occurs).
+ template <typename T>
+ process::Future<Variable<T> > get(const std::string& name);
+
+ // Returns true if the variable was successfully set in the state,
+ // otherwise false if the version of the variable was no longer
+ // valid (or an error if one occurs).
+ template <typename T>
+ process::Future<bool> set(Variable<T>* variable);
+
+protected:
+ // Fetch and swap state entries, factored out to allow State
+ // implementations to be agnostic of Variable which is templated.
+ // 'uuid' passed to swap is the version the caller last observed.
+ virtual process::Future<Option<Entry> > fetch(const std::string& name) = 0;
+ virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid) = 0;
+
+private:
+ // Helper to convert an Entry to a Variable<T>. We make this a
+ // static member of State for friend access to Variable's
+ // constructor.
+ template <typename T>
+ static process::Future<State::Variable<T> > convert(
+ const std::string& name,
+ const Option<Entry>& option);
+};
+
+
+// Turns the result of a fetch into a Variable<T>. When an entry was
+// found its value is deserialized into a T (the recorded type name
+// must equal T's full name); when none was found a Variable is built
+// from a default T, a fresh random UUID and the given name.
+template <typename T>
+process::Future<State::Variable<T> > State::convert(
+    const std::string& name,
+    const Option<Entry>& option)
+{
+  T t;
+
+  const std::string type = t.GetDescriptor()->full_name();
+
+  if (option.isNone()) {
+    // Nothing stored yet: construct a Variable out of a new Entry with
+    // a default value for T (and a random UUID to start).
+    std::string value;
+
+    if (!t.SerializeToString(&value)) {
+      return process::Future<State::Variable<T> >::failed(
+          "Failed to serialize " + type);
+    }
+
+    Entry entry;
+    entry.set_name(name);
+    entry.set_uuid(UUID::random().toBytes());
+    entry.set_type(type);
+    entry.set_value(value);
+
+    return State::Variable<T>(entry, t);
+  }
+
+  const Entry& entry = option.get();
+
+  // TODO(benh): Check _compatibility_ versus equivalence.
+  CHECK(type == entry.type());
+
+  const std::string& value = entry.value();
+  google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+  if (!t.ParseFromZeroCopyStream(&stream)) {
+    return process::Future<State::Variable<T> >::failed(
+        "Failed to deserialize " + type);
+  }
+
+  return State::Variable<T>(entry, t);
+}
+
+
+// Fetches the entry stored under 'name' and chains State::convert<T>
+// onto the resulting future to produce the typed Variable.
+template <typename T>
+process::Future<State::Variable<T> > State::get(const std::string& name)
+{
+ // Bind 'name' now; the fetched Option<Entry> is supplied later as
+ // the placeholder argument.
+ std::tr1::function<
+ process::Future<State::Variable<T> >(const Option<Entry>&)> convert =
+ std::tr1::bind(&State::convert<T>, name, std::tr1::placeholders::_1);
+
+ return fetch(name).then(convert);
+}
+
+
+// Serializes the variable's current value and attempts to swap it
+// into the state against the UUID the variable was last read with.
+// Note that the variable's entry is updated (new UUID and value)
+// before the outcome of the swap is known.
+template <typename T>
+process::Future<bool> State::set(State::Variable<T>* variable)
+{
+ std::string value;
+ if (!variable->t.SerializeToString(&value)) {
+ return process::Future<bool>::failed(
+ "Failed to serialize " + variable->entry.type());
+ }
+
+ // Note that we try and swap an entry even if the value didn't change!
+ // Capture the previously observed UUID before it is overwritten below.
+ UUID uuid = UUID::fromBytes(variable->entry.uuid());
+
+ // Update the UUID and value of the entry.
+ variable->entry.set_uuid(UUID::random().toBytes());
+ variable->entry.set_value(value);
+
+ return swap(variable->entry, uuid);
+}
+
+
+// State implementation that delegates fetch/swap to a
+// LevelDBStateProcess created for the given path.
+class LevelDBState : public State
+{
+public:
+ LevelDBState(const std::string& path);
+ virtual ~LevelDBState();
+
+protected:
+ // State implementation.
+ virtual process::Future<Option<Entry> > fetch(const std::string& name);
+ virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+ // Backing process; created in the constructor and deleted in the
+ // destructor.
+ LevelDBStateProcess* process;
+};
+
+
+// State implementation that delegates fetch/swap to a
+// ZooKeeperStateProcess; entries are kept beneath 'znode'.
+class ZooKeeperState : public State
+{
+public:
+ // TODO(benh): Just take a zookeeper::URL.
+ // 'auth' is optional; when omitted no authentication is attempted.
+ ZooKeeperState(
+ const std::string& servers,
+ const seconds& timeout,
+ const std::string& znode,
+ const Option<zookeeper::Authentication>& auth =
+ Option<zookeeper::Authentication>());
+ virtual ~ZooKeeperState();
+
+protected:
+ // State implementation.
+ virtual process::Future<Option<Entry> > fetch(const std::string& name);
+ virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+private:
+ // Backing process; created in the constructor and deleted in the
+ // destructor.
+ ZooKeeperStateProcess* process;
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_HPP__
+
+
+
+
+// need a Future::operator -> (), plus a test
+
+
+// have a way to "watch" a Variable and get a future that signifies when/if it has changed?
+
+// need a master work directory for local leveldb version of State!
+
+// use leveldb for non-ha version of master (no reading/writing files)
+
+// need to set the location of master detector zk znode, and set this znode location
Added: incubator/mesos/trunk/src/state/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/zookeeper.cpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/state/zookeeper.cpp (added)
+++ incubator/mesos/trunk/src/state/zookeeper.cpp Mon May 14 20:16:52 2012
@@ -0,0 +1,460 @@
+#include <google/protobuf/message.h>
+
+#include <queue>
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include "common/logging.hpp"
+#include "common/option.hpp"
+#include "common/result.hpp"
+#include "common/seconds.hpp"
+#include "common/strings.hpp"
+#include "common/try.hpp"
+#include "common/uuid.hpp"
+
+#include "messages/state.hpp"
+
+#include "state/state.hpp"
+
+#include "zookeeper/authentication.hpp"
+#include "zookeeper/watcher.hpp"
+#include "zookeeper/zookeeper.hpp"
+
+using namespace process;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::queue;
+using std::string;
+
+using zookeeper::Authentication;
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+// Process backing ZooKeeperState. It owns the ZooKeeper client,
+// tracks the connection state, and queues fetch/swap requests that
+// can not be carried out right away so they can be run once
+// connected() is invoked.
+class ZooKeeperStateProcess : public Process<ZooKeeperStateProcess>
+{
+public:
+ ZooKeeperStateProcess(
+ const string& servers,
+ const seconds& timeout,
+ const string& znode,
+ const Option<Authentication>& auth);
+ virtual ~ZooKeeperStateProcess();
+
+ // Creates the watcher and the ZooKeeper client.
+ virtual void initialize();
+
+ // State implementation.
+ Future<Option<Entry> > fetch(const string& name);
+ Future<bool> swap(const Entry& entry, const UUID& uuid);
+
+ // ZooKeeper events.
+ void connected(bool reconnect);
+ void reconnecting();
+ void expired();
+ void updated(const string& path);
+ void created(const string& path);
+ void deleted(const string& path);
+
+private:
+ // Helpers for fetching and swapping. A 'none' result means the
+ // operation should be tried again later.
+ Result<Option<Entry> > doFetch(const string& name);
+ Result<bool> doSwap(const Entry& entry, const UUID& uuid);
+
+ const string servers;
+ const seconds timeout;
+ const string znode; // Parent path under which entries are stored.
+
+ Option<Authentication> auth; // ZooKeeper authentication.
+
+ const ACL_vector acl; // Default ACL to use.
+
+ Watcher* watcher;
+ ZooKeeper* zk;
+
+ enum State { // ZooKeeper connection state.
+ DISCONNECTED,
+ CONNECTING,
+ CONNECTED,
+ } state;
+
+ // A queued fetch request and the promise to complete for it.
+ struct Fetch
+ {
+ Fetch(const string& _name)
+ : name(_name) {}
+ string name;
+ Promise<Option<Entry> > promise;
+ };
+
+ // A queued swap request and the promise to complete for it.
+ struct Swap
+ {
+ Swap(const Entry& _entry, const UUID& _uuid)
+ : entry(_entry), uuid(_uuid) {}
+ Entry entry;
+ UUID uuid;
+ Promise<bool> promise;
+ };
+
+ // Requests waiting to be run; the queues own the heap-allocated
+ // Fetch/Swap objects.
+ struct {
+ queue<Fetch*> fetches;
+ queue<Swap*> swaps;
+ } pending;
+
+ // Set when authentication fails; fetch/swap then fail immediately.
+ Option<string> error;
+};
+
+
+// Drains the queue: every element is removed, has its promise failed
+// with 'message', and is then deleted.
+template <typename T>
+void fail(queue<T*>* queue, const string& message)
+{
+  while (!queue->empty()) {
+    T* pending = queue->front();
+    queue->pop();
+
+    pending->promise.fail(message);
+    delete pending;
+  }
+}
+
+
+// Records the connection parameters; the ZooKeeper client itself is
+// created in initialize(). The default ACL depends on whether
+// authentication credentials were supplied.
+ZooKeeperStateProcess::ZooKeeperStateProcess(
+ const string& _servers,
+ const seconds& _timeout,
+ const string& _znode,
+ const Option<Authentication>& _auth)
+ : servers(_servers),
+ timeout(_timeout),
+ // NOTE(review): presumably strips a trailing "/" (doSwap CHECKs that
+ // 'znode' does not end in one) -- confirm against strings::remove.
+ znode(strings::remove(_znode, "/", strings::SUFFIX)),
+ auth(_auth),
+ acl(_auth.isSome()
+ ? zookeeper::EVERYONE_READ_CREATOR_ALL
+ : ZOO_OPEN_ACL_UNSAFE),
+ watcher(NULL),
+ zk(NULL),
+ state(DISCONNECTED)
+{}
+
+
+// Fails any requests that are still queued and releases the
+// ZooKeeper client and watcher.
+ZooKeeperStateProcess::~ZooKeeperStateProcess()
+{
+ fail(&pending.fetches, "No longer managing state");
+ fail(&pending.swaps, "No longer managing state");
+
+ delete zk;
+ delete watcher;
+}
+
+
+// Creates the watcher (which targets this process) and the ZooKeeper
+// client that reports to it.
+void ZooKeeperStateProcess::initialize()
+{
+ // Doing initialization here allows to avoid the race between
+ // instantiating the ZooKeeper instance and being spawned ourself.
+ watcher = new ProcessWatcher<ZooKeeperStateProcess>(self());
+ zk = new ZooKeeper(servers, timeout, watcher);
+}
+
+
+// Fetches the entry named 'name'. Fails immediately once a permanent
+// error has been recorded. If we are not connected, or the attempt
+// turns out to be retryable, the request is queued and its future is
+// completed later from connected().
+Future<Option<Entry> > ZooKeeperStateProcess::fetch(const string& name)
+{
+  if (error.isSome()) {
+    return Future<Option<Entry> >::failed(error.get());
+  }
+
+  if (state == CONNECTED) {
+    Result<Option<Entry> > result = doFetch(name);
+
+    if (!result.isNone()) {
+      if (result.isError()) {
+        return Future<Option<Entry> >::failed(result.error());
+      }
+
+      return result.get();
+    }
+
+    // Otherwise fall through and try again later.
+  }
+
+  Fetch* queued = new Fetch(name);
+  pending.fetches.push(queued);
+  return queued->promise.future();
+}
+
+
+// Swaps in 'entry' against version 'uuid'. Fails immediately once a
+// permanent error has been recorded. If we are not connected, or the
+// attempt turns out to be retryable, the request is queued and its
+// future is completed later from connected().
+Future<bool> ZooKeeperStateProcess::swap(const Entry& entry, const UUID& uuid)
+{
+  if (error.isSome()) {
+    return Future<bool>::failed(error.get());
+  }
+
+  if (state == CONNECTED) {
+    Result<bool> result = doSwap(entry, uuid);
+
+    if (!result.isNone()) {
+      if (result.isError()) {
+        return Future<bool>::failed(result.error());
+      }
+
+      return result.get();
+    }
+
+    // Otherwise fall through and try again later.
+  }
+
+  Swap* queued = new Swap(entry, uuid);
+  pending.swaps.push(queued);
+  return queued->promise.future();
+}
+
+
+// Invoked when a ZooKeeper session is (re)established. For a new
+// session (i.e., not a reconnect) we authenticate if credentials were
+// provided. We then mark ourselves CONNECTED and run the requests
+// that were queued while no connection was available, stopping early
+// if one of them needs to be retried later.
+void ZooKeeperStateProcess::connected(bool reconnect)
+{
+  if (!reconnect) {
+    // Authenticate if necessary (and we are connected for the first
+    // time, or after a session expiration).
+    if (auth.isSome()) {
+      LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+
+      int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
+
+      if (code != ZOK) { // TODO(benh): Authentication retries?
+        Try<string> message = strings::format(
+            "Failed to authenticate with ZooKeeper: %s", zk->message(code));
+        error = message.isSome()
+          ? message.get()
+          : "Failed to authenticate with ZooKeeper";
+
+        // Once 'error' is set, fetch/swap fail immediately, so fail
+        // everything that is already queued too. Otherwise those
+        // promises would remain pending until this process is
+        // destroyed, and a later reconnect would attempt to run them
+        // and trip the CHECK(error.isNone()) in doFetch/doSwap.
+        fail(&pending.fetches, error.get());
+        fail(&pending.swaps, error.get());
+
+        return;
+      }
+    }
+  }
+
+  state = CONNECTED;
+
+  while (!pending.fetches.empty()) {
+    Fetch* fetch = pending.fetches.front();
+    Result<Option<Entry> > result = doFetch(fetch->name);
+    if (result.isNone()) {
+      return; // Try again later.
+    } else if (result.isError()) {
+      fetch->promise.fail(result.error());
+    } else {
+      fetch->promise.set(result.get());
+    }
+    pending.fetches.pop();
+    delete fetch;
+  }
+
+  while (!pending.swaps.empty()) {
+    Swap* swap = pending.swaps.front();
+    Result<bool> result = doSwap(swap->entry, swap->uuid);
+    if (result.isNone()) {
+      return; // Try again later.
+    } else if (result.isError()) {
+      swap->promise.fail(result.error());
+    } else {
+      swap->promise.set(result.get());
+    }
+    pending.swaps.pop();
+    delete swap;
+  }
+}
+
+
+// Marks the connection as CONNECTING, so new requests are queued
+// rather than attempted (fetch/swap only run when CONNECTED).
+void ZooKeeperStateProcess::reconnecting()
+{
+ state = CONNECTING;
+}
+
+
+// Handles session expiration by replacing the ZooKeeper client with
+// a new one (reusing the existing watcher).
+void ZooKeeperStateProcess::expired()
+{
+ state = DISCONNECTED;
+
+ delete zk;
+ zk = new ZooKeeper(servers, timeout, watcher);
+
+ state = CONNECTING;
+}
+
+
+// Treated as a fatal error: the get calls in this file pass 'false'
+// as the watch argument, so this event is not expected.
+void ZooKeeperStateProcess::updated(const string& path)
+{
+ LOG(FATAL) << "Unexpected ZooKeeper event";
+}
+
+
+// Treated as a fatal error: the get calls in this file pass 'false'
+// as the watch argument, so this event is not expected.
+void ZooKeeperStateProcess::created(const string& path)
+{
+ LOG(FATAL) << "Unexpected ZooKeeper event";
+}
+
+
+// Treated as a fatal error: the get calls in this file pass 'false'
+// as the watch argument, so this event is not expected.
+void ZooKeeperStateProcess::deleted(const string& path)
+{
+ LOG(FATAL) << "Unexpected ZooKeeper event";
+}
+
+
+// Reads and deserializes the entry stored at znode + "/" + name.
+// Returns:
+//   - some(none) if the znode does not exist,
+//   - some(entry) if it was read and deserialized,
+//   - none if the operation should be tried again later (invalid
+//     state or a retryable error code),
+//   - an error for any other failure.
+// Must only be called while CONNECTED and with no recorded error.
+Result<Option<Entry> > ZooKeeperStateProcess::doFetch(const string& name)
+{
+ CHECK(error.isNone()) << ": " << error.get();
+ CHECK(state == CONNECTED);
+
+ string result;
+ Stat stat;
+
+ // 'false' => do not set a watch on the znode.
+ int code = zk->get(znode + "/" + name, false, &result, &stat);
+
+ if (code == ZNONODE) {
+ return Option<Entry>::none();
+ } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ return Result<Option<Entry> >::none();
+ } else if (code != ZOK) {
+ return Result<Option<Entry> >::error(
+ "Failed to get '" + znode + "/" + name +
+ "' in ZooKeeper: " + zk->message(code));
+ }
+
+ google::protobuf::io::ArrayInputStream stream(result.data(), result.size());
+
+ Entry entry;
+
+ if (!entry.ParseFromZeroCopyStream(&stream)) {
+ return Result<Option<Entry> >::error("Failed to deserialize Entry");
+ }
+
+ return Option<Entry>::some(entry);
+}
+
+
+// Attempts to store 'entry' at znode + "/" + entry.name().
+//   - If the znode does not exist it is created (after creating the
+//     parent path); note that 'uuid' is not consulted in this case.
+//   - Otherwise the stored entry's UUID must equal 'uuid' and the set
+//     is made conditional on the znode version read beforehand.
+// Returns true on success, false if someone else got there first
+// (node already exists, UUID mismatch, or bad version), none if the
+// operation should be tried again later, and an error otherwise.
+// Must only be called while CONNECTED and with no recorded error.
+Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
+{
+ CHECK(error.isNone()) << ": " << error.get();
+ CHECK(state == CONNECTED);
+
+ // Serialize to make sure we're under the 1 MB limit.
+ string data;
+
+ if (!entry.SerializeToString(&data)) {
+ return Result<bool>::error("Failed to serialize Entry");
+ }
+
+ if (data.size() > 1024 * 1024) { // 1 MB
+ // TODO(benh): Implement compression.
+ return Result<bool>::error("Serialized data is too big (> 1 MB)");
+ }
+
+ string result;
+ Stat stat;
+
+ // Read the current value (without a watch) along with its 'stat',
+ // whose version is used for the conditional set below.
+ int code = zk->get(znode + "/" + entry.name(), false, &result, &stat);
+
+ if (code == ZNONODE) {
+ // Create directory path znodes as necessary.
+ CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
+ size_t index = znode.find("/", 0);
+
+ // Each iteration creates the prefix ending just before the next
+ // "/"; the final iteration (index == npos) creates 'znode' itself.
+ while (index < string::npos) {
+ // Get out the prefix to create.
+ index = znode.find("/", index + 1);
+ string prefix = znode.substr(0, index);
+
+ // Create the znode (even if it already exists).
+ code = zk->create(prefix, "", acl, 0, NULL);
+
+ if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ return Result<bool>::none();
+ } else if (code != ZOK && code != ZNODEEXISTS) {
+ return Result<bool>::error(
+ "Failed to create '" + prefix +
+ "' in ZooKeeper: " + zk->message(code));
+ }
+ }
+
+ code = zk->create(znode + "/" + entry.name(), data, acl, 0, NULL);
+
+ if (code == ZNODEEXISTS) {
+ return false; // Lost a race with someone else.
+ } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ return Result<bool>::none();
+ } else if (code != ZOK) {
+ return Result<bool>::error(
+ "Failed to create '" + znode + "/" + entry.name() +
+ "' in ZooKeeper: " + zk->message(code));
+ }
+
+ return true;
+ } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ return Result<bool>::none();
+ } else if (code != ZOK) {
+ return Result<bool>::error(
+ "Failed to get '" + znode + "/" + entry.name() +
+ "' in ZooKeeper: " + zk->message(code));
+ }
+
+ google::protobuf::io::ArrayInputStream stream(result.data(), result.size());
+
+ Entry current;
+
+ if (!current.ParseFromZeroCopyStream(&stream)) {
+ return Result<bool>::error("Failed to deserialize Entry");
+ }
+
+ // The caller's view is stale if the stored UUID differs.
+ if (UUID::fromBytes(current.uuid()) != uuid) {
+ return false;
+ }
+
+ // Okay, do a set, we get atomic swap by requiring 'stat.version'.
+ code = zk->set(znode + "/" + entry.name(), data, stat.version);
+
+ if (code == ZBADVERSION) {
+ return false;
+ } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ return Result<bool>::none();
+ } else if (code != ZOK) {
+ return Result<bool>::error(
+ "Failed to set '" + znode + "/" + entry.name() +
+ "' in ZooKeeper: " + zk->message(code));
+ }
+
+ return true;
+}
+
+
+// Creates and spawns the backing process with the given parameters.
+ZooKeeperState::ZooKeeperState(
+ const string& servers,
+ const seconds& timeout,
+ const string& znode,
+ const Option<Authentication>& auth)
+{
+ process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
+ spawn(process);
+}
+
+
+// Terminates the backing process and waits for it before deleting it.
+ZooKeeperState::~ZooKeeperState()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}
+
+
+// Forwards the fetch to the backing process via dispatch.
+Future<Option<Entry> > ZooKeeperState::fetch(const string& name)
+{
+ return dispatch(process, &ZooKeeperStateProcess::fetch, name);
+}
+
+
+// Forwards the swap to the backing process via dispatch.
+Future<bool> ZooKeeperState::swap(const Entry& entry, const UUID& uuid)
+{
+ return dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
+}
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
Modified: incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp?rev=1338382&r1=1338381&r2=1338382&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp (original)
+++ incubator/mesos/trunk/src/tests/base_zookeeper_test.hpp Mon May 14 20:16:52 2012
@@ -62,7 +62,7 @@ public:
};
TestWatcher();
- ~TestWatcher();
+ virtual ~TestWatcher();
virtual void process(ZooKeeper* zk, int type, int state,
const std::string& path);
Added: incubator/mesos/trunk/src/tests/state_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/state_tests.cpp?rev=1338382&view=auto
==============================================================================
--- incubator/mesos/trunk/src/tests/state_tests.cpp (added)
+++ incubator/mesos/trunk/src/tests/state_tests.cpp Mon May 14 20:16:52 2012
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <set>
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+#include <process/protobuf.hpp>
+
+#include "common/option.hpp"
+#include "common/type_utils.hpp"
+#include "common/utils.hpp"
+
+#include "messages/messages.hpp"
+
+#include "state/state.hpp"
+
+#include "tests/base_zookeeper_test.hpp"
+#include "tests/utils.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::state;
+using namespace mesos::internal::test;
+
+using namespace process;
+
+
+// Scenario shared by all State backends: get the "slaves" variable (which
+// is expected to be empty), append one SlaveInfo, set it, then get the
+// variable again and check that exactly that SlaveInfo comes back.
+void GetSetGet(State* state)
+{
+  // The single entry written by this scenario.
+  SlaveInfo localhost;
+  localhost.set_hostname("localhost");
+  localhost.set_webui_hostname("localhost");
+
+  // First get: nothing should have been stored yet.
+  Future<State::Variable<Slaves> > fetched = state->get<Slaves>("slaves");
+  fetched.await();
+  ASSERT_TRUE(fetched.isReady());
+
+  State::Variable<Slaves> before = fetched.get();
+  EXPECT_TRUE(before->infos().size() == 0);
+
+  // Modify the variable locally and write it back.
+  before->add_infos()->MergeFrom(localhost);
+
+  Future<bool> stored = state->set(&before);
+  stored.await();
+  ASSERT_TRUE(stored.isReady());
+  EXPECT_TRUE(stored.get());
+
+  // Second get: the write must now be visible.
+  fetched = state->get<Slaves>("slaves");
+  fetched.await();
+  ASSERT_TRUE(fetched.isReady());
+
+  State::Variable<Slaves> after = fetched.get();
+
+  // ASSERT (not EXPECT) so that infos(0) below is never out of range.
+  ASSERT_TRUE(after->infos().size() == 1);
+  EXPECT_EQ("localhost", after->infos(0).hostname());
+  EXPECT_EQ("localhost", after->infos(0).webui_hostname());
+}
+
+
+// Scenario shared by all State backends: get the "slaves" variable, append
+// one SlaveInfo and set it twice in a row using the same variable object.
+// Both sets are expected to succeed, and a final get must still show
+// exactly one SlaveInfo.
+void GetSetSetGet(State* state)
+{
+  // The single entry written by this scenario.
+  SlaveInfo localhost;
+  localhost.set_hostname("localhost");
+  localhost.set_webui_hostname("localhost");
+
+  // First get: nothing should have been stored yet.
+  Future<State::Variable<Slaves> > fetched = state->get<Slaves>("slaves");
+  fetched.await();
+  ASSERT_TRUE(fetched.isReady());
+
+  State::Variable<Slaves> before = fetched.get();
+  EXPECT_TRUE(before->infos().size() == 0);
+
+  before->add_infos()->MergeFrom(localhost);
+
+  // First write.
+  Future<bool> stored = state->set(&before);
+  stored.await();
+  ASSERT_TRUE(stored.isReady());
+  EXPECT_TRUE(stored.get());
+
+  // Second write through the very same variable, without re-fetching it
+  // in between; this is expected to succeed as well.
+  stored = state->set(&before);
+  stored.await();
+  ASSERT_TRUE(stored.isReady());
+  EXPECT_TRUE(stored.get());
+
+  // Final get: still a single entry.
+  fetched = state->get<Slaves>("slaves");
+  fetched.await();
+  ASSERT_TRUE(fetched.isReady());
+
+  State::Variable<Slaves> after = fetched.get();
+
+  // ASSERT (not EXPECT) so that infos(0) below is never out of range.
+  ASSERT_TRUE(after->infos().size() == 1);
+  EXPECT_EQ("localhost", after->infos(0).hostname());
+  EXPECT_EQ("localhost", after->infos(0).webui_hostname());
+}
+
+
+// Scenario shared by all State backends: get the "slaves" variable twice
+// before writing anything, set through the second copy (expected to
+// succeed), then set through the first copy (expected to return false).
+// A final get must show only what the successful set stored.
+void GetGetSetSetGet(State* state)
+{
+  // Entry for the first (losing) copy and for the second (winning) copy.
+  SlaveInfo localhost1;
+  localhost1.set_hostname("localhost1");
+  localhost1.set_webui_hostname("localhost1");
+
+  SlaveInfo localhost2;
+  localhost2.set_hostname("localhost2");
+  localhost2.set_webui_hostname("localhost2");
+
+  // Two gets before any write.
+  Future<State::Variable<Slaves> > fetched = state->get<Slaves>("slaves");
+  fetched.await();
+  ASSERT_TRUE(fetched.isReady());
+
+  State::Variable<Slaves> first = fetched.get();
+  EXPECT_TRUE(first->infos().size() == 0);
+
+  fetched = state->get<Slaves>("slaves");
+  fetched.await();
+  ASSERT_TRUE(fetched.isReady());
+
+  State::Variable<Slaves> second = fetched.get();
+  EXPECT_TRUE(second->infos().size() == 0);
+
+  // Writing through the second copy is expected to succeed.
+  second->add_infos()->MergeFrom(localhost2);
+
+  Future<bool> stored = state->set(&second);
+  stored.await();
+  ASSERT_TRUE(stored.isReady());
+  EXPECT_TRUE(stored.get());
+
+  // Writing through the first copy afterwards is expected to complete
+  // but report 'false'.
+  first->add_infos()->MergeFrom(localhost1);
+
+  stored = state->set(&first);
+  stored.await();
+  ASSERT_TRUE(stored.isReady());
+  EXPECT_FALSE(stored.get());
+
+  // Final get: only the entry from the successful set is present.
+  fetched = state->get<Slaves>("slaves");
+  fetched.await();
+  ASSERT_TRUE(fetched.isReady());
+
+  State::Variable<Slaves> latest = fetched.get();
+
+  // ASSERT (not EXPECT) so that infos(0) below is never out of range.
+  ASSERT_TRUE(latest->infos().size() == 1);
+  EXPECT_EQ("localhost2", latest->infos(0).hostname());
+  EXPECT_EQ("localhost2", latest->infos(0).webui_hostname());
+}
+
+
+// Fixture that creates a LevelDBState at "<cwd>/.state" for every test.
+// utils::os::rmdir is called on that path both before the state is
+// created and after it has been destroyed.
+class LevelDBStateTest : public ::testing::Test
+{
+public:
+  LevelDBStateTest()
+    : state(NULL),
+      path(utils::os::getcwd() + "/.state")
+  {}
+
+protected:
+  // Clears 'path' first, then creates the state there.
+  virtual void SetUp()
+  {
+    utils::os::rmdir(path);
+    state = new LevelDBState(path);
+  }
+
+  // Destroys the state before 'path' is removed.
+  virtual void TearDown()
+  {
+    delete state;
+    utils::os::rmdir(path);
+  }
+
+  // Created in SetUp() and deleted in TearDown().
+  State* state;
+
+private:
+  // Path handed to LevelDBState.
+  const std::string path;
+};
+
+
+TEST_F(LevelDBStateTest, GetSetGet)
+{
+ GetSetGet(state); // Shared scenario, run on the fixture's LevelDBState.
+}
+
+
+TEST_F(LevelDBStateTest, GetSetSetGet)
+{
+ GetSetSetGet(state); // Shared scenario, run on the fixture's LevelDBState.
+}
+
+
+TEST_F(LevelDBStateTest, GetGetSetSetGet)
+{
+ GetGetSetSetGet(state); // Shared scenario, on the fixture's LevelDBState.
+}
+
+
+// Fixture that creates a ZooKeeperState for every test, using the connect
+// string of 'zks' and the znode "/state/".
+// NOTE(review): 'zks' is not declared here; presumably it is the test
+// server owned by BaseZooKeeperTest -- confirm in base_zookeeper_test.hpp.
+class ZooKeeperStateTest : public mesos::internal::test::BaseZooKeeperTest
+{
+public:
+  ZooKeeperStateTest() : state(NULL) {}
+
+protected:
+  // Runs the base fixture's SetUp() before 'zks' is used to build the state.
+  virtual void SetUp()
+  {
+    BaseZooKeeperTest::SetUp();
+
+    const std::string servers = zks->connectString();
+    state = new ZooKeeperState(servers, NO_TIMEOUT, "/state/");
+  }
+
+  // Destroys the state before the base fixture's TearDown() runs.
+  virtual void TearDown()
+  {
+    delete state;
+    BaseZooKeeperTest::TearDown();
+  }
+
+  // Created in SetUp() and deleted in TearDown().
+  State* state;
+};
+
+
+TEST_F(ZooKeeperStateTest, GetSetGet)
+{
+ GetSetGet(state); // Shared scenario, run on the fixture's ZooKeeperState.
+}
+
+
+TEST_F(ZooKeeperStateTest, GetSetSetGet)
+{
+ GetSetSetGet(state); // Shared scenario, on the fixture's ZooKeeperState.
+}
+
+
+TEST_F(ZooKeeperStateTest, GetGetSetSetGet)
+{
+ GetGetSetSetGet(state); // Shared scenario, on the fixture's ZooKeeperState.
+}
Modified: incubator/mesos/trunk/src/zookeeper/group.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.cpp?rev=1338382&r1=1338381&r2=1338382&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.cpp Mon May 14 20:16:52 2012
@@ -42,8 +42,8 @@ public:
GroupProcess(const string& servers,
const seconds& timeout,
const string& znode,
- const Option<Authentication>& auth = Option<Authentication>());
- ~GroupProcess();
+ const Option<Authentication>& auth);
+ virtual ~GroupProcess();
virtual void initialize();
@@ -183,6 +183,8 @@ GroupProcess::GroupProcess(
acl(_auth.isSome()
? EVERYONE_READ_CREATOR_ALL
: ZOO_OPEN_ACL_UNSAFE),
+ watcher(NULL),
+ zk(NULL),
state(DISCONNECTED),
retrying(false)
{}
@@ -213,9 +215,7 @@ void GroupProcess::initialize()
Future<Group::Membership> GroupProcess::join(const string& data)
{
if (error.isSome()) {
- Promise<Group::Membership> promise;
- promise.fail(error.get());
- return promise.future();
+ return Future<Group::Membership>::failed(error.get());
} else if (state != CONNECTED) {
Join* join = new Join(data);
pending.joins.push(join);
@@ -241,9 +241,7 @@ Future<Group::Membership> GroupProcess::
pending.joins.push(join);
return join->promise.future();
} else if (membership.isError()) {
- Promise<Group::Membership> promise;
- promise.fail(membership.error());
- return promise.future();
+ return Future<Group::Membership>::failed(membership.error());
}
return membership.get();
@@ -253,9 +251,7 @@ Future<Group::Membership> GroupProcess::
Future<bool> GroupProcess::cancel(const Group::Membership& membership)
{
if (error.isSome()) {
- Promise<bool> promise;
- promise.fail(error.get());
- return promise.future();
+ return Future<bool>::failed(error.get());
} else if (owned.count(membership.id()) == 0) {
// TODO(benh): Should this be an error? Right now a user can't
// differentiate when 'false' means they can't cancel because it's
@@ -286,9 +282,7 @@ Future<bool> GroupProcess::cancel(const
pending.cancels.push(cancel);
return cancel->promise.future();
} else if (cancellation.isError()) {
- Promise<bool> promise;
- promise.fail(cancellation.error());
- return promise.future();
+ return Future<bool>::failed(cancellation.error());
}
return cancellation.get();
@@ -298,9 +292,7 @@ Future<bool> GroupProcess::cancel(const
Future<string> GroupProcess::data(const Group::Membership& membership)
{
if (error.isSome()) {
- Promise<string> promise;
- promise.fail(error.get());
- return promise.future();
+ return Future<string>::failed(error.get());
} else if (state != CONNECTED) {
Data* data = new Data(membership);
pending.datas.push(data);
@@ -318,9 +310,7 @@ Future<string> GroupProcess::data(const
pending.datas.push(data);
return data->promise.future();
} else if (result.isError()) {
- Promise<string> promise;
- promise.fail(result.error());
- return promise.future();
+ return Future<string>::failed(result.error());
}
return result.get();
@@ -331,9 +321,7 @@ Future<set<Group::Membership> > GroupPro
const set<Group::Membership>& expected)
{
if (error.isSome()) {
- Promise<set<Group::Membership> > promise;
- promise.fail(error.get());
- return promise.future();
+ return Future<set<Group::Membership> >::failed(error.get());
} else if (state != CONNECTED) {
Watch* watch = new Watch(expected);
pending.watches.push(watch);
@@ -388,7 +376,8 @@ Future<Option<int64_t> > GroupProcess::s
void GroupProcess::connected(bool reconnect)
{
if (!reconnect) {
- // Authenticate if necessary.
+ // Authenticate if necessary (and we are connected for the first
+ // time, or after a session expiration).
if (auth.isSome()) {
LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
@@ -405,9 +394,8 @@ void GroupProcess::connected(bool reconn
}
}
- CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
-
// Create directory path znodes as necessary.
+ CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
size_t index = znode.find("/", 0);
while (index < string::npos) {