You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:31:45 UTC
svn commit: r1131627 - in /incubator/mesos/trunk/src: zookeeper.cpp
zookeeper.hpp
Author: benh
Date: Sun Jun 5 03:31:45 2011
New Revision: 1131627
URL: http://svn.apache.org/viewvc?rev=1131627&view=rev
Log:
Oops, forgot these files.
Added:
incubator/mesos/trunk/src/zookeeper.cpp
incubator/mesos/trunk/src/zookeeper.hpp
Added: incubator/mesos/trunk/src/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.cpp?rev=1131627&view=auto
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.cpp (added)
+++ incubator/mesos/trunk/src/zookeeper.cpp Sun Jun 5 03:31:45 2011
@@ -0,0 +1,698 @@
+#include <assert.h>
+
+#include <iostream>
+#include <map>
+
+#include <process.hpp>
+
+#include "fatal.hpp"
+#include "hash_pid.hpp"
+#include "zookeeper.hpp"
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::map;
+using std::string;
+
+
+/* Forward (and first) declaration of ZooKeeperProcess. */
+class ZooKeeperProcess;
+
+
+enum { // Message IDs used by the processes in this file; numbering starts at PROCESS_MSGID.
+ /* Generic messages: */
+ TERMINATE = PROCESS_MSGID, // Terminate process.
+
+ /* WatcherProcessManager messages: */
+ OK, // Generic success response.
+ ERROR, // Generic failure response.
+ REGISTER, // Register WatcherProcess (body: WatcherProcess *).
+ UNREGISTER, // Unregister WatcherProcess (body: WatcherProcess *).
+ LOOKUP_PROCESS, // Lookup WatcherProcess associated with Watcher.
+ LOOKUP_PID, // Lookup WatcherProcess PID associated with Watcher.
+
+ /* WatcherProcess messages: */
+ EVENT, // Invoke Watcher::process callback (body: Event *).
+
+ /* ZooKeeperProcess messages: */
+ COMPLETED, // After an asynchronous "call" has completed.
+ CREATE, // Perform an asynchronous create (body: CreateCall *).
+ REMOVE, // Perform an asynchronous remove (delete) (body: RemoveCall *).
+ EXISTS, // Perform an asynchronous exists (body: ExistsCall *).
+ GET, // Perform an asynchronous get (body: GetCall *).
+};
+
+
+/* Event "message" for invoking Watcher. Allocated with new in
+ * ZooKeeperProcess::watch, sent by pointer in an EVENT message, and
+ * deleted by WatcherProcess after Watcher::process has been called. */
+struct Event
+{
+ ZooKeeper *zk; // ZooKeeper instance the event is reported for.
+ int type; // Event type as received by the watcher callback.
+ int state; // Connection state as received by the watcher callback.
+ string path; // Path as received by the watcher callback.
+};
+
+
+/* Create "message" for performing ZooKeeper::create. The struct is a
+ * local of ZooKeeper::create and travels by pointer in a CREATE
+ * message, so it is only valid until ZooKeeper::create returns. */
+struct CreateCall
+{
+ int ret; // Result code; set by createCompletion or when the submit fails.
+ const string *path; // Points at the caller's path argument.
+ const string *data; // Points at the caller's data argument.
+ const ACL_vector *acl; // Points at the caller's acl argument.
+ int flags; // Create flags, passed through to zoo_acreate.
+ string *result; // Optional out: created path; skipped when NULL.
+ PID from; // Requesting process; filled in by ZooKeeperProcess.
+ ZooKeeperProcess *zooKeeperProcess; // Handling process; filled in by ZooKeeperProcess.
+};
+
+
+/* Remove "message" for performing ZooKeeper::remove. The struct is a
+ * local of ZooKeeper::remove and travels by pointer in a REMOVE
+ * message, so it is only valid until ZooKeeper::remove returns. */
+struct RemoveCall
+{
+ int ret; // Result code; set by removeCompletion or when the submit fails.
+ const string *path; // Points at the caller's path argument.
+ int version; // Expected version, passed through to zoo_adelete.
+ PID from; // Requesting process; filled in by ZooKeeperProcess.
+ ZooKeeperProcess *zooKeeperProcess; // Handling process; filled in by ZooKeeperProcess.
+};
+
+
+/* Exists "message" for performing ZooKeeper::exists. The struct is a
+ * local of ZooKeeper::exists and travels by pointer in an EXISTS
+ * message, so it is only valid until ZooKeeper::exists returns. */
+struct ExistsCall
+{
+ int ret; // Result code; set by existsCompletion or when the submit fails.
+ const string *path; // Points at the caller's path argument.
+ bool watch; // Watch flag, passed through to zoo_aexists.
+ Stat *stat; // Optional out: node stat; skipped when NULL.
+ PID from; // Requesting process; filled in by ZooKeeperProcess.
+ ZooKeeperProcess *zooKeeperProcess; // Handling process; filled in by ZooKeeperProcess.
+};
+
+
+/* Get "message" for performing ZooKeeper::get. The struct is a local
+ * of ZooKeeper::get and travels by pointer in a GET message, so it is
+ * only valid until ZooKeeper::get returns. */
+struct GetCall
+{
+ int ret; // Result code; set by getCompletion or when the submit fails.
+ const string *path; // Points at the caller's path argument.
+ bool watch; // Watch flag, passed through to zoo_aget.
+ string *result; // Optional out: node data; skipped when NULL.
+ Stat *stat; // Optional out: node stat; skipped when NULL.
+ PID from; // Requesting process; filled in by ZooKeeperProcess.
+ ZooKeeperProcess *zooKeeperProcess; // Handling process; filled in by ZooKeeperProcess.
+};
+
+
+/* PID of singleton instance of WatcherProcessManager. Assigned in
+ * init() below, which spawns the manager. */
+PID manager;
+
+
+/**
+ * In order to make callbacks on Watcher, we create a proxy
+ * WatcherProcess. The ZooKeeperProcess (defined below) stores the PID
+ * of the WatcherProcess associated with a watcher and sends it an
+ * event "message" which it uses to invoke Watcher::process. Care
+ * needed to be taken to assure that a WatcherProcess was only valid
+ * as long as a Watcher was valid. This was done by ensuring that the
+ * WatcherProcess object created gets cleaned up in ~Watcher(). We
+ * wanted to keep the Watcher interface clean and simple, so rather
+ * than add a member in Watcher that points to a WatcherProcess
+ * instance (or points to a WatcherImpl), we choose to create a
+ * WatcherProcessManager that stores the Watcher and WatcherProcess
+ * associations. The WatcherProcessManager is akin to having a shared
+ * dictionary or hashtable and using locks to access it rather than
+ * sending and receiving messages. There is probably a performance hit
+ * here, but it would be interesting to see how bad the performance is
+ * across a range of low and high-contention states.
+ */
+class WatcherProcess : public Process
+{
+  friend class WatcherProcessManager;
+
+private:
+  Watcher *watcher;  // Watcher whose process() callback this proxy invokes.
+
+protected:
+  /* Register with the manager, then deliver events to the Watcher until
+   * a TERMINATE message arrives; unregister before returning. */
+  void operator () ()
+  {
+    // Our own address is the payload of both manager requests.
+    WatcherProcess *self = this;
+    char *payload = reinterpret_cast<char *>(&self);
+
+    if (call(manager, REGISTER, payload, sizeof(self)) != OK)
+      fatal("failed to setup underlying watcher mechanism");
+
+    for (;;) {
+      switch (receive()) {
+        case TERMINATE: {
+          if (call(manager, UNREGISTER, payload, sizeof(self)) != OK)
+            fatal("failed to cleanup underlying watcher mechanism");
+          return;
+        }
+        case EVENT: {
+          // The body holds a pointer to a heap-allocated Event that is
+          // freed here once the callback has run.
+          char *raw = const_cast<char *>(body(NULL));
+          Event *delivered = *reinterpret_cast<Event **>(raw);
+          watcher->process(delivered->zk, delivered->type,
+                           delivered->state, delivered->path);
+          delete delivered;
+          break;
+        }
+      }
+    }
+  }
+
+public:
+  WatcherProcess(Watcher *_watcher) : watcher(_watcher) {}
+};
+
+
+class WatcherProcessManager : public Process
+{
+private:
+  typedef map<Watcher *, WatcherProcess *> Registry;
+
+  Registry watchers;  // Watcher -> proxy process registered for it.
+
+  /* Read the body of the current message as one value of type T (each
+   * message handled below carries exactly one pointer). */
+  template <typename T>
+  T payload()
+  {
+    return *reinterpret_cast<T *>(const_cast<char *>(body(NULL)));
+  }
+
+protected:
+  /* Serve registration and lookup requests; never returns. */
+  void operator () ()
+  {
+    for (;;) {
+      switch (receive()) {
+        case REGISTER: {
+          WatcherProcess *proxy = payload<WatcherProcess *>();
+          Watcher *key = proxy->watcher;
+          assert(watchers.find(key) == watchers.end());
+          watchers[key] = proxy;
+          send(from(), OK);
+          break;
+        }
+        case UNREGISTER: {
+          WatcherProcess *proxy = payload<WatcherProcess *>();
+          Watcher *key = proxy->watcher;
+          assert(watchers.find(key) != watchers.end());
+          watchers.erase(key);
+          send(from(), OK);
+          break;
+        }
+        case LOOKUP_PROCESS: {
+          // Reply with the proxy pointer itself, or ERROR if unknown.
+          Registry::iterator entry = watchers.find(payload<Watcher *>());
+          if (entry == watchers.end()) {
+            send(from(), ERROR);
+          } else {
+            WatcherProcess *proxy = entry->second;
+            send(from(), OK, reinterpret_cast<char *>(&proxy), sizeof(proxy));
+          }
+          break;
+        }
+        case LOOKUP_PID: {
+          // Reply with the proxy's PID, or ERROR if unknown.
+          Registry::iterator entry = watchers.find(payload<Watcher *>());
+          if (entry == watchers.end()) {
+            send(from(), ERROR);
+          } else {
+            PID pid = entry->second->getPID();
+            send(from(), OK, reinterpret_cast<char *>(&pid), sizeof(pid));
+          }
+          break;
+        }
+      }
+    }
+  }
+};
+
+
+static void __attribute__((constructor)) init()
+{ /* Carries the GCC constructor attribute, so it runs before main():
+ * spawns the singleton WatcherProcessManager and stores its PID in
+ * the global `manager`. */
+ manager = Process::spawn(new WatcherProcessManager());
+}
+
+
+Watcher::Watcher()
+{ /* Spawn the proxy WatcherProcess for this Watcher. The pointer is not
+ * kept here; ~Watcher() obtains it from the manager and deletes it. */
+ Process::spawn(new WatcherProcess(this));
+}
+
+
+Watcher::~Watcher()
+{
+  /* Teardown runs inside a short-lived helper Process so that call(),
+   * send() and wait() can be used to talk to the manager and to our
+   * WatcherProcess. */
+  class Reaper : public Process
+  {
+  private:
+    Watcher *owner;
+
+  protected:
+    void operator () ()
+    {
+      const bool found =
+        call(manager, LOOKUP_PROCESS,
+             reinterpret_cast<char *>(&owner), sizeof(owner)) == OK;
+      if (!found)
+        fatal("failed to deallocate resources associated with Watcher");
+
+      // The reply body carries the WatcherProcess pointer for this Watcher.
+      char *reply = const_cast<char *>(body(NULL));
+      WatcherProcess *proxy = *reinterpret_cast<WatcherProcess **>(reply);
+
+      // Ask it to stop, wait until it has, and only then free it.
+      send(proxy->getPID(), TERMINATE);
+      wait(proxy->getPID());
+      delete proxy;
+    }
+
+  public:
+    Reaper(Watcher *_owner) : owner(_owner) {}
+  };
+
+  Reaper reaper(this);
+  Process::wait(Process::spawn(&reaper));
+}
+
+
+class ZooKeeperImpl : public Process {}; // Empty base named in zookeeper.hpp (ZooKeeper::impl); ZooKeeperProcess below derives from it.
+
+
+class ZooKeeperProcess : public ZooKeeperImpl // Owns the ZooKeeper handle and services CREATE/REMOVE/EXISTS/GET requests.
+{
+ friend class ZooKeeper;
+
+private:
+ ZooKeeper *zk; // ZooKeeper instance.
+ string hosts; // ZooKeeper host:port pairs.
+ int timeout; // ZooKeeper session timeout.
+ zhandle_t *zh; // ZooKeeper connection handle.
+
+ Watcher *watcher; // Associated Watcher instance.
+ PID watcherProcess; // PID of WatcherProcess that invokes Watcher.
+
+ static void watch(zhandle_t *zh, int type, int state,
+ const char *path, void *context)
+ { // Watcher callback handed to zookeeper_init; context is the ZooKeeperProcess passed there.
+ ZooKeeperProcess *zooKeeperProcess =
+ static_cast<ZooKeeperProcess*>(context);
+ Event *event = new Event(); // Deleted by WatcherProcess after the callback runs.
+ event->zk = zooKeeperProcess->zk;
+ event->type = type;
+ event->state = state;
+ event->path = path; // NOTE(review): assumes path is never NULL -- confirm against the C client.
+ zooKeeperProcess->send(zooKeeperProcess->watcherProcess,
+ EVENT,
+ reinterpret_cast<char *>(&event),
+ sizeof(Event *));
+ }
+
+ static void createCompletion(int ret, const char *value, const void *data)
+ { // Completion for zoo_acreate; data is the CreateCall passed at submit time.
+ CreateCall *createCall =
+ static_cast<CreateCall *>(const_cast<void *>((data)));
+ createCall->ret = ret;
+ if (createCall->result != NULL && value != NULL)
+ createCall->result->assign(value);
+ createCall->zooKeeperProcess->send(createCall->from, COMPLETED); // Wake the requester.
+ }
+
+ static void removeCompletion(int ret, const void *data)
+ { // Completion for zoo_adelete; data is the RemoveCall passed at submit time.
+ RemoveCall *removeCall =
+ static_cast<RemoveCall *>(const_cast<void *>((data)));
+ removeCall->ret = ret;
+ removeCall->zooKeeperProcess->send(removeCall->from, COMPLETED); // Wake the requester.
+ }
+
+ static void existsCompletion(int ret, const Stat *stat, const void *data)
+ { // Completion for zoo_aexists; data is the ExistsCall passed at submit time.
+ ExistsCall *existsCall =
+ static_cast<ExistsCall *>(const_cast<void *>((data)));
+ existsCall->ret = ret;
+ if (existsCall->stat != NULL && stat != NULL)
+ *(existsCall->stat) = *(stat);
+ existsCall->zooKeeperProcess->send(existsCall->from, COMPLETED); // Wake the requester.
+ }
+
+ static void getCompletion(int ret, const char *value, int value_len,
+ const Stat *stat, const void *data)
+ { // Completion for zoo_aget; data is the GetCall passed at submit time.
+ GetCall *getCall =
+ static_cast<GetCall *>(const_cast<void *>((data)));
+ getCall->ret = ret;
+ if (getCall->result != NULL && value != NULL)
+ getCall->result->assign(value, value_len); // Length-based copy of value_len bytes.
+ if (getCall->stat != NULL && stat != NULL)
+ *(getCall->stat) = *(stat);
+ getCall->zooKeeperProcess->send(getCall->from, COMPLETED); // Wake the requester.
+ }
+
+ void prepare(int *fd, int *ops, timeval *tv)
+ { // Query zookeeper_interest for the fd, events and timeout to wait on; map the events to RDONLY/WRONLY/RDWR.
+ int interest = 0;
+
+ int ret = zookeeper_interest(zh, fd, &interest, tv);
+
+ if (ret != ZOK)
+ fatal("zookeeper_interest failed! (%s)", zerror(ret));
+
+ *ops = 0;
+
+ if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+ *ops |= RDWR;
+ } else if (interest & ZOOKEEPER_READ) {
+ *ops |= RDONLY;
+ } else if (interest & ZOOKEEPER_WRITE) {
+ *ops |= WRONLY;
+ }
+ }
+
+ void process(int fd, int ops)
+ { // Report the fd's readiness to zookeeper_process; the ops argument is not consulted.
+ int events = 0;
+
+ if (ready(fd, RDONLY)) {
+ events |= ZOOKEEPER_READ;
+ } if (ready(fd, WRONLY)) { // Independent check (not an else-if): both bits may be set.
+ events |= ZOOKEEPER_WRITE;
+ }
+
+ int ret = zookeeper_process(zh, events);
+
+ if (ret != ZOK && ret != ZNOTHING) // ZNOTHING is tolerated alongside ZOK.
+ fatal("zookeeper_process failed! (%s)", zerror(ret));
+ }
+
+protected:
+ void operator () ()
+ {
+ // Lookup and cache the WatcherProcess PID associated with our
+ // Watcher _before_ we yield control via calling zookeeper_process
+ // so that Watcher callbacks can occur.
+ if (call(manager, LOOKUP_PID,
+ reinterpret_cast<char *>(&watcher), sizeof(watcher)) != OK)
+ fatal("failed to setup underlying ZooKeeper mechanisms");
+
+ // TODO(benh): Link with WatcherProcess?
+ watcherProcess =
+ *reinterpret_cast<PID *>(const_cast<char *>(body(NULL)));
+
+ while (true) {
+ int fd;
+ int ops;
+ timeval tv;
+
+ prepare(&fd, &ops, &tv);
+
+ // TODO(benh): If tv is 0, invoke await and ignore queued messages?
+
+ if (!await(fd, ops, tv, false)) { // NOTE(review): presumably false means a message interrupted the wait -- confirm what happens on timeout.
+ // TODO(benh): Don't handle incoming "calls" until we are connected!
+ switch (receive()) {
+ case CREATE: {
+ CreateCall *createCall =
+ *reinterpret_cast<CreateCall **>(const_cast<char *>(body(NULL)));
+ createCall->from = from();
+ createCall->zooKeeperProcess = this;
+ int ret = zoo_acreate(zh, createCall->path->c_str(),
+ createCall->data->data(),
+ createCall->data->size(),
+ createCall->acl, createCall->flags,
+ createCompletion, createCall);
+ if (ret != ZOK) { // Submit failed: answer the requester right away with the error.
+ createCall->ret = ret;
+ send(createCall->from, COMPLETED);
+ }
+ break;
+ }
+ case REMOVE: {
+ RemoveCall *removeCall =
+ *reinterpret_cast<RemoveCall **>(const_cast<char *>(body(NULL)));
+ removeCall->from = from();
+ removeCall->zooKeeperProcess = this;
+ int ret = zoo_adelete(zh, removeCall->path->c_str(),
+ removeCall->version,
+ removeCompletion, removeCall);
+ if (ret != ZOK) { // Submit failed: answer the requester right away with the error.
+ removeCall->ret = ret;
+ send(removeCall->from, COMPLETED);
+ }
+ break;
+ }
+ case EXISTS: {
+ ExistsCall *existsCall =
+ *reinterpret_cast<ExistsCall **>(const_cast<char *>(body(NULL)));
+ existsCall->from = from();
+ existsCall->zooKeeperProcess = this;
+ int ret = zoo_aexists(zh, existsCall->path->c_str(),
+ existsCall->watch,
+ existsCompletion, existsCall);
+ if (ret != ZOK) { // Submit failed: answer the requester right away with the error.
+ existsCall->ret = ret;
+ send(existsCall->from, COMPLETED);
+ }
+ break;
+ }
+ case GET: {
+ GetCall *getCall =
+ *reinterpret_cast<GetCall **>(const_cast<char *>(body(NULL)));
+ getCall->from = from();
+ getCall->zooKeeperProcess = this;
+ int ret = zoo_aget(zh, getCall->path->c_str(), getCall->watch,
+ getCompletion, getCall);
+ if (ret != ZOK) { // Submit failed: answer the requester right away with the error.
+ getCall->ret = ret;
+ send(getCall->from, COMPLETED);
+ }
+ break;
+ }
+ case TERMINATE: { // Sent by ~ZooKeeper(); leaving the loop ends the process.
+ return;
+ }
+ default: {
+ fatal("unexpected interruption during await");
+ }
+ }
+ } else {
+ process(fd, ops);
+ }
+ }
+ }
+
+public:
+ ZooKeeperProcess(ZooKeeper *_zk,
+ const string &_hosts,
+ int _timeout,
+ Watcher *_watcher)
+ : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher)
+ {
+ zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0); // `this` becomes the context argument of watch().
+ if (zh == NULL)
+ fatalerror("failed to create ZooKeeper (zookeeper_init)");
+ }
+
+ ~ZooKeeperProcess()
+ {
+ int ret = zookeeper_close(zh); // Releases the handle opened in the constructor.
+ if (ret != ZOK)
+ fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
+ }
+};
+
+
+
+ZooKeeper::ZooKeeper(const string &hosts, int timeout, Watcher *watcher)
+{
+ impl = new ZooKeeperProcess(this, hosts, timeout, watcher); // Its constructor calls zookeeper_init.
+ Process::spawn(impl); // Start the process; it is stopped and deleted in ~ZooKeeper().
+}
+
+
+ZooKeeper::~ZooKeeper()
+{
+ Process::post(impl->getPID(), TERMINATE); // Ask the ZooKeeperProcess loop to return.
+ Process::wait(impl->getPID()); // Wait for it to finish before freeing it.
+ delete impl; // NOTE(review): impl is a ZooKeeperImpl *; ~ZooKeeperProcess (zookeeper_close) only runs if ~Process is virtual -- confirm.
+}
+
+
+int ZooKeeper::getState()
+{
+  // impl is created as a ZooKeeperProcess in the constructor, so the
+  // downcast is valid; read the state straight off its handle.
+  return zoo_state(static_cast<ZooKeeperProcess *>(impl)->zh);
+}
+
+
+int ZooKeeper::create(const string &path,
+                      const string &data,
+                      const ACL_vector &acl,
+                      int flags,
+                      string *result)
+{
+  /* Short-lived process that forwards the request to the
+   * ZooKeeperProcess and blocks until it answers. */
+  class CreateRequester : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    CreateCall *request;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer travels in the message; the struct itself stays
+      // on the stack of ZooKeeper::create.
+      const bool completed =
+        call(target->getPID(), CREATE,
+             reinterpret_cast<char *>(&request), sizeof(request)) == COMPLETED;
+      if (!completed)
+        request->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    CreateRequester(ZooKeeperProcess *_target, CreateCall *_request)
+      : target(_target), request(_request) {}
+  };
+
+  CreateCall request;
+  request.path = &path;
+  request.data = &data;
+  request.acl = &acl;
+  request.flags = flags;
+  request.result = result;
+
+  CreateRequester requester(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Block the caller until the requester process has finished.
+  Process::wait(Process::spawn(&requester));
+
+  return request.ret;
+}
+
+
+
+int ZooKeeper::remove(const string &path, int version)
+{
+  /* Short-lived process that forwards the request to the
+   * ZooKeeperProcess and blocks until it answers. */
+  class RemoveRequester : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    RemoveCall *request;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer travels in the message; the struct itself stays
+      // on the stack of ZooKeeper::remove.
+      const bool completed =
+        call(target->getPID(), REMOVE,
+             reinterpret_cast<char *>(&request), sizeof(request)) == COMPLETED;
+      if (!completed)
+        request->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    RemoveRequester(ZooKeeperProcess *_target, RemoveCall *_request)
+      : target(_target), request(_request) {}
+  };
+
+  RemoveCall request;
+  request.path = &path;
+  request.version = version;
+
+  RemoveRequester requester(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Block the caller until the requester process has finished.
+  Process::wait(Process::spawn(&requester));
+
+  return request.ret;
+}
+
+
+int ZooKeeper::exists(const string &path,
+                      bool watch,
+                      Stat *stat)
+{
+  /* Short-lived process that forwards the request to the
+   * ZooKeeperProcess and blocks until it answers. */
+  class ExistsRequester : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    ExistsCall *request;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer travels in the message; the struct itself stays
+      // on the stack of ZooKeeper::exists.
+      const bool completed =
+        call(target->getPID(), EXISTS,
+             reinterpret_cast<char *>(&request), sizeof(request)) == COMPLETED;
+      if (!completed)
+        request->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    ExistsRequester(ZooKeeperProcess *_target, ExistsCall *_request)
+      : target(_target), request(_request) {}
+  };
+
+  ExistsCall request;
+  request.path = &path;
+  request.watch = watch;
+  request.stat = stat;
+
+  ExistsRequester requester(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Block the caller until the requester process has finished.
+  Process::wait(Process::spawn(&requester));
+
+  return request.ret;
+}
+
+
+int ZooKeeper::get(const string &path,
+                   bool watch,
+                   string *result,
+                   Stat *stat)
+{
+  /* Short-lived process that forwards the request to the
+   * ZooKeeperProcess and blocks until it answers. */
+  class GetRequester : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    GetCall *request;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer travels in the message; the struct itself stays
+      // on the stack of ZooKeeper::get.
+      const bool completed =
+        call(target->getPID(), GET,
+             reinterpret_cast<char *>(&request), sizeof(request)) == COMPLETED;
+      if (!completed)
+        request->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    GetRequester(ZooKeeperProcess *_target, GetCall *_request)
+      : target(_target), request(_request) {}
+  };
+
+  GetCall request;
+  request.path = &path;
+  request.watch = watch;
+  request.result = result;
+  request.stat = stat;
+
+  GetRequester requester(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Block the caller until the requester process has finished.
+  Process::wait(Process::spawn(&requester));
+
+  return request.ret;
+}
+
+
+const char * ZooKeeper::error(int ret) const
+{
+ return zerror(ret); // Delegates to zerror() for the message matching the given return code.
+}
+
+
+// class TestWatcher : public Watcher
+// {
+// public:
+// void process(ZooKeeper *zk, int type, int state, const string &path)
+// {
+// cout << "TestWatcher::process" << endl;
+// }
+// };
+
+
+// int main(int argc, char** argv)
+// {
+// TestWatcher watcher;
+// ZooKeeper zk(argv[1], 10000, &watcher);
+// sleep(10);
+// return 0;
+// }
Added: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131627&view=auto
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (added)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun 5 03:31:45 2011
@@ -0,0 +1,241 @@
+/**
+ * ZooKeeper C++ API. Originally created to be used with the
+ * Libprocess library (http://www.eecs.berkeley.edu/~benh/libprocess),
+ * these C++ classes could also use a different underlying
+ * implementation, such as calling the synchronous functions exposed
+ * by the ZooKeeper C API.
+ *
+ * To provide for varying underlying implementations the pimpl idiom
+ * (also known as the compiler-firewall, bridge pattern, etc) was used
+ * for the ZooKeeper class. While the Watcher class may need the same
+ * treatment to support varying underlying implementation details, we
+ * choose for now to keep it a cleaner interface.
+ *
+ * Author: Benjamin Hindman <be...@berkeley.edu>
+*/
+#ifndef ZOOKEEPER_HPP
+#define ZOOKEEPER_HPP
+
+#include <zookeeper.h>
+
+#include <string>
+
+
+/* Forward declarations of classes we are using. */
+class ZooKeeper;
+class ZooKeeperImpl;
+
+
+/**
+ * This interface specifies the public interface an event handler
+ * class must implement. A ZooKeeper client will get various events
+ * from the ZooKeeper server it connects to. An application using such
+ * a client handles these events by registering a callback object with
+ * the client. The callback object is expected to be an instance of a
+ * class that implements Watcher interface.
+ */
+class Watcher
+{
+public:
+  /**
+   * Callback invoked once for every event delivered for this Watcher.
+   *
+   * \param zk the ZooKeeper instance the event was reported for.
+   * \param type the event type reported by the ZooKeeper C client.
+   * \param state the connection state reported by the ZooKeeper C client.
+   * \param path the path reported with the event.
+   */
+  virtual void process(ZooKeeper *zk,
+                       int type,
+                       int state,
+                       const std::string &path) = 0;
+
+  Watcher();
+
+  /* Virtual because Watcher is a polymorphic base class: destroying a
+   * subclass through a Watcher pointer must run the subclass
+   * destructor as well. */
+  virtual ~Watcher();
+
+  /* NOTE(review): no definitions for these two members appear in
+   * zookeeper.cpp as committed, so copying a Watcher presumably fails
+   * at link time -- confirm whether copying is meant to be forbidden. */
+  Watcher(const Watcher &that);
+  Watcher & operator = (const Watcher &that);
+};
+
+
+/**
+ * TODO(benh): Clean up this documentation.
+ *
+ * This is the main class of ZooKeeper client library. To use a
+ * ZooKeeper service, an application must first instantiate an object
+ * of ZooKeeper class. All the interactions will be done by calling the
+ * methods of ZooKeeper class.
+ *
+ * Once a connection to a server is established, a session ID is
+ * assigned to the client. The client will send heart beats to the
+ * server periodically to keep the session valid.
+ *
+ * The application can call ZooKeeper APIs through a client as long as
+ * the session ID of the client remains valid.
+ *
+ * If for some reason, the client fails to send heart beats to the
+ * server for a prolonged period of time (exceeding the sessionTimeout
+ * value, for instance), the server will expire the session, and the
+ * session ID will become invalid. The client object will no longer be
+ * usable. To make ZooKeeper API calls, the application must create a
+ * new client object.
+ *
+ * If the ZooKeeper server the client currently connects to fails or
+ * otherwise does not respond, the client will automatically try to
+ * connect to another server before its session ID expires. If
+ * successful, the application can continue to use the client.
+ *
+ * Some successful ZooKeeper API calls can leave watches on the "data
+ * nodes" in the ZooKeeper server. Other successful ZooKeeper API
+ * calls can trigger those watches. Once a watch is triggered, an
+ * event will be delivered to the client which left the watch in the
+ * first place. Each watch can be triggered only once. Thus, up to one
+ * event will be delivered to a client for every watch it leaves.
+ *
+ * A client needs an object of a class implementing Watcher interface
+ * for processing the events delivered to the client. When a client
+ * drops current connection and re-connects to a server, all the
+ * existing watches are considered as being triggered but the
+ * undelivered events are lost. To emulate this, the client will
+ * generate a special event to tell the event handler a connection has
+ * been dropped. This special event has type EventNone and state
+ * sKeeperStateDisconnected.
+ */
+class ZooKeeper
+{
+private:
+ /* ZooKeeper instances are not copyable. */
+ ZooKeeper(const ZooKeeper &that);
+ ZooKeeper & operator = (const ZooKeeper &that);
+
+protected:
+ /* Underlying implementation (pimpl idiom). */
+ ZooKeeperImpl *impl;
+
+public:
+ /**
+ * \brief instantiate new ZooKeeper client.
+ *
+ * The constructor initiates a new session, however session
+ * establishment is asynchronous, meaning that the session should
+ * not be considered established until (and unless) an event of
+ * state ZOO_CONNECTED_STATE is received.
+ * \param hosts comma separated host:port pairs, each corresponding
+ * to a ZooKeeper server. e.g. "127.0.0.1:3000,127.0.0.1:3001"
+ * \param timeout the session timeout handed to zookeeper_init
+ * (presumably in milliseconds, as in the ZooKeeper C API -- confirm).
+ * \param watcher the instance of Watcher that receives event
+ * callbacks. When notifications are triggered the Watcher::process
+ * method will be invoked.
+ */
+ ZooKeeper(const std::string &hosts, int timeout, Watcher *watcher);
+
+ ~ZooKeeper();
+
+ /**
+ * \brief get the state of the zookeeper connection.
+ *
+ * The return value will be one of the \ref State Consts.
+ */
+ int getState();
+
+ /**
+ * \brief create a node synchronously.
+ *
+ * This method will create a node in ZooKeeper. A node can only be
+ * created if it does not already exist. The Create Flags affect
+ * the creation of nodes. If ZOO_EPHEMERAL flag is set, the node
+ * will automatically get removed if the client session goes
+ * away. If the ZOO_SEQUENCE flag is set, a unique monotonically
+ * increasing sequence number is appended to the path name.
+ *
+ * \param path The name of the node. Expressed as a file name with
+ * slashes separating ancestors of the node.
+ * \param data The data to be stored in the node.
+ * \param acl The initial ACL of the node. If null, the ACL of the
+ * parent will be used. NOTE(review): acl is taken by reference here,
+ * so the "if null" case of the C API cannot be expressed -- confirm
+ * the intended wording.
+ * \param flags this parameter can be set to 0 for normal create or
+ * an OR of the Create Flags
+ * \param result A string which will be filled with the path of
+ * the new node (this might be different than the supplied path
+ * because of the ZOO_SEQUENCE flag). May be NULL, in which case the
+ * path is not returned.
+ * \return one of the following codes are returned:
+ * ZOK operation completed successfully
+ * ZNONODE the parent node does not exist.
+ * ZNODEEXISTS the node already exists
+ * ZNOAUTH the client does not have permission.
+ * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ int create(const std::string &path,
+ const std::string &data,
+ const ACL_vector &acl,
+ int flags,
+ std::string *result);
+
+ /**
+ * \brief delete a node in zookeeper synchronously.
+ *
+ * \param path the name of the node. Expressed as a file name with
+ * slashes separating ancestors of the node.
+ * \param version the expected version of the node. The function
+ * will fail if the actual version of the node does not match the
+ * expected version. If -1 is used the version check will not take
+ * place.
+ * \return one of the following values is returned.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZNOTEMPTY children are present; node cannot be deleted.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ int remove(const std::string &path, int version);
+
+ /**
+ * \brief checks the existence of a node in zookeeper synchronously.
+ *
+ * \param path the name of the node. Expressed as a file name with
+ * slashes separating ancestors of the node.
+ * \param watch if true, a watch will be set at the server to
+ * notify the client if the node changes. The watch will be set even
+ * if the node does not exist. This allows clients to watch for
+ * nodes to appear.
+ * \param stat the return stat value of the node. May be NULL, in
+ * which case the stat is not returned.
+ * \return return code of the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ int exists(const std::string &path, bool watch, Stat *stat);
+
+ /**
+ * \brief gets the data associated with a node synchronously.
+ *
+ * \param path the name of the node. Expressed as a file name with
+ * slashes separating ancestors of the node.
+ * \param watch if true, a watch will be set at the server to
+ * notify the client if the node changes.
+ * \param result the data returned by the server. May be NULL, in
+ * which case the data is not returned.
+ * \param stat if not NULL, will hold the value of stat for the path
+ * on return.
+ * \return return value of the function call.
+ * ZOK operation completed successfully
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ */
+ int get(const std::string &path,
+ bool watch,
+ std::string *result,
+ Stat *stat);
+
+ /**
+ * \brief return a string describing the given error code.
+ *
+ * \param ret a return code, e.g. one returned by the methods above
+ * \return string corresponding to that error code
+ */
+ const char * error(int ret) const;
+};
+
+
+#endif /* ZOOKEEPER_HPP */