You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:21:17 UTC

svn commit: r1132297 - in /incubator/mesos/trunk/src: Makefile.in common/zookeeper.cpp examples/Makefile.in tests/Makefile.in

Author: benh
Date: Sun Jun  5 09:21:17 2011
New Revision: 1132297

URL: http://svn.apache.org/viewvc?rev=1132297&view=rev
Log:
Added support for using the "threaded" ZooKeeper library. This is to
avoid issues that arise because libprocess currently uses only a
single thread. The original, non-threaded, version of the code has
been kept around in case in the future (i.e., after libprocess uses
multiple threads) we want to return to the non-threaded ZooKeeper
library.

Modified:
    incubator/mesos/trunk/src/Makefile.in
    incubator/mesos/trunk/src/common/zookeeper.cpp
    incubator/mesos/trunk/src/examples/Makefile.in
    incubator/mesos/trunk/src/tests/Makefile.in

Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun  5 09:21:17 2011
@@ -92,7 +92,7 @@ LIBS += -lprotobuf -lglog -lprocess -lev
 
 # Add ZooKeeper if necessary.
 ifeq ($(WITH_ZOOKEEPER),1)
-  LIBS += -lzookeeper_st
+  LIBS += -lzookeeper_mt
 endif
 
 MASTER_OBJ = master/master.o master/slaves_manager.o			\

Modified: incubator/mesos/trunk/src/common/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/zookeeper.cpp?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/common/zookeeper.cpp Sun Jun  5 09:21:17 2011
@@ -11,9 +11,12 @@
 
 #include "common/fatal.hpp"
 
+#define USE_THREADED_ZOOKEEPER
+
 using boost::cref;
 using boost::tuple;
 
+using process::Future;
 using process::PID;
 using process::Process;
 using process::Promise;
@@ -57,29 +60,22 @@ WatcherProcessManager* manager;
 class WatcherProcess : public Process<WatcherProcess>
 {
 public:
-  WatcherProcess(Watcher* _watcher) : watcher(_watcher) {}
+  WatcherProcess(Watcher* watcher) : watcher(watcher) {}
 
   void event(ZooKeeper* zk, int type, int state, const string& path)
   {
     watcher->process(zk, type, state, path);
   }
 
-protected:
-  virtual void operator () ()
-  {
-    do serve();
-    while (name() != process::TERMINATE);
-  }
-
 private:
-  Watcher *watcher;
+  Watcher* watcher;
 };
 
 
 class WatcherProcessManager : public Process<WatcherProcessManager>
 {
 public:
-  Promise<WatcherProcess*> create(Watcher* watcher)
+  WatcherProcess* create(Watcher* watcher)
   {
     WatcherProcess* process = new WatcherProcess(watcher);
     spawn(process);
@@ -98,7 +94,7 @@ public:
     }
   }
 
-  Promise<PID<WatcherProcess> > lookup(Watcher* watcher)
+  PID<WatcherProcess> lookup(Watcher* watcher)
   {
     if (processes.count(watcher) > 0) {
       return processes[watcher]->self();
@@ -144,27 +140,37 @@ Watcher::~Watcher()
 }
 
 
-class ZooKeeperImpl : public Process<ZooKeeperImpl> {};
-
-
-class ZooKeeperProcess : public ZooKeeperImpl
+#ifndef USE_THREADED_ZOOKEEPER
+class ZooKeeperImpl : public Process<ZooKeeperImpl>
+#else
+class ZooKeeperImpl
+#endif // USE_THREADED_ZOOKEEPER
 {
 public:
-  ZooKeeperProcess(ZooKeeper* _zk, const string& _hosts, int _timeout,
-                   Watcher* _watcher)
-    : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher)
+  ZooKeeperImpl(ZooKeeper* zk, const string& hosts, int timeout,
+		Watcher* watcher)
+    : zk(zk), hosts(hosts), timeout(timeout), watcher(watcher)
   {
     if (watcher == NULL) {
       fatalerror("cannot instantiate ZooKeeper with NULL watcher");
     }
 
-    zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0);
+    // Lookup PID of the WatcherProcess associated with the Watcher.
+    pid = call(manager->self(), &WatcherProcessManager::lookup, watcher);
+
+    // N.B. The Watcher and thus WatcherProcess may already be gone,
+    // in which case, each dispatch to the WatcherProcess that we do
+    // will just get dropped on the floor.
+
+    // TODO(benh): Link with WatcherProcess PID?
+
+    zh = zookeeper_init(hosts.c_str(), event, timeout, NULL, this, 0);
     if (zh == NULL) {
       fatalerror("failed to create ZooKeeper (zookeeper_init)");
     }
   }
 
