You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:24:14 UTC
svn commit: r1131740 - in /incubator/mesos/trunk/src: master_detector.cpp
master_detector.hpp zookeeper.cpp zookeeper.hpp
Author: benh
Date: Sun Jun 5 05:24:14 2011
New Revision: 1131740
URL: http://svn.apache.org/viewvc?rev=1131740&view=rev
Log:
New master_detector implementation that uses libprocess ZooKeeper API.
Added:
incubator/mesos/trunk/src/master_detector.cpp
incubator/mesos/trunk/src/master_detector.hpp
Modified:
incubator/mesos/trunk/src/zookeeper.cpp
incubator/mesos/trunk/src/zookeeper.hpp
Added: incubator/mesos/trunk/src/master_detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.cpp?rev=1131740&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master_detector.cpp (added)
+++ incubator/mesos/trunk/src/master_detector.cpp Sun Jun 5 05:24:14 2011
@@ -0,0 +1,166 @@
+#include <unistd.h>
+
+#include <process.hpp>
+
+#include <iostream>
+#include <climits>
+#include <cstdlib>
+#include <stdexcept>
+
+#include <glog/logging.h>
+
+#include <boost/lexical_cast.hpp>
+
+#include "fatal.hpp"
+#include "master_detector.hpp"
+#include "messages.hpp"
+
+using namespace nexus;
+using namespace nexus::internal;
+
+using boost::lexical_cast;
+
+
+// Stores the connection settings and creates the ZooKeeper handle, passing
+// this object along with the server list.
+MasterDetector::MasterDetector(const string &_servers,
+                               const string &_znode,
+                               const PID &_pid,
+                               bool _contend)
+  : servers(_servers),
+    znode(_znode),
+    pid(_pid),
+    contend(_contend)
+{
+  // Second constructor argument of the ZooKeeper wrapper; presumably the
+  // session timeout in milliseconds -- TODO confirm against zookeeper.hpp.
+  const int timeout = 10000;
+
+  zk = new ZooKeeper(servers, timeout, this);
+}
+
+
+// Releases the ZooKeeper handle allocated in the constructor.
+MasterDetector::~MasterDetector()
+{
+  // Deleting a null pointer is a no-op, so no explicit NULL check is needed.
+  delete zk;
+}
+
+
+// ZooKeeper watcher callback.
+//
+// Session event while connected: creates every '/'-delimited prefix of
+// 'znode' (an already existing node is tolerated), reads 'znode' back as a
+// sanity check, optionally contends for mastership by creating an
+// ephemeral + sequence node that carries 'pid', and then runs detectMaster().
+// Child event while connected: only detectMaster() is re-run.
+// Any other (state, type) combination is logged and otherwise ignored.
+//
+// @param zk    handle used for the calls made directly in this function
+//              (it shadows the member of the same name)
+// @param type  ZooKeeper event type
+// @param state ZooKeeper connection state
+// @param path  not used by this implementation
+void MasterDetector::process(ZooKeeper *zk, int type, int state,
+ const string &path)
+{
+ int ret;
+ string result;
+
+ static const string delimiter = "/";
+
+ if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // Create directory path znodes as necessary: each pass extends the prefix
+ // up to (not including) the next delimiter; the last pass uses all of znode.
+ // NOTE(review): if znode ends with the delimiter, that last pass hands a
+ // path with a trailing '/' to create() -- confirm ZooKeeper accepts it.
+ size_t index = znode.find(delimiter, 0);
+ while (index < string::npos) {
+ index = znode.find(delimiter, index+1);
+ string prefix = znode.substr(0, index);
+ ret = zk->create(prefix, "", ZOO_CREATOR_ALL_ACL,
+ 0, &result);
+ // An existing node is fine; anything else is reported through fatal()
+ // (presumably terminating the process -- see fatal.hpp).
+ if (ret != ZOK && ret != ZNODEEXISTS)
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ }
+
+ // Weirdness in ZooKeeper timing, let's check that everything is created.
+ ret = zk->get(znode, false, &result, NULL);
+
+ if (ret != ZOK)
+ fatal("ZooKeeper not responding correctly (%s). "
+ "Make sure ZooKeeper is running on: %s",
+ zk->error(ret), servers.c_str());
+
+ if (contend) {
+ // Contend for mastership using the pid given in the constructor: the
+ // created node is ephemeral and gets a sequence number appended to znode.
+ // NOTE(review): lookupMasterPID() later addresses nodes as znode + seq,
+ // which only names a child of znode if znode ends in '/' -- verify.
+ ret = zk->create(znode, pid, ZOO_CREATOR_ALL_ACL,
+ ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+
+ if (ret != ZOK)
+ fatal("ZooKeeper not responding correctly (%s). "
+ "Make sure ZooKeeper is running on: %s",
+ zk->error(ret), servers.c_str());
+
+ // 'result' holds the path that was actually created; keep only the
+ // part after the last '/' as our sequence number.
+ setMySeq(result);
+ LOG(INFO) << "Created ephemeral/sequence:" << getMySeq();
+
+ // Tell the owning process which sequence number it was assigned.
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_SEQ>(getMySeq()));
+ Process::post(pid, GOT_MASTER_SEQ, s.data(), s.size());
+ }
+
+ detectMaster();
+ } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
+ // The set of children changed: re-evaluate who the master is.
+ detectMaster();
+ } else {
+ LOG(INFO) << "Unimplemented watch event: (state is "
+ << state << " and type is " << type << ")";
+ }
+}
+
+
+// Lists the children of 'znode' (asking for a watch), treats the child whose
+// name parses to the smallest integer as the master, and posts either
+// NO_MASTER_DETECTED or NEW_MASTER_DETECTED to 'pid'. Nothing is posted when
+// the selected child is the master that is already recorded.
+void MasterDetector::detectMaster()
+{
+  vector<string> children;
+
+  const int code = zk->getChildren(znode, true, &children);
+
+  if (code == ZOK) {
+    LOG(INFO) << "found " << children.size() << " registered masters";
+  } else {
+    LOG(ERROR) << "failed to get masters: " << zk->error(code);
+  }
+
+  // Find the child with the numerically smallest name.
+  string lowestSeq;
+  long lowest = LONG_MAX;
+  for (vector<string>::const_iterator it = children.begin();
+       it != children.end();
+       ++it) {
+    const int seq = lexical_cast<int>(*it);
+    if (seq < lowest) {
+      lowest = seq;
+      lowestSeq = *it;
+    }
+  }
+
+  // No master present (lost or possibly hasn't come up yet).
+  bool masterMissing = lowestSeq.empty();
+
+  if (!masterMissing) {
+    if (lowestSeq == currentMasterSeq) {
+      // Same master as before: nothing to announce.
+      return;
+    }
+
+    currentMasterSeq = lowestSeq;
+    currentMasterPID = lookupMasterPID(lowestSeq);
+
+    // While trying to get the master PID, master might have crashed,
+    // so PID might be empty.
+    masterMissing = (currentMasterPID == PID());
+  }
+
+  if (masterMissing) {
+    const string message =
+      Tuple<Process>::tupleToString(Tuple<Process>::pack<NO_MASTER_DETECTED>());
+    Process::post(pid, NO_MASTER_DETECTED, message.data(), message.size());
+  } else {
+    const string message =
+      Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>(currentMasterSeq, currentMasterPID));
+    Process::post(pid, NEW_MASTER_DETECTED, message.data(), message.size());
+  }
+}
+
+
+// Reads the data stored at 'znode + seq' and converts it to a PID.
+// A failed read is only logged; whatever the buffer holds is still converted.
+PID MasterDetector::lookupMasterPID(const string &seq) const
+{
+  CHECK(!seq.empty());
+
+  string data;
+  const int code = zk->get(znode + seq, false, &data, NULL);
+
+  if (code == ZOK) {
+    LOG(INFO) << "got new master pid: " << data;
+  } else {
+    LOG(ERROR) << "failed to fetch new master pid: " << zk->error(code);
+  }
+
+  // TODO(benh): Automatic cast!
+  return make_pid(data.c_str());
+}
+
+
+// Returns the sequence string last recorded by detectMaster(); empty until a
+// master has been selected.
+string MasterDetector::getCurrentMasterSeq() const
+{
+  return this->currentMasterSeq;
+}
+
+
+// Returns the PID last recorded by detectMaster() for the current master.
+PID MasterDetector::getCurrentMasterPID() const
+{
+  return this->currentMasterPID;
+}
Added: incubator/mesos/trunk/src/master_detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.hpp?rev=1131740&view=auto
==============================================================================
--- incubator/mesos/trunk/src/master_detector.hpp (added)
+++ incubator/mesos/trunk/src/master_detector.hpp Sun Jun 5 05:24:14 2011
@@ -0,0 +1,105 @@
+#ifndef __MASTER_DETECTOR_HPP__
+#define __MASTER_DETECTOR_HPP__
+
+#include <string>
+#include <iostream>
+#include <unistd.h>
+#include <climits>
+#include <cstdlib>
+
+#include "zookeeper.hpp"
+
+using namespace std;
+
+
+/**
+ * Implements functionality for a) detecting masters b) contending to be a master.
+ *
+ * An instance owns its ZooKeeper handle ('zk'): it is allocated in the
+ * constructor and deleted in the destructor, so instances must not be copied.
+ */
+class MasterDetector : public Watcher {
+public:
+  /**
+   * Connects to ZooKeeper and possibly contends for master.
+   *
+   * @param server comma separated list of zookeeper server host:port
+   *   pairs
+   * @param znode ZooKeeper node (directory)
+   * @param pid libprocess pid that we should send messages to (and to
+   *   contend)
+   * @param contend true if should contend to be master (not needed
+   *   for slaves and frameworks)
+   */
+  MasterDetector(const string &server, const string &znode,
+                 const PID &_pid, bool contend = false);
+
+  /**
+   * Deletes the ZooKeeper handle.
+   */
+  ~MasterDetector();
+
+  /**
+   * ZooKeeper watcher.
+   */
+  virtual void process(ZooKeeper *zk, int type, int state, const string &path);
+
+  /**
+   * @return ZooKeeper unique sequence number of the current master.
+   */
+  string getCurrentMasterSeq() const;
+
+  /**
+   * @return libprocess PID of the current master.
+   */
+  PID getCurrentMasterPID() const;
+
+  /**
+   * @return Unique ZooKeeper sequence number (only if contending for master, otherwise "").
+   */
+  string getMySeq() const
+  {
+    return mySeq;
+  }
+
+  /**
+   * Adjusts ZooKeepers level of debugging output.
+   * @param quiet true makes ZK quiet, whereas false makes ZK output DEBUG messages
+   */
+  static void setQuiet(bool quiet)
+  {
+    zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
+  }
+
+private:
+  /**
+   * Records our own sequence number from the full path of the znode that
+   * was created while contending: everything after the last '/' is kept,
+   * e.g. "/nxmaster/000000131" becomes "000000131". A path that contains
+   * no '/' clears the stored sequence number.
+   *
+   * @param s path of the created znode
+   */
+  void setMySeq(const string &s)
+  {
+    // Keep the position in the string's own size type: storing it in an int
+    // narrows string::npos and makes the comparison below rely on
+    // implementation-defined conversions.
+    const string::size_type pos = s.find_last_of('/');
+    if (pos != string::npos) {
+      mySeq = s.substr(pos + 1);
+    } else {
+      mySeq = "";
+    }
+  }
+
+  /**
+   * Lists the children of 'znode' (requesting a watch), selects the child
+   * with the smallest sequence number as the master and, unless that master
+   * is the one already recorded, posts NEW_MASTER_DETECTED or
+   * NO_MASTER_DETECTED to 'pid'.
+   */
+  void detectMaster();
+
+  /**
+   * Reads the data stored under the given sequence number and converts it
+   * to a PID.
+   *
+   * @param seq sequence number of the master's znode (must not be empty)
+   * @return PID built from the znode's data
+   */
+  PID lookupMasterPID(const string &seq) const;
+
+  // Connection settings as passed to the constructor.
+  string servers;
+  string znode;
+  PID pid;
+  bool contend;
+
+  // Owned ZooKeeper handle; deleted in the destructor.
+  ZooKeeper *zk;
+
+  // Our sequence number if contending.
+  string mySeq;
+
+  // Sequence number and PID of the master most recently detected.
+  string currentMasterSeq;
+  PID currentMasterPID;
+};
+
+#endif /* __MASTER_DETECTOR_HPP__ */
+
Modified: incubator/mesos/trunk/src/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.cpp?rev=1131740&r1=1131739&r2=1131740&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper.cpp Sun Jun 5 05:24:14 2011
@@ -14,6 +14,7 @@ using std::cout;
using std::endl;
using std::map;
using std::string;
+using std::vector;
/* Forward (and first) declaration of ZooKeeperProcess. */
@@ -41,6 +42,7 @@ enum {
REMOVE, // Perform an asynchronous remove (delete).
EXISTS, // Perform an asysnchronous exists.
GET, // Perform an asynchronous get.
+ GET_CHILDREN, // Perform an asynchronous get_children.
SET, // Perform an asynchronous set.
};
@@ -105,6 +107,18 @@ struct GetCall
};
+/*
+ * GetChildren "message" for performing ZooKeeper::getChildren. A pointer to
+ * one of these is handed to the ZooKeeperProcess, which fills in 'from' and
+ * 'zooKeeperProcess' before issuing the asynchronous request.
+ */
+struct GetChildrenCall
+{
+ // Result code of the operation, written once the request finishes or fails.
+ int ret;
+ // Path of the znode whose children are requested.
+ const string *path;
+ // Watch flag forwarded to zoo_aget_children.
+ bool watch;
+ // Receives the child names (appended); may be NULL.
+ vector<string> *results;
+ // Sender of the request; COMPLETED is sent back to it.
+ PID from;
+ // Process that issued the request; used by the completion to send COMPLETED.
+ ZooKeeperProcess *zooKeeperProcess;
+};
+
+
/* Set "message" for performing ZooKeeper::set. */
struct SetCall
{
@@ -346,6 +360,20 @@ private:
getCall->zooKeeperProcess->send(getCall->from, COMPLETED);
}
+ /*
+  * Completion callback handed to zoo_aget_children. It has to be static
+  * (as setCompletion is) because it is passed to the ZooKeeper C API as a
+  * plain function pointer; a non-static member function cannot be used
+  * there. Stores the return code, appends the child names to the caller's
+  * vector when one was supplied, and then notifies the waiting caller with
+  * COMPLETED.
+  *
+  * 'data' is the GetChildrenCall that was registered with the request.
+  */
+ static void getChildrenCompletion(int ret, const String_vector *results,
+                                   const void *data)
+ {
+   GetChildrenCall *getChildrenCall =
+     static_cast<GetChildrenCall *>(const_cast<void *>(data));
+   getChildrenCall->ret = ret;
+   // Both the caller's output vector and the result set are optional.
+   if (getChildrenCall->results != NULL && results != NULL) {
+     for (int i = 0; i < results->count; i++) {
+       getChildrenCall->results->push_back(results->data[i]);
+     }
+   }
+   getChildrenCall->zooKeeperProcess->send(getChildrenCall->from, COMPLETED);
+ }
+
static void setCompletion(int ret, const Stat *stat, const void *data)
{
SetCall *setCall =
@@ -487,6 +515,21 @@ protected:
}
break;
}
+ case GET_CHILDREN: {
+ GetChildrenCall *getChildrenCall =
+ *reinterpret_cast<GetChildrenCall **>(const_cast<char *>(body(NULL)));
+ getChildrenCall->from = from();
+ getChildrenCall->zooKeeperProcess = this;
+ int ret = zoo_aget_children(zh, getChildrenCall->path->c_str(),
+ getChildrenCall->watch,
+ getChildrenCompletion,
+ getChildrenCall);
+ if (ret != ZOK) {
+ getChildrenCall->ret = ret;
+ send(getChildrenCall->from, COMPLETED);
+ }
+ break;
+ }
case SET: {
SetCall *setCall =
*reinterpret_cast<SetCall **>(const_cast<char *>(body(NULL)));
@@ -722,6 +765,46 @@ int ZooKeeper::get(const string &path,
}
+/*
+ * Synchronously lists the children of 'path'.
+ *
+ * A short-lived helper process sends a GET_CHILDREN request to the
+ * ZooKeeperProcess and blocks until COMPLETED comes back; the outcome is
+ * delivered through the GetChildrenCall on this function's stack.
+ *
+ * path:    znode whose children are listed.
+ * watch:   forwarded to the asynchronous ZooKeeper request.
+ * results: receives the child names (appended); may be NULL.
+ * returns: the ZooKeeper return code, or ZSYSTEMERROR if the request did
+ *          not complete.
+ */
+int ZooKeeper::getChildren(const string &path,
+                           bool watch,
+                           vector<string> *results)
+{
+  GetChildrenCall getChildrenCall;
+  getChildrenCall.path = &path;
+  getChildrenCall.watch = watch;
+  getChildrenCall.results = results;
+
+  class GetChildrenCallProcess : public Process
+  {
+  private:
+    ZooKeeperProcess *zooKeeperProcess;
+    GetChildrenCall *getChildrenCall;
+
+  protected:
+    void operator () ()
+    {
+      // The message body is the pointer to the call struct itself. The
+      // request must be tagged GET_CHILDREN: tagging it GET would make the
+      // ZooKeeperProcess interpret the struct as a GetCall.
+      if (call(zooKeeperProcess->getPID(),
+               GET_CHILDREN,
+               reinterpret_cast<char *>(&getChildrenCall),
+               sizeof(GetChildrenCall *)) != COMPLETED)
+        getChildrenCall->ret = ZSYSTEMERROR;
+    }
+
+  public:
+    GetChildrenCallProcess(ZooKeeperProcess *_zooKeeperProcess,
+                           GetChildrenCall *_getChildrenCall)
+      : zooKeeperProcess(_zooKeeperProcess),
+        getChildrenCall(_getChildrenCall)
+    {}
+  } getChildrenCallProcess(static_cast<ZooKeeperProcess *>(impl),
+                           &getChildrenCall);
+
+  // Run the helper to completion before reading the result it stored.
+  Process::wait(Process::spawn(&getChildrenCallProcess));
+
+  return getChildrenCall.ret;
+}
+
+
int ZooKeeper::set(const string &path,
const string &data,
int version)
Modified: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131740&r1=1131739&r2=1131740&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun 5 05:24:14 2011
@@ -19,6 +19,7 @@
#include <zookeeper.h>
#include <string>
+#include <vector>
/* Forward declarations of classes we are using. */
@@ -230,6 +231,26 @@ public:
Stat *stat);
/**
+ * \brief lists the children of a node synchronously.
+ *
+ * \param path the name of the node. Expressed as a file name with
+ * slashes separating ancestors of the node.
+ * \param watch if true, a watch will be set at the server to notify
+ * the client if the node changes.
+ * \param results output: the names of the children are appended to this
+ * vector. May be NULL if the names are not needed.
+ * \return the return code of the function.
+ * ZOK - operation completed successfully
+ * ZNONODE - the node does not exist.
+ * ZNOAUTH - the client does not have permission.
+ * ZBADARGUMENTS - invalid input parameters
+ * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZSYSTEMERROR - the request could not be completed
+ */
+ int getChildren(const std::string &path,
+ bool watch,
+ std::vector<std::string> *results);
+
+ /**
* \brief sets the data associated with a node.
*
* \param path the name of the node. Expressed as a file name with slashes