You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:31:45 UTC

svn commit: r1131627 - in /incubator/mesos/trunk/src: zookeeper.cpp zookeeper.hpp

Author: benh
Date: Sun Jun  5 03:31:45 2011
New Revision: 1131627

URL: http://svn.apache.org/viewvc?rev=1131627&view=rev
Log:
Oops, forgot these files.

Added:
    incubator/mesos/trunk/src/zookeeper.cpp
    incubator/mesos/trunk/src/zookeeper.hpp

Added: incubator/mesos/trunk/src/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.cpp?rev=1131627&view=auto
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.cpp (added)
+++ incubator/mesos/trunk/src/zookeeper.cpp Sun Jun  5 03:31:45 2011
@@ -0,0 +1,698 @@
+#include <assert.h>
+
+#include <iostream>
+#include <map>
+
+#include <process.hpp>
+
+#include "fatal.hpp"
+#include "hash_pid.hpp"
+#include "zookeeper.hpp"
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::map;
+using std::string;
+
+
+/* Forward (and first) declaration of ZooKeeperProcess. */
+class ZooKeeperProcess;
+
+
+enum { // Message ids used by the processes in this file; numbering starts at PROCESS_MSGID.
+  /* Generic messages: */
+  TERMINATE = PROCESS_MSGID, // Terminate process.
+
+  /* WatcherProcessManager messages: */
+  OK, // Generic success response.
+  ERROR, // Generic failure response.
+  REGISTER, // Register WatcherProcess (body: WatcherProcess pointer).
+  UNREGISTER, // Unregister WatcherProcess (body: WatcherProcess pointer).
+  LOOKUP_PROCESS, // Lookup WatcherProcess associated with Watcher.
+  LOOKUP_PID, // Lookup WatcherProcess PID associated with Watcher.
+
+  /* WatcherProcess messages: */
+  EVENT, // Invoke Watcher::process callback (body: Event pointer).
+
+  /* ZooKeeperProcess messages: */
+  COMPLETED, // After an asynchronous "call" has completed.
+  CREATE, // Perform an asynchronous create.
+  REMOVE, // Perform an asynchronous remove (delete).
+  EXISTS, // Perform an asynchronous exists.
+  GET, // Perform an asynchronous get.
+};
+
+
+/* Event "message" for invoking Watcher. */
+struct Event // Allocated in ZooKeeperProcess::watch, deleted by WatcherProcess.
+{
+  ZooKeeper *zk; // ZooKeeper instance the event is reported for.
+  int type; // Event type as handed to the watch callback.
+  int state; // Connection state as handed to the watch callback.
+  string path; // Path as handed to the watch callback.
+};
+
+
+/* Create "message" for performing ZooKeeper::create. */
+struct CreateCall
+{
+  int ret; // Result code reported back to ZooKeeper::create.
+  const string *path; // Points at the caller's path argument.
+  const string *data; // Points at the caller's data argument.
+  const ACL_vector *acl; // Points at the caller's ACL argument.
+  int flags; // Create flags, passed through to zoo_acreate.
+  string *result; // Optional out-parameter for the created path; may be NULL.
+  PID from; // Requesting process; filled in by ZooKeeperProcess.
+  ZooKeeperProcess *zooKeeperProcess; // Process the completion sends COMPLETED from.
+};
+
+
+/* Remove "message" for performing ZooKeeper::remove. */
+struct RemoveCall
+{
+  int ret; // Result code reported back to ZooKeeper::remove.
+  const string *path; // Points at the caller's path argument.
+  int version; // Expected node version, passed through to zoo_adelete.
+  PID from; // Requesting process; filled in by ZooKeeperProcess.
+  ZooKeeperProcess *zooKeeperProcess; // Process the completion sends COMPLETED from.
+};
+
+
+/* Exists "message" for performing ZooKeeper::exists. */
+struct ExistsCall
+{
+  int ret; // Result code reported back to ZooKeeper::exists.
+  const string *path; // Points at the caller's path argument.
+  bool watch; // Whether to leave a watch; passed through to zoo_aexists.
+  Stat *stat; // Optional out-parameter for the node's Stat; may be NULL.
+  PID from; // Requesting process; filled in by ZooKeeperProcess.
+  ZooKeeperProcess *zooKeeperProcess; // Process the completion sends COMPLETED from.
+};
+
+
+/* Get "message" for performing ZooKeeper::get. */
+struct GetCall
+{
+  int ret; // Result code reported back to ZooKeeper::get.
+  const string *path; // Points at the caller's path argument.
+  bool watch; // Whether to leave a watch; passed through to zoo_aget.
+  string *result; // Optional out-parameter for the node data; may be NULL.
+  Stat *stat; // Optional out-parameter for the node's Stat; may be NULL.
+  PID from; // Requesting process; filled in by ZooKeeperProcess.
+  ZooKeeperProcess *zooKeeperProcess; // Process the completion sends COMPLETED from.
+};
+
+
+/* PID of singleton instance of WatcherProcessManager. */
+PID manager; // Assigned by init() when the WatcherProcessManager is spawned.
+
+
+/**
+ * In order to make callbacks on Watcher, we create a proxy
+ * WatcherProcess. The ZooKeeperProcess (defined below) stores the PID
+ * of the WatcherProcess associated with a watcher and sends it an
+ * event "message" which it uses to invoke Watcher::process. Care
+ * needed to be taken to assure that a WatcherProcess was only valid
+ * as long as a Watcher was valid. This was done by ensuring that the
+ * WatcherProcess object created gets cleaned up in ~Watcher(). We
+ * wanted to keep the Watcher interface clean and simple, so rather
+ * than add a member in Watcher that points to a WatcherProcess
+ * instance (or points to a WatcherImpl), we choose to create a
+ * WatcherProcessManager that stores the Watcher and WatcherProcess
+ * associations. The WatcherProcessManager is akin to a shared
+ * dictionary or hashtable, except that it is accessed by sending and
+ * receiving messages rather than by taking locks. There is probably a
+ * performance hit here, but it would be interesting to see how bad the
+ * performance is across a range of low and high-contention states.
+ */
+class WatcherProcess : public Process
+{
+  friend class WatcherProcessManager; // The manager reads `watcher` directly.
+
+private:
+  Watcher *watcher; // Callback target; never deleted by this process.
+
+protected:
+  void operator () () // Registers with the manager, then serves messages.
+  {
+    WatcherProcess *self = this; // The pointer value itself is the payload.
+    char *payload = reinterpret_cast<char *>(&self);
+    if (call(manager, REGISTER, payload, sizeof(self)) != OK)
+      fatal("failed to setup underlying watcher mechanism");
+    for (;;) {
+      switch (receive()) {
+        case EVENT: { // Body holds an Event pointer; the Event is freed here.
+          Event *delivered = *reinterpret_cast<Event **>(const_cast<char *>(body(NULL)));
+          watcher->process(delivered->zk, delivered->type,
+                           delivered->state, delivered->path);
+          delete delivered;
+          break;
+        }
+        case TERMINATE: {
+          if (call(manager, UNREGISTER, payload, sizeof(self)) != OK)
+            fatal("failed to cleanup underlying watcher mechanism");
+          return;
+        }
+      }
+    }
+  }
+
+public:
+  WatcherProcess(Watcher *_watcher) : watcher(_watcher) {}
+};
+
+
+class WatcherProcessManager : public Process
+{
+private:
+  typedef map<Watcher *, WatcherProcess *> Registry;
+  Registry watchers; // Watcher -> WatcherProcess currently registered for it.
+protected:
+  void operator () () // Answers registry requests in an endless receive loop.
+  {
+    for (;;) {
+      switch (receive()) {
+        case REGISTER: { // Body carries the WatcherProcess pointer to add.
+          WatcherProcess *proxy =
+            *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
+          Watcher *key = proxy->watcher;
+          assert(watchers.count(key) == 0);
+          watchers[key] = proxy;
+          send(from(), OK);
+          break;
+        }
+        case UNREGISTER: { // Body carries the WatcherProcess pointer to drop.
+          WatcherProcess *proxy =
+            *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
+          Watcher *key = proxy->watcher;
+          assert(watchers.count(key) != 0);
+          watchers.erase(key);
+          send(from(), OK);
+          break;
+        }
+        case LOOKUP_PROCESS: { // Replies with the WatcherProcess pointer itself.
+          Watcher *key = *reinterpret_cast<Watcher **>(const_cast<char *>(body(NULL)));
+          Registry::iterator found = watchers.find(key);
+          if (found == watchers.end()) {
+            send(from(), ERROR);
+          } else {
+            WatcherProcess *proxy = found->second;
+            send(from(), OK, reinterpret_cast<char *>(&proxy), sizeof(proxy));
+          }
+          break;
+        }
+        case LOOKUP_PID: { // Replies with the PID of the WatcherProcess.
+          Watcher *key = *reinterpret_cast<Watcher **>(const_cast<char *>(body(NULL)));
+          Registry::iterator found = watchers.find(key);
+          if (found == watchers.end()) {
+            send(from(), ERROR);
+          } else {
+            // Copy the PID into a local so its bytes can be sent.
+            PID pid = found->second->getPID();
+            send(from(), OK, reinterpret_cast<char *>(&pid), sizeof(pid));
+          }
+          break;
+        }
+      }
+    }
+  }
+};
+
+
+static void __attribute__((constructor)) init() // Runs via the constructor attribute.
+{
+  manager = Process::spawn(new WatcherProcessManager()); // Not deleted anywhere in this file.
+}
+
+
+Watcher::Watcher()
+{
+  Process::spawn(new WatcherProcess(this)); // Proxy registers itself; ~Watcher() deletes it.
+}
+
+
+Watcher::~Watcher()
+{
+  // Helper process: looks up our proxy, stops it, and reclaims it.
+  class WatcherProcessWaiter : public Process
+  {
+  private:
+    Watcher *watcher;
+  protected:
+    void operator () ()
+    {
+      char *key = reinterpret_cast<char *>(&watcher);
+      if (call(manager, LOOKUP_PROCESS, key, sizeof(watcher)) != OK)
+        fatal("failed to deallocate resources associated with Watcher");
+      // The reply body carries the WatcherProcess pointer for this Watcher.
+      WatcherProcess *proxy =
+        *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
+      send(proxy->getPID(), TERMINATE);
+      wait(proxy->getPID()); // Waited on before the proxy is freed.
+      delete proxy;
+    }
+  public:
+    WatcherProcessWaiter(Watcher *_watcher) : watcher(_watcher) {}
+  } reaper(this);
+
+  Process::wait(Process::spawn(&reaper)); // Return only after cleanup is done.
+}
+
+
+class ZooKeeperImpl : public Process {}; // Empty base; ZooKeeperProcess below derives from it.
+
+
+class ZooKeeperProcess : public ZooKeeperImpl
+{
+  friend class ZooKeeper;
+
+private:
+  ZooKeeper *zk; // ZooKeeper instance.
+  string hosts; // ZooKeeper host:port pairs.
+  int timeout; // ZooKeeper session timeout.
+  zhandle_t *zh; // ZooKeeper connection handle.
+
+  Watcher *watcher; // Associated Watcher instance.
+  PID watcherProcess; // PID of WatcherProcess that invokes Watcher.
+  /* Watch callback given to zookeeper_init; forwards each event as EVENT. */
+  static void watch(zhandle_t *zh, int type, int state,
+                    const char *path, void *context)
+  {
+    // The Event is heap-allocated here and deleted by the WatcherProcess.
+    ZooKeeperProcess *zooKeeperProcess = static_cast<ZooKeeperProcess*>(context);
+    Event *event = new Event();
+    event->zk = zooKeeperProcess->zk;
+    event->type = type;
+    event->state = state;
+    event->path = (path != NULL) ? path : ""; // Assigning NULL to a string is undefined.
+    zooKeeperProcess->send(zooKeeperProcess->watcherProcess,
+                           EVENT,
+                           reinterpret_cast<char *>(&event),
+                           sizeof(Event *));
+  }
+  /* Completion for zoo_acreate: stores the outcome, then wakes the caller. */
+  static void createCompletion(int ret, const char *value, const void *data)
+  {
+    CreateCall *createCall =
+      static_cast<CreateCall *>(const_cast<void *>((data)));
+    createCall->ret = ret;
+    if (createCall->result != NULL && value != NULL)
+      createCall->result->assign(value);
+    createCall->zooKeeperProcess->send(createCall->from, COMPLETED);
+  }
+  /* Completion for zoo_adelete: stores the outcome, then wakes the caller. */
+  static void removeCompletion(int ret, const void *data)
+  {
+    RemoveCall *removeCall =
+      static_cast<RemoveCall *>(const_cast<void *>((data)));
+    removeCall->ret = ret;
+    removeCall->zooKeeperProcess->send(removeCall->from, COMPLETED);
+  }
+  /* Completion for zoo_aexists: copies the Stat if one was requested. */
+  static void existsCompletion(int ret, const Stat *stat, const void *data)
+  {
+    ExistsCall *existsCall =
+      static_cast<ExistsCall *>(const_cast<void *>((data)));
+    existsCall->ret = ret;
+    if (existsCall->stat != NULL && stat != NULL)
+      *(existsCall->stat) = *(stat);
+    existsCall->zooKeeperProcess->send(existsCall->from, COMPLETED);
+  }
+  /* Completion for zoo_aget: copies value_len bytes of data and the Stat. */
+  static void getCompletion(int ret, const char *value, int value_len,
+                            const Stat *stat, const void *data)
+  {
+    GetCall *getCall =
+      static_cast<GetCall *>(const_cast<void *>((data)));
+    getCall->ret = ret;
+    if (getCall->result != NULL && value != NULL)
+      getCall->result->assign(value, value_len);
+    if (getCall->stat != NULL && stat != NULL)
+      *(getCall->stat) = *(stat);
+    getCall->zooKeeperProcess->send(getCall->from, COMPLETED);
+  }
+  /* Asks zookeeper_interest for the fd/timeout and maps its interest to *ops. */
+  void prepare(int *fd, int *ops, timeval *tv)
+  {
+    int interest = 0;
+
+    int ret = zookeeper_interest(zh, fd, &interest, tv);
+
+    if (ret != ZOK)
+      fatal("zookeeper_interest failed! (%s)", zerror(ret));
+
+    *ops = 0;
+
+    if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+      *ops |= RDWR;
+    } else if (interest & ZOOKEEPER_READ) {
+      *ops |= RDONLY;
+    } else if (interest & ZOOKEEPER_WRITE) {
+      *ops |= WRONLY;
+    }
+  }
+  /* Reports fd readiness to zookeeper_process, which runs watch/completions. */
+  void process(int fd, int ops)
+  {
+    int events = 0;
+
+    // Both directions are checked independently; `ops` is not consulted.
+    if (ready(fd, RDONLY))
+      events |= ZOOKEEPER_READ;
+    if (ready(fd, WRONLY))
+      events |= ZOOKEEPER_WRITE;
+
+    int ret = zookeeper_process(zh, events);
+
+    if (ret != ZOK && ret != ZNOTHING)
+      fatal("zookeeper_process failed! (%s)", zerror(ret));
+  }
+
+protected:
+  void operator () ()
+  {
+    // Lookup and cache the WatcherProcess PID associated with our
+    // Watcher _before_ we yield control via calling zookeeper_process
+    // so that Watcher callbacks can occur.
+    if (call(manager, LOOKUP_PID,
+             reinterpret_cast<char *>(&watcher), sizeof(watcher)) != OK)
+      fatal("failed to setup underlying ZooKeeper mechanisms");
+
+    // TODO(benh): Link with WatcherProcess?
+    watcherProcess =
+      *reinterpret_cast<PID *>(const_cast<char *>(body(NULL)));
+
+    while (true) {
+      int fd;
+      int ops;
+      timeval tv;
+
+      prepare(&fd, &ops, &tv);
+
+      // TODO(benh): If tv is 0, invoke await and ignore queued messages?
+
+      if (!await(fd, ops, tv, false)) {
+        // TODO(benh): Don't handle incoming "calls" until we are connected!
+        switch (receive()) {
+          case CREATE: {
+            CreateCall *createCall =
+              *reinterpret_cast<CreateCall **>(const_cast<char *>(body(NULL)));
+            createCall->from = from();
+            createCall->zooKeeperProcess = this;
+            int ret = zoo_acreate(zh, createCall->path->c_str(),
+                                  createCall->data->data(),
+                                  createCall->data->size(),
+                                  createCall->acl, createCall->flags,
+                                  createCompletion, createCall);
+            if (ret != ZOK) { // Completion will not run; answer right away.
+              createCall->ret = ret;
+              send(createCall->from, COMPLETED);
+            }
+            break;
+          }
+          case REMOVE: {
+            RemoveCall *removeCall =
+              *reinterpret_cast<RemoveCall **>(const_cast<char *>(body(NULL)));
+            removeCall->from = from();
+            removeCall->zooKeeperProcess = this;
+            int ret = zoo_adelete(zh, removeCall->path->c_str(),
+                                  removeCall->version,
+                                  removeCompletion, removeCall);
+            if (ret != ZOK) { // Completion will not run; answer right away.
+              removeCall->ret = ret;
+              send(removeCall->from, COMPLETED);
+            }
+            break;
+          }
+          case EXISTS: {
+            ExistsCall *existsCall =
+              *reinterpret_cast<ExistsCall **>(const_cast<char *>(body(NULL)));
+            existsCall->from = from();
+            existsCall->zooKeeperProcess = this;
+            int ret = zoo_aexists(zh, existsCall->path->c_str(),
+                                  existsCall->watch,
+                                  existsCompletion, existsCall);
+            if (ret != ZOK) { // Completion will not run; answer right away.
+              existsCall->ret = ret;
+              send(existsCall->from, COMPLETED);
+            }
+            break;
+          }
+          case GET: {
+            GetCall *getCall =
+              *reinterpret_cast<GetCall **>(const_cast<char *>(body(NULL)));
+            getCall->from = from();
+            getCall->zooKeeperProcess = this;
+            int ret = zoo_aget(zh, getCall->path->c_str(), getCall->watch,
+                               getCompletion, getCall);
+            if (ret != ZOK) { // Completion will not run; answer right away.
+              getCall->ret = ret;
+              send(getCall->from, COMPLETED);
+            }
+            break;
+          }
+          case TERMINATE: {
+            return;
+          }
+          default: {
+            fatal("unexpected interruption during await");
+          }
+        }
+      } else {
+        process(fd, ops);
+      }
+    }
+  }
+
+public:
+  ZooKeeperProcess(ZooKeeper *_zk,
+                   const string &_hosts,
+                   int _timeout,
+                   Watcher *_watcher)
+    : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher)
+  {
+    zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0); // `this` is watch()'s context.
+    if (zh == NULL)
+      fatalerror("failed to create ZooKeeper (zookeeper_init)");
+  }
+
+  ~ZooKeeperProcess()
+  {
+    int ret = zookeeper_close(zh);
+    if (ret != ZOK)
+      fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
+  }
+};
+
+
+
+ZooKeeper::ZooKeeper(const string &hosts, int timeout, Watcher *watcher)
+  : impl(new ZooKeeperProcess(this, hosts, timeout, watcher))
+{
+  Process::spawn(impl); // Start the process that drives the ZooKeeper handle.
+}
+
+
+ZooKeeper::~ZooKeeper()
+{
+  Process::post(impl->getPID(), TERMINATE); // Ask the event loop to return.
+  Process::wait(impl->getPID()); // Wait for it before freeing the process.
+  delete impl; // NOTE(review): reaches ~ZooKeeperProcess only if ~Process is virtual - confirm.
+}
+
+
+int ZooKeeper::getState()
+{
+  // impl is the ZooKeeperProcess created by our constructor.
+  return zoo_state(static_cast<ZooKeeperProcess *>(impl)->zh);
+}
+
+
+int ZooKeeper::create(const string &path,
+                      const string &data,
+                      const ACL_vector &acl,
+                      int flags,
+                      string *result)
+{
+  // The request record lives on this stack frame and is shared by
+  // pointer, so we wait for the helper process before returning.
+  CreateCall request;
+  request.path = &path;
+  request.data = &data;
+  request.acl = &acl;
+  request.flags = flags;
+  request.result = result;
+
+  // Helper process that issues the CREATE call to the ZooKeeperProcess.
+  class CreateCallProcess : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    CreateCall *pending;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer to the request travels in the message body.
+      char *payload = reinterpret_cast<char *>(&pending);
+      if (call(target->getPID(), CREATE, payload, sizeof(pending)) != COMPLETED)
+        pending->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    CreateCallProcess(ZooKeeperProcess *_target, CreateCall *_pending)
+      : target(_target),
+        pending(_pending)
+    {}
+  } caller(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  Process::wait(Process::spawn(&caller));
+
+  return request.ret;
+}
+
+
+
+int ZooKeeper::remove(const string &path, int version)
+{
+  // Stack-allocated request, shared by pointer with the processes below.
+  RemoveCall request;
+  request.path = &path;
+  request.version = version;
+
+  // Helper process that issues the REMOVE call to the ZooKeeperProcess.
+  class RemoveCallProcess : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    RemoveCall *pending;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer to the request travels in the message body.
+      char *payload = reinterpret_cast<char *>(&pending);
+      if (call(target->getPID(), REMOVE, payload, sizeof(pending)) != COMPLETED)
+        pending->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    RemoveCallProcess(ZooKeeperProcess *_target, RemoveCall *_pending)
+      : target(_target),
+        pending(_pending)
+    {}
+  } caller(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Wait for the helper so `request` is no longer referenced on return.
+  Process::wait(Process::spawn(&caller));
+
+  return request.ret;
+}
+
+
+int ZooKeeper::exists(const string &path,
+                      bool watch,
+                      Stat *stat)
+{
+  // Stack-allocated request, shared by pointer with the processes below.
+  ExistsCall request;
+  request.path = &path;
+  request.watch = watch;
+  request.stat = stat;
+
+  // Helper process that issues the EXISTS call to the ZooKeeperProcess.
+  class ExistsCallProcess : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    ExistsCall *pending;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer to the request travels in the message body.
+      char *payload = reinterpret_cast<char *>(&pending);
+      if (call(target->getPID(), EXISTS, payload, sizeof(pending)) != COMPLETED)
+        pending->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    ExistsCallProcess(ZooKeeperProcess *_target, ExistsCall *_pending)
+      : target(_target),
+        pending(_pending)
+    {}
+  } caller(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Wait for the helper so `request` is no longer referenced on return.
+  Process::wait(Process::spawn(&caller));
+
+  return request.ret;
+}
+
+
+int ZooKeeper::get(const string &path,
+                   bool watch,
+                   string *result,
+                   Stat *stat)
+{
+  // Stack-allocated request, shared by pointer with the processes below.
+  GetCall request;
+  request.path = &path;
+  request.watch = watch;
+  request.result = result;
+  request.stat = stat;
+
+  // Helper process that issues the GET call to the ZooKeeperProcess.
+  class GetCallProcess : public Process
+  {
+  private:
+    ZooKeeperProcess *target;
+    GetCall *pending;
+
+  protected:
+    void operator () ()
+    {
+      // Only the pointer to the request travels in the message body.
+      char *payload = reinterpret_cast<char *>(&pending);
+      if (call(target->getPID(), GET, payload, sizeof(pending)) != COMPLETED)
+        pending->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    GetCallProcess(ZooKeeperProcess *_target, GetCall *_pending)
+      : target(_target),
+        pending(_pending)
+    {}
+  } caller(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Wait for the helper so `request` is no longer referenced on return.
+  Process::wait(Process::spawn(&caller));
+
+  return request.ret;
+}
+
+
+const char * ZooKeeper::error(int ret) const
+{
+  return zerror(ret); // Delegates to zerror() for the given return code.
+}
+
+
+// class TestWatcher : public Watcher
+// {
+// public:
+//   void process(ZooKeeper *zk, int type, int state, const string &path)
+//   {
+//     cout << "TestWatcher::process" << endl;
+//   }
+// };
+
+
+// int main(int argc, char** argv)
+// {
+//   TestWatcher watcher;
+//   ZooKeeper zk(argv[1], 10000, &watcher);
+//   sleep(10);
+//   return 0;
+// }

Added: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131627&view=auto
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (added)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun  5 03:31:45 2011
@@ -0,0 +1,241 @@
+/**
+ * ZooKeeper C++ API. Originally created to be used with the
+ * Libprocess library (http://www.eecs.berkeley.edu/~benh/libprocess),
+ * these C++ classes could also use a different underlying
+ * implementation, such as calling the synchronous functions exposed
+ * by the ZooKeeper C API.
+ *
+ * To provide for varying underlying implementations the pimpl idiom
+ * (also known as the compiler-firewall, bridge pattern, etc) was used
+ * for the ZooKeeper class. While the Watcher class may need some
+ * changes to support varying underlying implementation details, we
+ * choose for now to keep it a cleaner interface.
+ *
+ * Author: Benjamin Hindman <be...@berkeley.edu>
+*/
+#ifndef ZOOKEEPER_HPP
+#define ZOOKEEPER_HPP
+
+#include <zookeeper.h>
+
+#include <string>
+
+
+/* Forward declarations of classes we are using. */
+class ZooKeeper;
+class ZooKeeperImpl;
+
+
+/**
+ * This interface specifies the public interface an event handler
+ * class must implement. A ZooKeeper client will get various events
+ * from the ZooKeeper server it connects to. An application using such
+ * a client handles these events by registering a callback object with
+ * the client. The callback object is expected to be an instance of a
+ * class that implements Watcher interface.
+ */
+class Watcher
+{
+public:
+  /* Event callback; receives the event's type, state and path. */
+  virtual void process(ZooKeeper *zk, int type, int state,
+                       const std::string &path) = 0;
+
+  Watcher(); // Spawns the process that delivers events to process().
+  virtual ~Watcher(); // Virtual: implementations are used through Watcher*.
+  // NOTE(review): the copy operations have no definition in zookeeper.cpp.
+  Watcher(const Watcher &that);
+  Watcher & operator = (const Watcher &that);
+};
+
+
+/**
+ * TODO(benh): Clean up this documentation.
+ *
+ * This is the main class of ZooKeeper client library. To use a
+ * ZooKeeper service, an application must first instantiate an object
+ * of ZooKeeper class. All the interactions will be done by calling the
+ * methods of ZooKeeper class.
+ *
+ * Once a connection to a server is established, a session ID is
+ * assigned to the client. The client will send heart beats to the
+ * server periodically to keep the session valid.
+ *
+ * The application can call ZooKeeper APIs through a client as long as
+ * the session ID of the client remains valid.
+ *
+ * If for some reason, the client fails to send heart beats to the
+ * server for a prolonged period of time (exceeding the sessionTimeout
+ * value, for instance), the server will expire the session, and the
+ * session ID will become invalid. The client object will no longer be
+ * usable. To make ZooKeeper API calls, the application must create a
+ * new client object.
+ *
+ * If the ZooKeeper server the client currently connects to fails or
+ * otherwise does not respond, the client will automatically try to
+ * connect to another server before its session ID expires. If
+ * successful, the application can continue to use the client.
+ *
+ * Some successful ZooKeeper API calls can leave watches on the "data
+ * nodes" in the ZooKeeper server. Other successful ZooKeeper API
+ * calls can trigger those watches. Once a watch is triggered, an
+ * event will be delivered to the client which left the watch in the
+ * first place. Each watch can be triggered only once. Thus, up to one
+ * event will be delivered to a client for every watch it leaves.
+ *
+ * A client needs an object of a class implementing Watcher interface
+ * for processing the events delivered to the client. When a client
+ * drops current connection and re-connects to a server, all the
+ * existing watches are considered as being triggered but the
+ * undelivered events are lost. To emulate this, the client will
+ * generate a special event to tell the event handler a connection has
+ * been dropped. This special event has type EventNone and state
+ * KeeperStateDisconnected.
+ */
+class ZooKeeper
+{
+private:
+  /* ZooKeeper instances are not copyable. */
+  ZooKeeper(const ZooKeeper &that);
+  ZooKeeper & operator = (const ZooKeeper &that);
+
+protected:
+  /* Underlying implementation (pimpl idiom). */
+  ZooKeeperImpl *impl;
+
+public:
+  /**
+   * \brief instantiate new ZooKeeper client.
+   *
+   * The constructor initiates a new session, however session
+   * establishment is asynchronous, meaning that the session should
+   * not be considered established until (and unless) an event of
+   * state ZOO_CONNECTED_STATE is received.
+   * \param hosts comma separated host:port pairs, each corresponding
+   *    to a ZooKeeper server. e.g. "127.0.0.1:3000,127.0.0.1:3001"
+   * \param timeout session timeout handed to zookeeper_init.
+   * \param watcher the instance of Watcher that receives event callbacks;
+   *    Watcher::process is invoked when notifications are triggered.
+   */
+  ZooKeeper(const std::string &hosts, int timeout, Watcher *watcher);
+
+  ~ZooKeeper(); // Terminates and deletes the underlying implementation.
+
+  /**
+   * \brief get the state of the zookeeper connection.
+   *
+   * The return value will be one of the \ref State Consts.
+   */
+  int getState();
+
+  /**
+   * \brief create a node synchronously.
+   *
+   * This method will create a node in ZooKeeper. A node can only be
+   * created if it does not already exist. The Create Flags affect
+   * the creation of nodes.  If ZOO_EPHEMERAL flag is set, the node
+   * will automatically get removed if the client session goes
+   * away. If the ZOO_SEQUENCE flag is set, a unique monotonically
+   * increasing sequence number is appended to the path name.
+   *
+   * \param path The name of the node. Expressed as a file name with
+   *    slashes separating ancestors of the node.
+   * \param data The data to be stored in the node.
+   * \param acl The initial ACL of the node. It is passed by
+   *    reference, so an ACL must always be supplied.
+   * \param flags this parameter can be set to 0 for normal create or
+   *    an OR of the Create Flags
+   * \param result If not NULL, a string which will be filled with the
+   *    path of the new node (this might be different than the supplied
+   *    path because of the ZOO_SEQUENCE flag).
+   *    The string is left untouched when no path is reported.
+   * \return  one of the following codes are returned:
+   * ZOK operation completed successfully
+   * ZNONODE the parent node does not exist.
+   * ZNODEEXISTS the node already exists
+   * ZNOAUTH the client does not have permission.
+   * ZNOCHILDRENFOREPHEMERALS cannot create children of ephemeral nodes.
+   * ZBADARGUMENTS - invalid input parameters
+   * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+   * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+   */
+  int create(const std::string &path,
+	     const std::string &data,
+	     const ACL_vector &acl,
+	     int flags,
+	     std::string *result);
+
+  /**
+   * \brief delete a node in zookeeper synchronously.
+   *
+   * \param path the name of the node. Expressed as a file name with
+   *    slashes separating ancestors of the node.
+   * \param version the expected version of the node. The function
+   *    will fail if the actual version of the node does not match the
+   *    expected version. If -1 is used the version check will not take
+   *    place.
+   * \return one of the following values is returned.
+   * ZOK operation completed successfully
+   * ZNONODE the node does not exist.
+   * ZNOAUTH the client does not have permission.
+   * ZBADVERSION expected version does not match actual version.
+   * ZNOTEMPTY children are present; node cannot be deleted.
+   * ZBADARGUMENTS - invalid input parameters
+   * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+   * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+   */
+  int remove(const std::string &path, int version);
+
+  /**
+   * \brief checks the existence of a node in zookeeper synchronously.
+   *
+   * \param path the name of the node. Expressed as a file name with
+   *    slashes separating ancestors of the node.
+   * \param watch if true, a watch will be set at the server to
+   *    notify the client if the node changes. The watch will be set even
+   *    if the node does not exist. This allows clients to watch for
+   *    nodes to appear.
+   * \param stat if not NULL, will hold the stat value of the node.
+   * \return return code of the function call.
+   * ZOK operation completed successfully
+   * ZNONODE the node does not exist.
+   * ZNOAUTH the client does not have permission.
+   * ZBADARGUMENTS - invalid input parameters
+   * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+   * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+   */
+  int exists(const std::string &path, bool watch, Stat *stat);
+
+  /**
+   * \brief gets the data associated with a node synchronously.
+   *
+   * \param path the name of the node. Expressed as a file name with
+   *    slashes separating ancestors of the node.
+   * \param watch if true, a watch will be set at the server to
+   *    notify the client if the node changes.
+   * \param result if not NULL, will hold the data returned by the server
+   * \param stat if not NULL, will hold the value of stat for the path
+   *    on return.
+   * \return return value of the function call.
+   * ZOK operation completed successfully
+   * ZNONODE the node does not exist.
+   * ZNOAUTH the client does not have permission.
+   * ZBADARGUMENTS - invalid input parameters
+   * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+   * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+   */
+  int get(const std::string &path,
+	  bool watch,
+	  std::string *result,
+	  Stat *stat);
+
+  /**
+   * \brief return a string describing the given return code.
+   *
+   * \return string corresponding to the error code passed in as ret
+   */
+  const char * error(int ret) const;
+};
+
+
+#endif /* ZOOKEEPER_HPP */