You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/05/08 19:50:27 UTC
git commit: Refactored ZooKeeperImpl into ZooKeeperProcess.
Repository: mesos
Updated Branches:
refs/heads/master a3d0d88af -> e4336578a
Refactored ZooKeeperImpl into ZooKeeperProcess.
See MESOS-1318 for more details.
Review: https://reviews.apache.org/r/21182
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4336578
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4336578
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4336578
Branch: refs/heads/master
Commit: e4336578a9ed932b0891e23fd83f7f7148689e0e
Parents: a3d0d88
Author: Benjamin Hindman <be...@berkeley.edu>
Authored: Thu May 8 10:50:04 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Thu May 8 10:50:04 2014 -0700
----------------------------------------------------------------------
src/zookeeper/zookeeper.cpp | 300 ++++++++++++++++++++++++++++-----------
src/zookeeper/zookeeper.hpp | 40 +++---
2 files changed, 235 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e4336578/src/zookeeper/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.cpp b/src/zookeeper/zookeeper.cpp
index e3b65e4..11029be 100644
--- a/src/zookeeper/zookeeper.cpp
+++ b/src/zookeeper/zookeeper.cpp
@@ -21,8 +21,8 @@
#include <iostream>
#include <map>
+#include <process/defer.hpp>
#include <process/dispatch.hpp>
-#include <process/once.hpp>
#include <process/process.hpp>
#include <stout/duration.hpp>
@@ -47,27 +47,32 @@ using std::vector;
using tuples::tuple;
-class ZooKeeperImpl
+class ZooKeeperProcess : public Process<ZooKeeperProcess>
{
public:
- ZooKeeperImpl(ZooKeeper* zk,
- const string& servers,
- const Duration& timeout,
- Watcher* watcher)
+ ZooKeeperProcess(
+ ZooKeeper* zk,
+ const string& servers,
+ const Duration& timeout,
+ Watcher* watcher)
: servers(servers),
- zk(zk),
- watcher(watcher)
+ timeout(timeout),
+ zh(NULL)
{
- if (watcher == NULL) {
- LOG(FATAL) << "Cannot instantiate ZooKeeper with NULL watcher";
- }
+ // We bind the Watcher::process callback so we can pass it to the
+ // C callback as a pointer and invoke it directly.
+ callback = lambda::bind(
+ &Watcher::process, watcher, zk, lambda::_1, lambda::_2, lambda::_3);
+ }
+ virtual void initialize()
+ {
zh = zookeeper_init(
servers.c_str(),
event,
static_cast<int>(timeout.ms()),
NULL,
- this,
+ &callback,
0);
if (zh == NULL) {
@@ -75,7 +80,7 @@ public:
}
}
- ~ZooKeeperImpl()
+ virtual void finalize()
{
int ret = zookeeper_close(zh);
if (ret != ZOK) {
@@ -84,6 +89,25 @@ public:
}
}
+ int getState()
+ {
+ return zoo_state(zh);
+ }
+
+ int64_t getSessionId()
+ {
+ return zoo_client_id(zh)->client_id;
+ }
+
+ Duration getSessionTimeout()
+ {
+ // ZooKeeper server uses int representation of milliseconds for
+ // session timeouts.
+ // See:
+ // http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html
+ return Milliseconds(zoo_recv_timeout(zh));
+ }
+
Future<int> authenticate(const string& scheme, const string& credentials)
{
Promise<int>* promise = new Promise<int>();
@@ -92,8 +116,13 @@ public:
tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);
- int ret = zoo_add_auth(zh, scheme.c_str(), credentials.data(),
- credentials.size(), voidCompletion, args);
+ int ret = zoo_add_auth(
+ zh,
+ scheme.c_str(),
+ credentials.data(),
+ credentials.size(),
+ voidCompletion,
+ args);
if (ret != ZOK) {
delete promise;
@@ -104,8 +133,12 @@ public:
return future;
}
- Future<int> create(const string& path, const string& data,
- const ACL_vector& acl, int flags, string* result)
+ Future<int> create(
+ const string& path,
+ const string& data,
+ const ACL_vector& acl,
+ int flags,
+ string* result)
{
Promise<int>* promise = new Promise<int>();
@@ -114,8 +147,15 @@ public:
tuple<Promise<int>*, string*>* args =
new tuple<Promise<int>*, string*>(promise, result);
- int ret = zoo_acreate(zh, path.c_str(), data.data(), data.size(), &acl,
- flags, stringCompletion, args);
+ int ret = zoo_acreate(
+ zh,
+ path.c_str(),
+ data.data(),
+ data.size(),
+ &acl,
+ flags,
+ stringCompletion,
+ args);
if (ret != ZOK) {
delete promise;
@@ -126,6 +166,82 @@ public:
return future;
}
+ Future<int> create(
+ const string& path,
+ const string& data,
+ const ACL_vector& acl,
+ int flags,
+ string* result,
+ bool recursive)
+ {
+ if (!recursive) {
+ return create(path, data, acl, flags, result);
+ }
+
+ // First check if the path exists.
+ return exists(path, false, NULL)
+ .then(defer(self(),
+ &Self::_create,
+ path,
+ data,
+ acl,
+ flags,
+ result,
+ lambda::_1));
+ }
+
+ Future<int> _create(
+ const string& path,
+ const string& data,
+ const ACL_vector& acl,
+ int flags,
+ string* result,
+ int code)
+ {
+ if (code == ZOK) {
+ return ZNODEEXISTS;
+ }
+
+ // Now recursively create the parent path.
+ // NOTE: We don't use 'dirname()' to get the parent path here
+ // because, it doesn't return the expected path when a path ends
+ // with "/". For example, to create path "/a/b/", we want to
+ // recursively create "/a/b", instead of just creating "/a".
+ const string& parent = path.substr(0, path.find_last_of("/"));
+ if (!parent.empty()) {
+ return create(parent, "", acl, 0, result, true)
+ .then(defer(self(),
+ &Self::__create,
+ path,
+ data,
+ acl,
+ flags,
+ result,
+ lambda::_1));
+ }
+
+ return __create(path, data, acl, flags, result, ZOK);
+ }
+
+ Future<int> __create(
+ const string& path,
+ const string& data,
+ const ACL_vector& acl,
+ int flags,
+ string* result,
+ int code)
+ {
+ if (code != ZOK && code != ZNODEEXISTS) {
+ return code;
+ }
+
+ // Finally create the path.
+ // TODO(vinod): Delete any intermediate nodes created if this fails.
+ // This requires synchronization because the deletion might affect
+ // other callers (different threads/processes) acting on this path.
+ return create(path, data, acl, flags, result);
+ }
+
Future<int> remove(const string& path, int version)
{
Promise<int>* promise = new Promise<int>();
@@ -185,9 +301,10 @@ public:
return future;
}
- Future<int> getChildren(const string& path,
- bool watch,
- vector<string>* results)
+ Future<int> getChildren(
+ const string& path,
+ bool watch,
+ vector<string>* results)
{
Promise<int>* promise = new Promise<int>();
@@ -196,8 +313,8 @@ public:
tuple<Promise<int>*, vector<string>*>* args =
new tuple<Promise<int>*, vector<string>*>(promise, results);
- int ret = zoo_aget_children(zh, path.c_str(), watch, stringsCompletion,
- args);
+ int ret =
+ zoo_aget_children(zh, path.c_str(), watch, stringsCompletion, args);
if (ret != ZOK) {
delete promise;
@@ -217,8 +334,14 @@ public:
tuple<Promise<int>*, Stat*>* args =
new tuple<Promise<int>*, Stat*>(promise, NULL);
- int ret = zoo_aset(zh, path.c_str(), data.data(), data.size(),
- version, statCompletion, args);
+ int ret = zoo_aset(
+ zh,
+ path.c_str(),
+ data.data(),
+ data.size(),
+ version,
+ statCompletion,
+ args);
if (ret != ZOK) {
delete promise;
@@ -233,16 +356,17 @@ private:
// This method is registered as a watcher callback function and is
// invoked by a single ZooKeeper event thread.
static void event(
- zhandle_t* zh,
+ zhandle_t*,
int type,
int state,
const char* path,
- void* ctx)
+ void* context)
{
- ZooKeeperImpl* impl = static_cast<ZooKeeperImpl*>(ctx);
- impl->watcher->process(impl->zk, type, state, string(path));
- }
+ lambda::function<void(int, int, const string&)>* callback =
+ static_cast<lambda::function<void(int, int, const string&)>*>(context);
+ (*callback)(type, state, string(path));
+ }
static void voidCompletion(int ret, const void *data)
{
@@ -257,7 +381,6 @@ private:
delete args;
}
-
static void stringCompletion(int ret, const char* value, const void* data)
{
const tuple<Promise<int>*, string*> *args =
@@ -278,7 +401,6 @@ private:
delete args;
}
-
static void statCompletion(int ret, const Stat* stat, const void* data)
{
const tuple<Promise<int>*, Stat*>* args =
@@ -299,7 +421,6 @@ private:
delete args;
}
-
static void dataCompletion(
int ret,
const char* value,
@@ -330,7 +451,6 @@ private:
delete args;
}
-
static void stringsCompletion(
int ret,
const String_vector* values,
@@ -360,53 +480,59 @@ private:
friend class ZooKeeper;
const string servers; // ZooKeeper host:port pairs.
+ const Duration timeout; // ZooKeeper session timeout;
- ZooKeeper* zk; // ZooKeeper instance.
zhandle_t* zh; // ZooKeeper connection handle.
- Watcher* watcher; // Associated Watcher instance.
+ // Callback for invoking Watcher::process with the ZooKeeper*
+ // argument and Watcher* receiver already bound.
+ lambda::function<void(int, int, const string&)> callback;
};
-ZooKeeper::ZooKeeper(const string& servers,
- const Duration& timeout,
- Watcher* watcher)
+ZooKeeper::ZooKeeper(
+ const string& servers,
+ const Duration& timeout,
+ Watcher* watcher)
{
- impl = new ZooKeeperImpl(this, servers, timeout, watcher);
+ process = new ZooKeeperProcess(this, servers, timeout, watcher);
+ spawn(process);
}
ZooKeeper::~ZooKeeper()
{
- delete impl;
+ terminate(process);
+ wait(process);
+ delete process;
}
int ZooKeeper::getState()
{
- return zoo_state(impl->zh);
+ return dispatch(process, &ZooKeeperProcess::getState).get();
}
int64_t ZooKeeper::getSessionId()
{
- return zoo_client_id(impl->zh)->client_id;
+ return dispatch(process, &ZooKeeperProcess::getSessionId).get();
}
Duration ZooKeeper::getSessionTimeout() const
{
- // ZooKeeper server uses int representation of milliseconds for
- // session timeouts.
- // See:
- // http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html
- return Milliseconds(zoo_recv_timeout(impl->zh));
+ return dispatch(process, &ZooKeeperProcess::getSessionTimeout).get();
}
int ZooKeeper::authenticate(const string& scheme, const string& credentials)
{
- return impl->authenticate(scheme, credentials).get();
+ return dispatch(
+ process,
+ &ZooKeeperProcess::authenticate,
+ scheme,
+ credentials).get();
}
@@ -418,65 +544,69 @@ int ZooKeeper::create(
string* result,
bool recursive)
{
- if (!recursive) {
- return impl->create(path, data, acl, flags, result).get();
- }
-
- // First check if the path exists.
- int code = impl->exists(path, false, NULL).get();
- if (code == ZOK) {
- return ZNODEEXISTS;
- }
-
- // Now recursively create the parent path.
- // NOTE: We don't use 'dirname()' to get the parent path here
- // because, it doesn't return the expected path when a path ends
- // with "/". For example, to create path "/a/b/", we want to
- // recursively create "/a/b", instead of just creating "/a".
- const string& parent = path.substr(0, path.find_last_of("/"));
- if (!parent.empty()) {
- code = create(parent, "", acl, 0, result, true);
- if (code != ZOK && code != ZNODEEXISTS) {
- return code;
- }
- }
-
- // Finally create the path.
- // TODO(vinod): Delete any intermediate nodes created if this fails.
- // This requires synchronization because the deletion might affect
- // other callers (different threads/processes) acting on this path.
- return impl->create(path, data, acl, flags, result).get();
+ return dispatch(
+ process,
+ &ZooKeeperProcess::create,
+ path,
+ data,
+ acl,
+ flags,
+ result,
+ recursive).get();
}
int ZooKeeper::remove(const string& path, int version)
{
- return impl->remove(path, version).get();
+ return dispatch(process, &ZooKeeperProcess::remove, path, version).get();
}
int ZooKeeper::exists(const string& path, bool watch, Stat* stat)
{
- return impl->exists(path, watch, stat).get();
+ return dispatch(
+ process,
+ &ZooKeeperProcess::exists,
+ path,
+ watch,
+ stat).get();
}
int ZooKeeper::get(const string& path, bool watch, string* result, Stat* stat)
{
- return impl->get(path, watch, result, stat).get();
+ return dispatch(
+ process,
+ &ZooKeeperProcess::get,
+ path,
+ watch,
+ result,
+ stat).get();
}
-int ZooKeeper::getChildren(const string& path, bool watch,
- vector<string>* results)
+int ZooKeeper::getChildren(
+ const string& path,
+ bool watch,
+ vector<string>* results)
{
- return impl->getChildren(path, watch, results).get();
+ return dispatch(
+ process,
+ &ZooKeeperProcess::getChildren,
+ path,
+ watch,
+ results).get();
}
int ZooKeeper::set(const string& path, const string& data, int version)
{
- return impl->set(path, data, version).get();
+ return dispatch(
+ process,
+ &ZooKeeperProcess::set,
+ path,
+ data,
+ version).get();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e4336578/src/zookeeper/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.hpp b/src/zookeeper/zookeeper.hpp
index 242e2de..c3c182b 100644
--- a/src/zookeeper/zookeeper.hpp
+++ b/src/zookeeper/zookeeper.hpp
@@ -36,7 +36,7 @@
/* Forward declarations of classes we are using. */
class ZooKeeper;
-class ZooKeeperImpl;
+class ZooKeeperProcess;
/**
* This interface specifies the public interface an event handler
@@ -53,10 +53,10 @@ class Watcher
{
public:
virtual void process(
- ZooKeeper *zk,
+ ZooKeeper* zk,
int type,
int state,
- const std::string &path) = 0;
+ const std::string& path) = 0;
virtual ~Watcher() {}
};
@@ -121,9 +121,9 @@ public:
* callbacks. When notifications are triggered the Watcher::process
* method will be invoked.
*/
- ZooKeeper(const std::string &servers,
+ ZooKeeper(const std::string& servers,
const Duration& timeout,
- Watcher *watcher);
+ Watcher* watcher);
~ZooKeeper();
@@ -189,11 +189,11 @@ public:
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
int create(
- const std::string &path,
- const std::string &data,
- const ACL_vector &acl,
+ const std::string& path,
+ const std::string& data,
+ const ACL_vector& acl,
int flags,
- std::string *result,
+ std::string* result,
bool recursive = false);
/**
@@ -215,7 +215,7 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int remove(const std::string &path, int version);
+ int remove(const std::string& path, int version);
/**
* \brief checks the existence of a node in zookeeper synchronously.
@@ -235,7 +235,7 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int exists(const std::string &path, bool watch, Stat *stat);
+ int exists(const std::string& path, bool watch, Stat* stat);
/**
* \brief gets the data associated with a node synchronously.
@@ -256,10 +256,10 @@ public:
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
int get(
- const std::string &path,
+ const std::string& path,
bool watch,
- std::string *result,
- Stat *stat);
+ std::string* result,
+ Stat* stat);
/**
* \brief lists the children of a node synchronously.
@@ -278,9 +278,9 @@ public:
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
int getChildren(
- const std::string &path,
+ const std::string& path,
bool watch,
- std::vector<std::string> *results);
+ std::vector<std::string>* results);
/**
* \brief sets the data associated with a node.
@@ -300,7 +300,7 @@ public:
* ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int set(const std::string &path, const std::string &data, int version);
+ int set(const std::string& path, const std::string& data, int version);
/**
* \brief return a message describing the return code.
@@ -321,12 +321,12 @@ public:
protected:
/* Underlying implementation (pimpl idiom). */
- ZooKeeperImpl *impl;
+ ZooKeeperProcess* process;
private:
/* ZooKeeper instances are not copyable. */
- ZooKeeper(const ZooKeeper &that);
- ZooKeeper & operator = (const ZooKeeper &that);
+ ZooKeeper(const ZooKeeper& that);
+ ZooKeeper& operator = (const ZooKeeper& that);
};