-  ~ZooKeeperProcess()
+  ~ZooKeeperImpl()
   {
     int ret = zookeeper_close(zh);
     if (ret != ZOK) {
@@ -278,20 +284,10 @@ public:
     return promise;
   }
 
+#ifndef USE_THREADED_ZOOKEEPER
 protected:
   virtual void operator () ()
   {
-    // Lookup and cache the PID of the WatcherProcess associated with
-    // our Watcher before we yield control via calling
-    // zookeeper_process so that Watcher callbacks can occur.
-    pid = call(manager->self(), &WatcherProcessManager::lookup, watcher);
-
-    // N.B. The Watcher and thus WatcherProcess may already be gone,
-    // in which case, each dispatch to the WatcherProcess that we do
-    // will just get dropped on the floor.
-
-    // TODO(benh): Link with WatcherProcess PID?
-
     while (true) {
       int fd;
       int ops;
@@ -325,13 +321,66 @@ protected:
     }
   }
 
+  bool prepare(int* fd, int* ops, timeval* tv)
+  {
+    int interest = 0;
+
+    int ret = zookeeper_interest(zh, fd, &interest, tv);
+
+    // If in some disconnected state, try again later.
+    if (ret == ZINVALIDSTATE ||
+        ret == ZCONNECTIONLOSS ||
+	ret == ZOPERATIONTIMEOUT) {
+      return false;
+    }
+
+    if (ret != ZOK) {
+      fatal("zookeeper_interest failed! (%s)", zerror(ret));
+    }
+
+    *ops = 0;
+
+    if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+      *ops |= RDWR;
+    } else if (interest & ZOOKEEPER_READ) {
+      *ops |= RDONLY;
+    } else if (interest & ZOOKEEPER_WRITE) {
+      *ops |= WRONLY;
+    }
+
+    return true;
+  }
+
+  void process(int fd, int ops)
+  {
+    int events = 0;
+
+    if (ready(fd, RDONLY)) {
+      events |= ZOOKEEPER_READ;
+    } if (ready(fd, WRONLY)) {
+      events |= ZOOKEEPER_WRITE;
+    }
+
+    int ret = zookeeper_process(zh, events);
+
+    // If in some disconnected state, try again later.
+    if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
+      return;
+    }
+
+    if (ret != ZOK && ret != ZNOTHING) {
+      fatal("zookeeper_process failed! (%s)", zerror(ret));
+    }
+  }
+#endif // USE_THREADED_ZOOKEEPER
+
 private:
-  static void watch(zhandle_t* zh, int type, int state,
+  static void event(zhandle_t* zh, int type, int state,
 		    const char* path, void* ctx)
   {
-    ZooKeeperProcess* zooKeeperProcess = static_cast<ZooKeeperProcess*>(ctx);
-    process::dispatch(zooKeeperProcess->pid, &WatcherProcess::event,
-                      zooKeeperProcess->zk, type, state, string(path));
+    ZooKeeperImpl* impl = static_cast<ZooKeeperImpl*>(ctx);
+    process::dispatch(impl->pid, &WatcherProcess::event,
+		      impl->zk, type, state, string(path));
   }
 
   static void voidCompletion(int ret, const void *data)
@@ -346,7 +395,6 @@ private:
     delete args;
   }
 
-
   static void stringCompletion(int ret, const char* value, const void* data)
   {
     const tuple<Promise<int>, string*> *args =
@@ -357,7 +405,7 @@ private:
 
     if (ret == 0) {
       if (result != NULL) {
-        result->assign(value);
+	result->assign(value);
       }
     }
 
@@ -366,7 +414,6 @@ private:
     delete args;
   }
 
-
   static void statCompletion(int ret, const Stat* stat, const void* data)
   {
     const tuple<Promise<int>, Stat*>* args =
@@ -377,7 +424,7 @@ private:
 
     if (ret == 0) {
       if (stat_result != NULL) {
-        *stat_result = *stat;
+	*stat_result = *stat;
       }
     }
 
@@ -386,8 +433,9 @@ private:
     delete args;
   }
 
+
   static void dataCompletion(int ret, const char* value, int value_len,
-                             const Stat* stat, const void* data)
+			     const Stat* stat, const void* data)
   {
     const tuple<Promise<int>, string*, Stat*>* args =
       reinterpret_cast<const tuple<Promise<int>, string*, Stat*>*>(data);
@@ -398,11 +446,11 @@ private:
 
     if (ret == 0) {
       if (result != NULL) {
-        result->assign(value, value_len);
+	result->assign(value, value_len);
       }
 
       if (stat_result != NULL) {
-        *stat_result = *stat;
+	*stat_result = *stat;
       }
     }
 
@@ -411,8 +459,9 @@ private:
     delete args;
   }
 
