You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:21:17 UTC
svn commit: r1132297 - in /incubator/mesos/trunk/src: Makefile.in
common/zookeeper.cpp examples/Makefile.in tests/Makefile.in
Author: benh
Date: Sun Jun 5 09:21:17 2011
New Revision: 1132297
URL: http://svn.apache.org/viewvc?rev=1132297&view=rev
Log:
Added support for using the "threaded" ZooKeeper library. This is to
avoid issues that arise because libprocess currently uses only a
single thread. The original, non-threaded, version of the code has
been kept around in case in the future (i.e., after libprocess uses
multiple threads) we want to return to the non-threaded ZooKeeper
library.
Modified:
incubator/mesos/trunk/src/Makefile.in
incubator/mesos/trunk/src/common/zookeeper.cpp
incubator/mesos/trunk/src/examples/Makefile.in
incubator/mesos/trunk/src/tests/Makefile.in
Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun 5 09:21:17 2011
@@ -92,7 +92,7 @@ LIBS += -lprotobuf -lglog -lprocess -lev
# Add ZooKeeper if necessary.
ifeq ($(WITH_ZOOKEEPER),1)
- LIBS += -lzookeeper_st
+ LIBS += -lzookeeper_mt
endif
MASTER_OBJ = master/master.o master/slaves_manager.o \
Modified: incubator/mesos/trunk/src/common/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/zookeeper.cpp?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/common/zookeeper.cpp Sun Jun 5 09:21:17 2011
@@ -11,9 +11,12 @@
#include "common/fatal.hpp"
+#define USE_THREADED_ZOOKEEPER
+
using boost::cref;
using boost::tuple;
+using process::Future;
using process::PID;
using process::Process;
using process::Promise;
@@ -57,29 +60,22 @@ WatcherProcessManager* manager;
class WatcherProcess : public Process<WatcherProcess>
{
public:
- WatcherProcess(Watcher* _watcher) : watcher(_watcher) {}
+ WatcherProcess(Watcher* watcher) : watcher(watcher) {}
void event(ZooKeeper* zk, int type, int state, const string& path)
{
watcher->process(zk, type, state, path);
}
-protected:
- virtual void operator () ()
- {
- do serve();
- while (name() != process::TERMINATE);
- }
-
private:
- Watcher *watcher;
+ Watcher* watcher;
};
class WatcherProcessManager : public Process<WatcherProcessManager>
{
public:
- Promise<WatcherProcess*> create(Watcher* watcher)
+ WatcherProcess* create(Watcher* watcher)
{
WatcherProcess* process = new WatcherProcess(watcher);
spawn(process);
@@ -98,7 +94,7 @@ public:
}
}
- Promise<PID<WatcherProcess> > lookup(Watcher* watcher)
+ PID<WatcherProcess> lookup(Watcher* watcher)
{
if (processes.count(watcher) > 0) {
return processes[watcher]->self();
@@ -144,27 +140,37 @@ Watcher::~Watcher()
}
-class ZooKeeperImpl : public Process<ZooKeeperImpl> {};
-
-
-class ZooKeeperProcess : public ZooKeeperImpl
+#ifndef USE_THREADED_ZOOKEEPER
+class ZooKeeperImpl : public Process<ZooKeeperImpl>
+#else
+class ZooKeeperImpl
+#endif // USE_THREADED_ZOOKEEPER
{
public:
- ZooKeeperProcess(ZooKeeper* _zk, const string& _hosts, int _timeout,
- Watcher* _watcher)
- : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher)
+ ZooKeeperImpl(ZooKeeper* zk, const string& hosts, int timeout,
+ Watcher* watcher)
+ : zk(zk), hosts(hosts), timeout(timeout), watcher(watcher)
{
if (watcher == NULL) {
fatalerror("cannot instantiate ZooKeeper with NULL watcher");
}
- zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0);
+ // Lookup PID of the WatcherProcess associated with the Watcher.
+ pid = call(manager->self(), &WatcherProcessManager::lookup, watcher);
+
+ // N.B. The Watcher and thus WatcherProcess may already be gone,
+ // in which case, each dispatch to the WatcherProcess that we do
+ // will just get dropped on the floor.
+
+ // TODO(benh): Link with WatcherProcess PID?
+
+ zh = zookeeper_init(hosts.c_str(), event, timeout, NULL, this, 0);
if (zh == NULL) {
fatalerror("failed to create ZooKeeper (zookeeper_init)");
}
}
- ~ZooKeeperProcess()
+ ~ZooKeeperImpl()
{
int ret = zookeeper_close(zh);
if (ret != ZOK) {
@@ -278,20 +284,10 @@ public:
return promise;
}
+#ifndef USE_THREADED_ZOOKEEPER
protected:
virtual void operator () ()
{
- // Lookup and cache the PID of the WatcherProcess associated with
- // our Watcher before we yield control via calling
- // zookeeper_process so that Watcher callbacks can occur.
- pid = call(manager->self(), &WatcherProcessManager::lookup, watcher);
-
- // N.B. The Watcher and thus WatcherProcess may already be gone,
- // in which case, each dispatch to the WatcherProcess that we do
- // will just get dropped on the floor.
-
- // TODO(benh): Link with WatcherProcess PID?
-
while (true) {
int fd;
int ops;
@@ -325,13 +321,66 @@ protected:
}
}
+ bool prepare(int* fd, int* ops, timeval* tv)
+ {
+ int interest = 0;
+
+ int ret = zookeeper_interest(zh, fd, &interest, tv);
+
+ // If in some disconnected state, try again later.
+ if (ret == ZINVALIDSTATE ||
+ ret == ZCONNECTIONLOSS ||
+ ret == ZOPERATIONTIMEOUT) {
+ return false;
+ }
+
+ if (ret != ZOK) {
+ fatal("zookeeper_interest failed! (%s)", zerror(ret));
+ }
+
+ *ops = 0;
+
+ if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+ *ops |= RDWR;
+ } else if (interest & ZOOKEEPER_READ) {
+ *ops |= RDONLY;
+ } else if (interest & ZOOKEEPER_WRITE) {
+ *ops |= WRONLY;
+ }
+
+ return true;
+ }
+
+ void process(int fd, int ops)
+ {
+ int events = 0;
+
+ if (ready(fd, RDONLY)) {
+ events |= ZOOKEEPER_READ;
+ } if (ready(fd, WRONLY)) {
+ events |= ZOOKEEPER_WRITE;
+ }
+
+ int ret = zookeeper_process(zh, events);
+
+ // If in some disconnected state, try again later.
+ if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
+ return;
+ }
+
+ if (ret != ZOK && ret != ZNOTHING) {
+ fatal("zookeeper_process failed! (%s)", zerror(ret));
+ }
+ }
+#endif // USE_THREADED_ZOOKEEPER
+
private:
- static void watch(zhandle_t* zh, int type, int state,
+ static void event(zhandle_t* zh, int type, int state,
const char* path, void* ctx)
{
- ZooKeeperProcess* zooKeeperProcess = static_cast<ZooKeeperProcess*>(ctx);
- process::dispatch(zooKeeperProcess->pid, &WatcherProcess::event,
- zooKeeperProcess->zk, type, state, string(path));
+ ZooKeeperImpl* impl = static_cast<ZooKeeperImpl*>(ctx);
+ process::dispatch(impl->pid, &WatcherProcess::event,
+ impl->zk, type, state, string(path));
}
static void voidCompletion(int ret, const void *data)
@@ -346,7 +395,6 @@ private:
delete args;
}
-
static void stringCompletion(int ret, const char* value, const void* data)
{
const tuple<Promise<int>, string*> *args =
@@ -357,7 +405,7 @@ private:
if (ret == 0) {
if (result != NULL) {
- result->assign(value);
+ result->assign(value);
}
}
@@ -366,7 +414,6 @@ private:
delete args;
}
-
static void statCompletion(int ret, const Stat* stat, const void* data)
{
const tuple<Promise<int>, Stat*>* args =
@@ -377,7 +424,7 @@ private:
if (ret == 0) {
if (stat_result != NULL) {
- *stat_result = *stat;
+ *stat_result = *stat;
}
}
@@ -386,8 +433,9 @@ private:
delete args;
}
+
static void dataCompletion(int ret, const char* value, int value_len,
- const Stat* stat, const void* data)
+ const Stat* stat, const void* data)
{
const tuple<Promise<int>, string*, Stat*>* args =
reinterpret_cast<const tuple<Promise<int>, string*, Stat*>*>(data);
@@ -398,11 +446,11 @@ private:
if (ret == 0) {
if (result != NULL) {
- result->assign(value, value_len);
+ result->assign(value, value_len);
}
if (stat_result != NULL) {
- *stat_result = *stat;
+ *stat_result = *stat;
}
}
@@ -411,8 +459,9 @@ private:
delete args;
}
+
static void stringsCompletion(int ret, const String_vector* values,
- const void* data)
+ const void* data)
{
const tuple<Promise<int>, vector<string>*>* args =
reinterpret_cast<const tuple<Promise<int>, vector<string>*>*>(data);
@@ -422,9 +471,9 @@ private:
if (ret == 0) {
if (results != NULL) {
- for (int i = 0; i < values->count; i++) {
- results->push_back(values->data[i]);
- }
+ for (int i = 0; i < values->count; i++) {
+ results->push_back(values->data[i]);
+ }
}
}
@@ -433,64 +482,13 @@ private:
delete args;
}
- bool prepare(int* fd, int* ops, timeval* tv)
- {
- int interest = 0;
-
- int ret = zookeeper_interest(zh, fd, &interest, tv);
-
- // If in some disconnected state, try again later.
- if (ret == ZINVALIDSTATE ||
- ret == ZCONNECTIONLOSS ||
- ret == ZOPERATIONTIMEOUT) {
- return false;
- }
-
- if (ret != ZOK) {
- fatal("zookeeper_interest failed! (%s)", zerror(ret));
- }
-
- *ops = 0;
-
- if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
- *ops |= RDWR;
- } else if (interest & ZOOKEEPER_READ) {
- *ops |= RDONLY;
- } else if (interest & ZOOKEEPER_WRITE) {
- *ops |= WRONLY;
- }
-
- return true;
- }
-
- void process(int fd, int ops)
- {
- int events = 0;
-
- if (ready(fd, RDONLY)) {
- events |= ZOOKEEPER_READ;
- } if (ready(fd, WRONLY)) {
- events |= ZOOKEEPER_WRITE;
- }
-
- int ret = zookeeper_process(zh, events);
-
- // If in some disconnected state, try again later.
- if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
- return;
- }
-
- if (ret != ZOK && ret != ZNOTHING) {
- fatal("zookeeper_process failed! (%s)", zerror(ret));
- }
- }
-
private:
friend class ZooKeeper;
+ const string hosts; // ZooKeeper host:port pairs.
+ const int timeout; // ZooKeeper session timeout.
+
ZooKeeper* zk; // ZooKeeper instance.
- string hosts; // ZooKeeper host:port pairs.
- int timeout; // ZooKeeper session timeout.
zhandle_t* zh; // ZooKeeper connection handle.
Watcher* watcher; // Associated Watcher instance.
@@ -498,82 +496,96 @@ private:
};
-
ZooKeeper::ZooKeeper(const string& hosts, int timeout, Watcher* watcher)
{
- impl = new ZooKeeperProcess(this, hosts, timeout, watcher);
+ impl = new ZooKeeperImpl(this, hosts, timeout, watcher);
+#ifndef USE_THREADED_ZOOKEEPER
process::spawn(impl);
+#endif // USE_THREADED_ZOOKEEPER
}
ZooKeeper::~ZooKeeper()
{
+#ifndef USE_THREADED_ZOOKEEPER
process::post(impl->self(), process::TERMINATE);
process::wait(impl->self());
+#endif // USE_THREADED_ZOOKEEPER
delete impl;
}
int ZooKeeper::getState()
{
- ZooKeeperProcess* zooKeeperProcess = static_cast<ZooKeeperProcess*>(impl);
- return zoo_state(zooKeeperProcess->zh);
+ return zoo_state(impl->zh);
}
int ZooKeeper::create(const string& path, const string& data,
const ACL_vector& acl, int flags, string* result)
{
- ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
- PID<ZooKeeperProcess> pid(*process);
- return process::call(pid, &ZooKeeperProcess::create,
+#ifndef USE_THREADED_ZOOKEEPER
+ return process::call(impl->self(), &ZooKeeperImpl::create,
cref(path), cref(data), cref(acl), flags, result);
+#else
+ return Future<int>(&impl->create(path, data, acl, flags, result)).get();
+#endif // USE_THREADED_ZOOKEEPER
}
int ZooKeeper::remove(const string& path, int version)
{
- ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
- PID<ZooKeeperProcess> pid(*process);
- return process::call(pid, &ZooKeeperProcess::remove,
+#ifndef USE_THREADED_ZOOKEEPER
+ return process::call(impl->self(), &ZooKeeperImpl::remove,
cref(path), version);
+#else
+ return Future<int>(&impl->remove(path, version)).get();
+#endif // USE_THREADED_ZOOKEEPER
}
int ZooKeeper::exists(const string& path, bool watch, Stat* stat)
{
- ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
- PID<ZooKeeperProcess> pid(*process);
- return process::call(pid, &ZooKeeperProcess::exists,
+#ifndef USE_THREADED_ZOOKEEPER
+ return process::call(impl->self(), &ZooKeeperImpl::exists,
cref(path), watch, stat);
+#else
+ return Future<int>(&impl->exists(path, watch, stat)).get();
+#endif // USE_THREADED_ZOOKEEPER
}
int ZooKeeper::get(const string& path, bool watch, string* result, Stat* stat)
{
- ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
- PID<ZooKeeperProcess> pid(*process);
- return process::call(pid, &ZooKeeperProcess::get,
+#ifndef USE_THREADED_ZOOKEEPER
+ return process::call(impl->self(), &ZooKeeperImpl::get,
cref(path), watch, result, stat);
+#else
+ return Future<int>(&impl->get(path, watch, result, stat)).get();
+#endif // USE_THREADED_ZOOKEEPER
}
int ZooKeeper::getChildren(const string& path, bool watch,
vector<string>* results)
{
- ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
- PID<ZooKeeperProcess> pid(*process);
- return process::call(pid, &ZooKeeperProcess::getChildren,
+#ifndef USE_THREADED_ZOOKEEPER
+ return process::call(impl->self(), &ZooKeeperImpl::getChildren,
cref(path), watch, results);
+#else
+ return Future<int>(&impl->getChildren(path, watch, results)).get();
+#endif // USE_THREADED_ZOOKEEPER
}
int ZooKeeper::set(const string& path, const string& data, int version)
{
- ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
- PID<ZooKeeperProcess> pid(*process);
- return process::call(pid, &ZooKeeperProcess::set,
+#ifndef USE_THREADED_ZOOKEEPER
+ return process::call(impl->self(), &ZooKeeperImpl::set,
cref(path), cref(data), version);
+#else
+ return Future<int>(&impl->set(path, data, version)).get();
+#endif // USE_THREADED_ZOOKEEPER
}
@@ -581,22 +593,3 @@ const char* ZooKeeper::error(int ret) co
{
return zerror(ret);
}
-
-
-// class TestWatcher : public Watcher
-// {
-// public:
-// void process(ZooKeeper *zk, int type, int state, const string &path)
-// {
-// cout << "TestWatcher::process" << endl;
-// }
-// };
-
-
-// int main(int argc, char** argv)
-// {
-// TestWatcher watcher;
-// ZooKeeper zk(argv[1], 10000, &watcher);
-// sleep(10);
-// return 0;
-// }
Modified: incubator/mesos/trunk/src/examples/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/Makefile.in?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/Makefile.in (original)
+++ incubator/mesos/trunk/src/examples/Makefile.in Sun Jun 5 09:21:17 2011
@@ -75,7 +75,7 @@ LIBS += -lprotobuf -lglog -lprocess -lev
# Add ZooKeeper if necessary.
ifeq ($(WITH_ZOOKEEPER),1)
- LIBS += -lzookeeper_st
+ LIBS += -lzookeeper_mt
endif
SCHED_EXES = $(BINDIR)/examples/cpp-test-framework \
Modified: incubator/mesos/trunk/src/tests/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/Makefile.in?rev=1132297&r1=1132296&r2=1132297&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/Makefile.in (original)
+++ incubator/mesos/trunk/src/tests/Makefile.in Sun Jun 5 09:21:17 2011
@@ -74,7 +74,7 @@ LIBS += -lprotobuf -lglog -lgmock -lgtes
# Add ZooKeeper if necessary.
ifeq ($(WITH_ZOOKEEPER),1)
- LIBS += -lzookeeper_st
+ LIBS += -lzookeeper_mt
endif
SCHED_LIB = $(LIBDIR)/libmesos_sched.a