You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:27:29 UTC
svn commit: r1131595 - in /incubator/mesos/trunk/src: zookeeper_master.hpp
zookeeper_slave.hpp
Author: benh
Date: Sun Jun 5 03:27:29 2011
New Revision: 1131595
URL: http://svn.apache.org/viewvc?rev=1131595&view=rev
Log:
Very primitive ZooKeeper implementation for running standalone master and slaves.
Added:
incubator/mesos/trunk/src/zookeeper_master.hpp
incubator/mesos/trunk/src/zookeeper_slave.hpp
Added: incubator/mesos/trunk/src/zookeeper_master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper_master.hpp?rev=1131595&view=auto
==============================================================================
--- incubator/mesos/trunk/src/zookeeper_master.hpp (added)
+++ incubator/mesos/trunk/src/zookeeper_master.hpp Sun Jun 5 03:27:29 2011
@@ -0,0 +1,128 @@
+#include <zookeeper.h>
+
+using namespace nexus;
+using namespace nexus::internal;
+using namespace nexus::internal::master;
+
+// A process for the master to communicate with ZooKeeper.
+class ZooKeeperProcessForMaster : public Tuple<Process>
+{
+private:
+ static zhandle_t* zh; // ZooKeeper connection handle.
+ static string zookeeper; // ZooKeeper server host:port.
+
+public:
+ static string master; // Master PID as string.
+
+protected:
+ void operator () ()
+ {
+ while (true) {
+ int fd;
+ int interest;
+ struct timeval tv;
+
+ if (zookeeper_interest(zh, &fd, &interest, &tv) != ZOK)
+ fatal("zookeeper_interest failed!");
+
+ int op = 0;
+
+ if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+ op |= RDWR;
+ } else if (interest & ZOOKEEPER_READ) {
+ op |= RDONLY;
+ } else if (interest & ZOOKEEPER_WRITE) {
+ op |= WRONLY;
+ }
+
+ if (await(fd, op, tv)) {
+ int events = 0;
+ if (ready(fd, RDONLY))
+ events |= ZOOKEEPER_READ;
+ if (ready(fd, WRONLY))
+ events |= ZOOKEEPER_WRITE;
+
+ int ret = zookeeper_process(zh, events);
+
+ if (ret != ZOK && ret != ZNOTHING) {
+ fatal("zookeeper_process failed! (%s)", zerror(ret));
+ }
+ } else {
+ LOG(WARNING) << "ZooKeeperProcess interrupted during await ... ";
+ }
+ }
+ }
+
+ static void create_completion(int ret, const char* name, const void* data)
+ {
+ string* znode = static_cast<string*>(const_cast<void*>(data));
+ if (ret != ZOK)
+ fatal("zookeeper create failed on '%s': %s", znode->c_str(), zerror(ret));
+ delete znode;
+ }
+
+ static void create_completion_ignore_node_exists(int ret,
+ const char* name,
+ const void* data)
+ {
+ string* znode = static_cast<string*>(const_cast<void*>(data));
+ if (ret != ZOK && ret != ZNODEEXISTS)
+ fatal("zookeeper create failed on '%s': %s", znode->c_str(), zerror(ret));
+ delete znode;
+ }
+
+ static void watcher(zhandle_t *zzh, int type, int state,
+ const char *path, void* context)
+ {
+ CHECK(zh != NULL);
+
+ if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // Register master.
+ static string basename = "/home/nexus";
+ static string delimiter = "/";
+ size_t index = basename.find(delimiter, 0);
+
+ int ret;
+
+ // Create basename znodes as necessary.
+ while (index < string::npos) {
+ index = basename.find(delimiter, index+1);
+ string* prefix = new string(basename.substr(0, index));
+ ret = zoo_acreate(zh, prefix->c_str(), "", 0, &ZOO_CREATOR_ALL_ACL,
+ 0, create_completion_ignore_node_exists, prefix);
+ if (ret != ZOK)
+ fatal("zoo_acreate failed! (%s)", zerror(ret));
+ }
+
+ string* znode = new string(basename + delimiter + "master");
+
+ ret = zoo_acreate(zh, znode->c_str(), master.c_str(), master.length(),
+ &ZOO_CREATOR_ALL_ACL, ZOO_EPHEMERAL,
+ create_completion, znode);
+
+ if (ret != ZOK)
+ fatal("zoo_acreate failed! (%s)", zerror(ret));
+ } else {
+ fatal("unhandled ZooKeeper event!");
+ }
+ }
+
+public:
+ ZooKeeperProcessForMaster(const string& _master, const string& _zookeeper)
+ {
+ master = _master;
+ zookeeper = _zookeeper;
+ zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, NULL, 0);
+ if (zh == NULL)
+ fatal("zookeeper_init failed!");
+ }
+
+ ~ZooKeeperProcessForMaster() {}
+};
+
+
+zhandle_t* ZooKeeperProcessForMaster::zh = NULL;
+string ZooKeeperProcessForMaster::zookeeper = "";
+string ZooKeeperProcessForMaster::master = "";
+
+
Added: incubator/mesos/trunk/src/zookeeper_slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper_slave.hpp?rev=1131595&view=auto
==============================================================================
--- incubator/mesos/trunk/src/zookeeper_slave.hpp (added)
+++ incubator/mesos/trunk/src/zookeeper_slave.hpp Sun Jun 5 03:27:29 2011
@@ -0,0 +1,102 @@
+#include <zookeeper.h>
+
+using namespace nexus;
+using namespace nexus::internal;
+using namespace nexus::internal::slave;
+
+// A process for the slave to communicate with ZooKeeper.
+class ZooKeeperProcessForSlave : public Tuple<Process>
+{
+private:
+ static zhandle_t* zh; // ZooKeeper connection handle.
+ static string zookeeper; // ZooKeeper server host:port.
+
+public:
+ static string master; // Master PID as string.
+
+protected:
+ void operator () ()
+ {
+ while (true) {
+ if (!master.empty())
+ return;
+
+ int fd;
+ int interest;
+ struct timeval tv;
+
+ if (zookeeper_interest(zh, &fd, &interest, &tv) != ZOK)
+ fatal("zookeeper_interest failed!");
+
+ int op = 0;
+
+ if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+ op |= RDWR;
+ } else if (interest & ZOOKEEPER_READ) {
+ op |= RDONLY;
+ } else if (interest & ZOOKEEPER_WRITE) {
+ op |= WRONLY;
+ }
+
+ if (await(fd, op, tv)) {
+ int events = 0;
+ if (ready(fd, RDONLY))
+ events |= ZOOKEEPER_READ;
+ if (ready(fd, WRONLY))
+ events |= ZOOKEEPER_WRITE;
+
+ int ret = zookeeper_process(zh, events);
+
+ if (ret != ZOK && ret != ZNOTHING) {
+ fatal("zookeeper_process failed! (%s)", zerror(ret));
+ }
+ } else {
+ LOG(WARNING) << "ZooKeeperProcess interrupted during await ... ";
+ }
+ }
+ }
+
+ static void get_completion(int ret, const char* value, int len,
+ const struct Stat* stat, const void* data)
+ {
+ string* znode = static_cast<string*>(const_cast<void*>(data));
+ if (ret != ZOK)
+ fatal("zookeeper get failed on '%s': %s", znode->c_str(), zerror(ret));
+ master = string(value, len);
+ delete znode;
+ }
+
+ static void watcher(zhandle_t *zzh, int type, int state,
+ const char *path, void* context)
+ {
+ CHECK(zh != NULL);
+
+ if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // Lookup master.
+ string* znode = new string("/home/nexus/master");
+
+ int ret = zoo_aget(zh, znode->c_str(), 0, get_completion, znode);
+
+ if (ret != ZOK)
+ fatal("zoo_aget failed! (%s)", zerror(ret));
+ } else {
+ fatal("unhandled ZooKeeper event!");
+ }
+ }
+
+public:
+ ZooKeeperProcessForSlave(const string& _zookeeper)
+ {
+ zookeeper = _zookeeper;
+ zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, NULL, 0);
+ if (zh == NULL)
+ fatal("zookeeper_init failed!");
+ }
+
+ ~ZooKeeperProcessForSlave() {}
+};
+
+
+zhandle_t* ZooKeeperProcessForSlave::zh = NULL;
+string ZooKeeperProcessForSlave::zookeeper = "";
+string ZooKeeperProcessForSlave::master = "";