+
   static void stringsCompletion(int ret, const String_vector* values,
-                                const void* data)
+				const void* data)
   {
     const tuple<Promise<int>, vector<string>*>* args =
       reinterpret_cast<const tuple<Promise<int>, vector<string>*>*>(data);
@@ -422,9 +471,9 @@ private:
 
     if (ret == 0) {
       if (results != NULL) {
-        for (int i = 0; i < values->count; i++) {
-          results->push_back(values->data[i]);
-        }
+	for (int i = 0; i < values->count; i++) {
+	  results->push_back(values->data[i]);
+	}
       }
     }
 
@@ -433,64 +482,13 @@ private:
     delete args;
   }
 
-  bool prepare(int* fd, int* ops, timeval* tv)
-  {
-    int interest = 0;
-
-    int ret = zookeeper_interest(zh, fd, &interest, tv);
-
-    // If in some disconnected state, try again later.
-    if (ret == ZINVALIDSTATE ||
-        ret == ZCONNECTIONLOSS ||
-	ret == ZOPERATIONTIMEOUT) {
-      return false;
-    }
-
-    if (ret != ZOK) {
-      fatal("zookeeper_interest failed! (%s)", zerror(ret));
-    }
-
-    *ops = 0;
-
-    if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
-      *ops |= RDWR;
-    } else if (interest & ZOOKEEPER_READ) {
-      *ops |= RDONLY;
-    } else if (interest & ZOOKEEPER_WRITE) {
-      *ops |= WRONLY;
-    }
-
-    return true;
-  }
-
-  void process(int fd, int ops)
-  {
-    int events = 0;
-
-    if (ready(fd, RDONLY)) {
-      events |= ZOOKEEPER_READ;
-    } if (ready(fd, WRONLY)) {
-      events |= ZOOKEEPER_WRITE;
-    }
-
-    int ret = zookeeper_process(zh, events);
-
-    // If in some disconnected state, try again later.
-    if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
-      return;
-    }
-
-    if (ret != ZOK && ret != ZNOTHING) {
-      fatal("zookeeper_process failed! (%s)", zerror(ret));
-    }
-  }
-
 private:
   friend class ZooKeeper;
 
+  const string hosts; // ZooKeeper host:port pairs.
+  const int timeout; // ZooKeeper session timeout.
+
   ZooKeeper* zk; // ZooKeeper instance.
-  string hosts; // ZooKeeper host:port pairs.
-  int timeout; // ZooKeeper session timeout.
   zhandle_t* zh; // ZooKeeper connection handle.
   
   Watcher* watcher; // Associated Watcher instance. 
@@ -498,82 +496,96 @@ private:
 };
 
 
