You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/05/08 19:50:27 UTC

git commit: Refactored ZooKeeperImpl into ZooKeeperProcess.

Repository: mesos
Updated Branches:
  refs/heads/master a3d0d88af -> e4336578a


Refactored ZooKeeperImpl into ZooKeeperProcess.

See MESOS-1318 for more details.

Review: https://reviews.apache.org/r/21182


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4336578
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4336578
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4336578

Branch: refs/heads/master
Commit: e4336578a9ed932b0891e23fd83f7f7148689e0e
Parents: a3d0d88
Author: Benjamin Hindman <be...@berkeley.edu>
Authored: Thu May 8 10:50:04 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Thu May 8 10:50:04 2014 -0700

----------------------------------------------------------------------
 src/zookeeper/zookeeper.cpp | 300 ++++++++++++++++++++++++++++-----------
 src/zookeeper/zookeeper.hpp |  40 +++---
 2 files changed, 235 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e4336578/src/zookeeper/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.cpp b/src/zookeeper/zookeeper.cpp
index e3b65e4..11029be 100644
--- a/src/zookeeper/zookeeper.cpp
+++ b/src/zookeeper/zookeeper.cpp
@@ -21,8 +21,8 @@
 #include <iostream>
 #include <map>
 
+#include <process/defer.hpp>
 #include <process/dispatch.hpp>
-#include <process/once.hpp>
 #include <process/process.hpp>
 
 #include <stout/duration.hpp>
@@ -47,27 +47,32 @@ using std::vector;
 using tuples::tuple;
 
 