-
 ZooKeeper::ZooKeeper(const string& hosts, int timeout, Watcher* watcher)
 {
-  impl = new ZooKeeperProcess(this, hosts, timeout, watcher);
+  impl = new ZooKeeperImpl(this, hosts, timeout, watcher);
+#ifndef USE_THREADED_ZOOKEEPER
   process::spawn(impl);
+#endif // USE_THREADED_ZOOKEEPER
 }
 
 
 ZooKeeper::~ZooKeeper()
 {
+#ifndef USE_THREADED_ZOOKEEPER
   process::post(impl->self(), process::TERMINATE);
   process::wait(impl->self());
+#endif // USE_THREADED_ZOOKEEPER
   delete impl;
 }
 
 
 int ZooKeeper::getState()
 {
-  ZooKeeperProcess* zooKeeperProcess = static_cast<ZooKeeperProcess*>(impl);
-  return zoo_state(zooKeeperProcess->zh);
+  return zoo_state(impl->zh);
 }
 
 
 int ZooKeeper::create(const string& path, const string& data,
                       const ACL_vector& acl, int flags, string* result)
 {
-  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
-  PID<ZooKeeperProcess> pid(*process);
-  return process::call(pid, &ZooKeeperProcess::create,
+#ifndef USE_THREADED_ZOOKEEPER
+  return process::call(impl->self(), &ZooKeeperImpl::create,
                        cref(path), cref(data), cref(acl), flags, result);
+#else
+  return Future<int>(&impl->create(path, data, acl, flags, result)).get();
+#endif // USE_THREADED_ZOOKEEPER
 }
 
 
 int ZooKeeper::remove(const string& path, int version)
 {
-  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
-  PID<ZooKeeperProcess> pid(*process);
-  return process::call(pid, &ZooKeeperProcess::remove,
+#ifndef USE_THREADED_ZOOKEEPER
+  return process::call(impl->self(), &ZooKeeperImpl::remove,
                        cref(path), version);
+#else
+  return Future<int>(&impl->remove(path, version)).get();
+#endif // USE_THREADED_ZOOKEEPER
 }
 
 
 int ZooKeeper::exists(const string& path, bool watch, Stat* stat)
 {
-  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
-  PID<ZooKeeperProcess> pid(*process);
-  return process::call(pid, &ZooKeeperProcess::exists,
+#ifndef USE_THREADED_ZOOKEEPER
+  return process::call(impl->self(), &ZooKeeperImpl::exists,
                        cref(path), watch, stat);
+#else
+  return Future<int>(&impl->exists(path, watch, stat)).get();
+#endif // USE_THREADED_ZOOKEEPER
 }
 
 
 int ZooKeeper::get(const string& path, bool watch, string* result, Stat* stat)
 {
-  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
-  PID<ZooKeeperProcess> pid(*process);
-  return process::call(pid, &ZooKeeperProcess::get,
+#ifndef USE_THREADED_ZOOKEEPER
+  return process::call(impl->self(), &ZooKeeperImpl::get,
                        cref(path), watch, result, stat);
+#else
+  return Future<int>(&impl->get(path, watch, result, stat)).get();
+#endif // USE_THREADED_ZOOKEEPER
 }
 
 
 int ZooKeeper::getChildren(const string& path, bool watch,
                            vector<string>* results)
 {
-  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
-  PID<ZooKeeperProcess> pid(*process);
-  return process::call(pid, &ZooKeeperProcess::getChildren,
+#ifndef USE_THREADED_ZOOKEEPER
+  return process::call(impl->self(), &ZooKeeperImpl::getChildren,
                        cref(path), watch, results);
+#else
+  return Future<int>(&impl->getChildren(path, watch, results)).get();
+#endif // USE_THREADED_ZOOKEEPER
 }
 
 
 int ZooKeeper::set(const string& path, const string& data, int version)
 {
-  ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
-  PID<ZooKeeperProcess> pid(*process);
-  return process::call(pid, &ZooKeeperProcess::set,
+#ifndef USE_THREADED_ZOOKEEPER
+  return process::call(impl->self(), &ZooKeeperImpl::set,
                        cref(path), cref(data), version);
+#else
+  return Future<int>(&impl->set(path, data, version)).get();
+#endif // USE_THREADED_ZOOKEEPER
 }
 
 
@@ -581,22 +593,3 @@ const char* ZooKeeper::error(int ret) co
 {
   return zerror(ret);
 }
-
-
-// class TestWatcher : public Watcher
-// {
-// public:
-//   void process(ZooKeeper *zk, int type, int state, const string &path)
-//   {
-//     cout << "TestWatcher::process" << endl;
-//   }
-// };
-
-
-// int main(int argc, char** argv)
-// {
-//   TestWatcher watcher;
-//   ZooKeeper zk(argv[1], 10000, &watcher);
-//   sleep(10);
-//   return 0;
-// }

Modified: incubator/mesos/trunk/src/examples/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/Makefile.in?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/Makefile.in (original)
+++ incubator/mesos/trunk/src/examples/Makefile.in Sun Jun  5 09:21:17 2011
@@ -75,7 +75,7 @@ LIBS += -lprotobuf -lglog -lprocess -lev
 
 # Add ZooKeeper if necessary.
 ifeq ($(WITH_ZOOKEEPER),1)
-  LIBS += -lzookeeper_st
+  LIBS += -lzookeeper_mt
 endif
 
 SCHED_EXES = $(BINDIR)/examples/cpp-test-framework	\

Modified: incubator/mesos/trunk/src/tests/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/Makefile.in?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/Makefile.in (original)
+++ incubator/mesos/trunk/src/tests/Makefile.in Sun Jun  5 09:21:17 2011
@@ -74,7 +74,7 @@ LIBS += -lprotobuf -lglog -lgmock -lgtes
 
 # Add ZooKeeper if necessary.
 ifeq ($(WITH_ZOOKEEPER),1)
-  LIBS += -lzookeeper_st
+  LIBS += -lzookeeper_mt
 endif
 
 SCHED_LIB = $(LIBDIR)/libmesos_sched.a