-class ZooKeeperImpl
+class ZooKeeperProcess : public Process<ZooKeeperProcess>
 {
 public:
-  ZooKeeperImpl(ZooKeeper* zk,
-                const string& servers,
-                const Duration& timeout,
-                Watcher* watcher)
+  ZooKeeperProcess(
+      ZooKeeper* zk,
+      const string& servers,
+      const Duration& timeout,
+      Watcher* watcher)
     : servers(servers),
-      zk(zk),
-      watcher(watcher)
+      timeout(timeout),
+      zh(NULL)
   {
-    if (watcher == NULL) {
-      LOG(FATAL) << "Cannot instantiate ZooKeeper with NULL watcher";
-    }
+    // We bind the Watcher::process callback so we can pass it to the
+    // C callback as a pointer and invoke it directly.
+    callback = lambda::bind(
+        &Watcher::process, watcher, zk, lambda::_1, lambda::_2, lambda::_3);
+  }
 
+  virtual void initialize()
+  {
     zh = zookeeper_init(
         servers.c_str(),
         event,
         static_cast<int>(timeout.ms()),
         NULL,
-        this,
+        &callback,
         0);
 
     if (zh == NULL) {
@@ -75,7 +80,7 @@ public:
     }
   }
 
-  ~ZooKeeperImpl()
+  virtual void finalize()
   {
     int ret = zookeeper_close(zh);
     if (ret != ZOK) {
@@ -84,6 +89,25 @@ public:
     }
   }
 
+  int getState()
+  {
+    return zoo_state(zh);
+  }
+
+  int64_t getSessionId()
+  {
+    return zoo_client_id(zh)->client_id;
+  }
+
+  Duration getSessionTimeout()
+  {
+    // ZooKeeper server uses int representation of milliseconds for
+    // session timeouts.
+    // See:
+    // http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html
+    return Milliseconds(zoo_recv_timeout(zh));
+  }
+
   Future<int> authenticate(const string& scheme, const string& credentials)
   {
     Promise<int>* promise = new Promise<int>();
@@ -92,8 +116,13 @@ public:
 
     tuple<Promise<int>*>* args = new tuple<Promise<int>*>(promise);
 
-    int ret = zoo_add_auth(zh, scheme.c_str(), credentials.data(),
-                           credentials.size(), voidCompletion, args);
+    int ret = zoo_add_auth(
+        zh,
+        scheme.c_str(),
+        credentials.data(),
+        credentials.size(),
+        voidCompletion,
+        args);
 
     if (ret != ZOK) {
       delete promise;
@@ -104,8 +133,12 @@ public:
     return future;
   }
 
-  Future<int> create(const string& path, const string& data,
-                     const ACL_vector& acl, int flags, string* result)
+  Future<int> create(
+      const string& path,
+      const string& data,
+      const ACL_vector& acl,
+      int flags,
+      string* result)
   {
     Promise<int>* promise = new Promise<int>();
 
@@ -114,8 +147,15 @@ public:
     tuple<Promise<int>*, string*>* args =
       new tuple<Promise<int>*, string*>(promise, result);
 
-    int ret = zoo_acreate(zh, path.c_str(), data.data(), data.size(), &acl,
-                          flags, stringCompletion, args);
+    int ret = zoo_acreate(
+        zh,
+        path.c_str(),
+        data.data(),
+        data.size(),
+        &acl,
+        flags,
+        stringCompletion,
+        args);
 
     if (ret != ZOK) {
       delete promise;
@@ -126,6 +166,82 @@ public:
     return future;
   }
 
+  Future<int> create(
+      const string& path,
+      const string& data,
+      const ACL_vector& acl,
+      int flags,
+      string* result,
+      bool recursive)
+  {
+    if (!recursive) {
+      return create(path, data, acl, flags, result);
+    }
+
+    // First check if the path exists.
+    return exists(path, false, NULL)
+      .then(defer(self(),
+                  &Self::_create,
+                  path,
+                  data,
+                  acl,
+                  flags,
+                  result,
+                  lambda::_1));
+  }
+
+  Future<int> _create(
+      const string& path,
+      const string& data,
+      const ACL_vector& acl,
+      int flags,
+      string* result,
+      int code)
+  {
+    if (code == ZOK) {
+      return ZNODEEXISTS;
+    }
+
+    // Now recursively create the parent path.
+    // NOTE: We don't use 'dirname()' to get the parent path here
+    // because, it doesn't return the expected path when a path ends
+    // with "/". For example, to create path "/a/b/", we want to
+    // recursively create "/a/b", instead of just creating "/a".
+    const string& parent = path.substr(0, path.find_last_of("/"));
+    if (!parent.empty()) {
+      return create(parent, "", acl, 0, result, true)
+        .then(defer(self(),
+                    &Self::__create,
+                    path,
+                    data,
+                    acl,
+                    flags,
+                    result,
+                    lambda::_1));
+    }
+
+    return __create(path, data, acl, flags, result, ZOK);
+  }
+
+  Future<int> __create(
+      const string& path,
+      const string& data,
+      const ACL_vector& acl,
+      int flags,
+      string* result,
+      int code)
+  {
+    if (code != ZOK && code != ZNODEEXISTS) {
+      return code;
+    }
+
+    // Finally create the path.
+    // TODO(vinod): Delete any intermediate nodes created if this fails.
+    // This requires synchronization because the deletion might affect
+    // other callers (different threads/processes) acting on this path.
+    return create(path, data, acl, flags, result);
+  }
+
   Future<int> remove(const string& path, int version)
   {
     Promise<int>* promise = new Promise<int>();
@@ -185,9 +301,10 @@ public:
     return future;
   }
 
-  Future<int> getChildren(const string& path,
-                          bool watch,
-                          vector<string>* results)
+  Future<int> getChildren(
+      const string& path,
+      bool watch,
+      vector<string>* results)
   {
     Promise<int>* promise = new Promise<int>();
 
@@ -196,8 +313,8 @@ public:
     tuple<Promise<int>*, vector<string>*>* args =
       new tuple<Promise<int>*, vector<string>*>(promise, results);
 
-    int ret = zoo_aget_children(zh, path.c_str(), watch, stringsCompletion,
-                                args);
+    int ret =
+      zoo_aget_children(zh, path.c_str(), watch, stringsCompletion, args);
 
     if (ret != ZOK) {
       delete promise;
@@ -217,8 +334,14 @@ public:
     tuple<Promise<int>*, Stat*>* args =
       new tuple<Promise<int>*, Stat*>(promise, NULL);
 
-    int ret = zoo_aset(zh, path.c_str(), data.data(), data.size(),
-                       version, statCompletion, args);
+    int ret = zoo_aset(
+        zh,
+        path.c_str(),
+        data.data(),
+        data.size(),
+        version,
+        statCompletion,
+        args);
 
     if (ret != ZOK) {
       delete promise;
@@ -233,16 +356,17 @@ private:
   // This method is registered as a watcher callback function and is
   // invoked by a single ZooKeeper event thread.
   static void event(
-      zhandle_t* zh,
+      zhandle_t*,
       int type,
       int state,
       const char* path,
-      void* ctx)
+      void* context)
   {
-    ZooKeeperImpl* impl = static_cast<ZooKeeperImpl*>(ctx);
-    impl->watcher->process(impl->zk, type, state, string(path));
-  }
+    lambda::function<void(int, int, const string&)>* callback =
+      static_cast<lambda::function<void(int, int, const string&)>*>(context);
 
+    (*callback)(type, state, string(path));
+  }
 
   static void voidCompletion(int ret, const void *data)
   {
@@ -257,7 +381,6 @@ private:
     delete args;
   }
 
-
   static void stringCompletion(int ret, const char* value, const void* data)
   {
     const tuple<Promise<int>*, string*> *args =
@@ -278,7 +401,6 @@ private:
     delete args;
   }
 
-
   static void statCompletion(int ret, const Stat* stat, const void* data)
   {
     const tuple<Promise<int>*, Stat*>* args =
@@ -299,7 +421,6 @@ private:
     delete args;
   }
 
-
   static void dataCompletion(
       int ret,
       const char* value,
@@ -330,7 +451,6 @@ private:
     delete args;
   }
 
-
   static void stringsCompletion(
       int ret,
       const String_vector* values,
@@ -360,53 +480,59 @@ private:
   friend class ZooKeeper;
 
   const string servers; // ZooKeeper host:port pairs.
+  const Duration timeout; // ZooKeeper session timeout;
 
-  ZooKeeper* zk; // ZooKeeper instance.
   zhandle_t* zh; // ZooKeeper connection handle.
 
-  Watcher* watcher; // Associated Watcher instance.
+  // Callback for invoking Watcher::process with the ZooKeeper*
+  // argument and Watcher* receiver already bound.
+  lambda::function<void(int, int, const string&)> callback;
 };
 
 
-ZooKeeper::ZooKeeper(const string& servers,
-                     const Duration& timeout,
-                     Watcher* watcher)
+ZooKeeper::ZooKeeper(
+    const string& servers,
+    const Duration& timeout,
+    Watcher* watcher)
 {
-  impl = new ZooKeeperImpl(this, servers, timeout, watcher);
+  process = new ZooKeeperProcess(this, servers, timeout, watcher);
+  spawn(process);
 }
 
 
 ZooKeeper::~ZooKeeper()
 {
-  delete impl;
+  terminate(process);
+  wait(process);
+  delete process;
 }
 
 
 int ZooKeeper::getState()
 {
-  return zoo_state(impl->zh);
+  return dispatch(process, &ZooKeeperProcess::getState).get();
 }
 
 
 int64_t ZooKeeper::getSessionId()
 {
-  return zoo_client_id(impl->zh)->client_id;
+  return dispatch(process, &ZooKeeperProcess::getSessionId).get();
 }
 
 
 Duration ZooKeeper::getSessionTimeout() const
 {
-  // ZooKeeper server uses int representation of milliseconds for
-  // session timeouts.
-  // See:
-  // http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html
-  return Milliseconds(zoo_recv_timeout(impl->zh));
+  return dispatch(process, &ZooKeeperProcess::getSessionTimeout).get();
 }
 
 
 int ZooKeeper::authenticate(const string& scheme, const string& credentials)
 {
-  return impl->authenticate(scheme, credentials).get();
+  return dispatch(
+      process,
+      &ZooKeeperProcess::authenticate,
+      scheme,
+      credentials).get();
 }
 
 
@@ -418,65 +544,69 @@ int ZooKeeper::create(
     string* result,
     bool recursive)
 {
-  if (!recursive) {
-    return impl->create(path, data, acl, flags, result).get();
-  }
-
-  // First check if the path exists.
-  int code = impl->exists(path, false, NULL).get();
-  if (code == ZOK) {
-    return ZNODEEXISTS;
-  }
-
-  // Now recursively create the parent path.
-  // NOTE: We don't use 'dirname()' to get the parent path here
-  // because, it doesn't return the expected path when a path ends
-  // with "/". For example, to create path "/a/b/", we want to
-  // recursively create "/a/b", instead of just creating "/a".
-  const string& parent = path.substr(0, path.find_last_of("/"));
-  if (!parent.empty()) {
-    code = create(parent, "", acl, 0, result, true);
-    if (code != ZOK && code != ZNODEEXISTS) {
-      return code;
-    }
-  }
-
-  // Finally create the path.
-  // TODO(vinod): Delete any intermediate nodes created if this fails.
-  // This requires synchronization because the deletion might affect
-  // other callers (different threads/processes) acting on this path.
-  return impl->create(path, data, acl, flags, result).get();
+  return dispatch(
+      process,
+      &ZooKeeperProcess::create,
+      path,
+      data,
+      acl,
+      flags,
+      result,
+      recursive).get();
 }
 
 
 int ZooKeeper::remove(const string& path, int version)
 {
-  return impl->remove(path, version).get();
+  return dispatch(process, &ZooKeeperProcess::remove, path, version).get();
 }
 
 
 int ZooKeeper::exists(const string& path, bool watch, Stat* stat)
 {
-  return impl->exists(path, watch, stat).get();
+  return dispatch(
+      process,
+      &ZooKeeperProcess::exists,
+      path,
+      watch,
+      stat).get();
 }
 
 
 int ZooKeeper::get(const string& path, bool watch, string* result, Stat* stat)
 {
-  return impl->get(path, watch, result, stat).get();
+  return dispatch(
+      process,
+      &ZooKeeperProcess::get,
+      path,
+      watch,
+      result,
+      stat).get();
 }
 
 
-int ZooKeeper::getChildren(const string& path, bool watch,
-                           vector<string>* results)
+int ZooKeeper::getChildren(
+    const string& path,
+    bool watch,
+    vector<string>* results)
 {
-  return impl->getChildren(path, watch, results).get();
+  return dispatch(
+      process,
+      &ZooKeeperProcess::getChildren,
+      path,
+      watch,
+      results).get();
 }
 
 
 int ZooKeeper::set(const string& path, const string& data, int version)
 {
-  return impl->set(path, data, version).get();
+  return dispatch(
+      process,
+      &ZooKeeperProcess::set,
+      path,
+      data,
+      version).get();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e4336578/src/zookeeper/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.hpp b/src/zookeeper/zookeeper.hpp
index 242e2de..c3c182b 100644
--- a/src/zookeeper/zookeeper.hpp
+++ b/src/zookeeper/zookeeper.hpp
@@ -36,7 +36,7 @@
 
 /* Forward declarations of classes we are using. */
 class ZooKeeper;
-class ZooKeeperImpl;
+class ZooKeeperProcess;
 
 /**
  * This interface specifies the public interface an event handler
@@ -53,10 +53,10 @@ class Watcher
 {
 public:
   virtual void process(
-      ZooKeeper *zk,
+      ZooKeeper* zk,
       int type,
       int state,
-      const std::string &path) = 0;
+      const std::string& path) = 0;
 
   virtual ~Watcher() {}
 };
@@ -121,9 +121,9 @@ public:
    *    callbacks. When notifications are triggered the Watcher::process
    *    method will be invoked.
    */
-  ZooKeeper(const std::string &servers,
+  ZooKeeper(const std::string& servers,
             const Duration& timeout,
-            Watcher *watcher);
+            Watcher* watcher);
 
   ~ZooKeeper();
 
@@ -189,11 +189,11 @@ public:
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
   int create(
-      const std::string &path,
-      const std::string &data,
-      const ACL_vector &acl,
+      const std::string& path,
+      const std::string& data,
+      const ACL_vector& acl,
       int flags,
-      std::string *result,
+      std::string* result,
       bool recursive = false);
 
   /**
@@ -215,7 +215,7 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int remove(const std::string &path, int version);
+  int remove(const std::string& path, int version);
 
   /**
    * \brief checks the existence of a node in zookeeper synchronously.
@@ -235,7 +235,7 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int exists(const std::string &path, bool watch, Stat *stat);
+  int exists(const std::string& path, bool watch, Stat* stat);
 
   /**
    * \brief gets the data associated with a node synchronously.
@@ -256,10 +256,10 @@ public:
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
   int get(
-      const std::string &path,
+      const std::string& path,
       bool watch,
-      std::string *result,
-      Stat *stat);
+      std::string* result,
+      Stat* stat);
 
   /**
    * \brief lists the children of a node synchronously.
@@ -278,9 +278,9 @@ public:
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
   int getChildren(
-      const std::string &path,
+      const std::string& path,
       bool watch,
-      std::vector<std::string> *results);
+      std::vector<std::string>* results);
 
   /**
    * \brief sets the data associated with a node.
@@ -300,7 +300,7 @@ public:
    * ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int set(const std::string &path, const std::string &data, int version);
+  int set(const std::string& path, const std::string& data, int version);
 
   /**
    * \brief return a message describing the return code.
@@ -321,12 +321,12 @@ public:
 
 protected:
   /* Underlying implementation (pimpl idiom). */
-  ZooKeeperImpl *impl;
+  ZooKeeperProcess* process;
 
 private:
   /* ZooKeeper instances are not copyable. */
-  ZooKeeper(const ZooKeeper &that);
-  ZooKeeper & operator = (const ZooKeeper &that);
+  ZooKeeper(const ZooKeeper& that);
+  ZooKeeper& operator = (const ZooKeeper& that);
 };