You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:02:18 UTC
svn commit: r1131727 - in /incubator/mesos/trunk/src: master.cpp slave.cpp
Author: benh
Date: Sun Jun 5 05:02:18 2011
New Revision: 1131727
URL: http://svn.apache.org/viewvc?rev=1131727&view=rev
Log:
Fixed a ZooKeeper bug that occurs when a master restarts so quickly that the ephemeral znodes from its previous session have not yet been cleaned up.
Modified:
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/slave.cpp
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131727&r1=1131726&r2=1131727&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 05:02:18 2011
@@ -1,10 +1,15 @@
-#include "allocator.hpp"
-#include "master.hpp"
-#include "master_webui.hpp"
-#include "allocator_factory.hpp"
+#include "config.hpp" // Need to define first to get USING_ZOOKEEPER
#include <glog/logging.h>
+#ifdef USING_ZOOKEEPER
+#include <zookeeper.hpp>
+#endif
+
+#include "allocator.hpp"
+#include "allocator_factory.hpp"
+#include "master.hpp"
+#include "master_webui.hpp"
using std::endl;
using std::max;
@@ -26,8 +31,88 @@ using namespace nexus::internal;
using namespace nexus::internal::master;
+/* List of ZooKeeper host:port pairs (from master_main.cpp/local.cpp). */
+extern string zookeeper;
+
namespace {
+#ifdef USING_ZOOKEEPER
+class MasterWatcher : public Watcher // Publishes this master's PID at /home/nexus/master in ZooKeeper.
+{
+private:
+ Master *master; // Not owned (never deleted here); only read via getPID().
+
+public:
+ void process(ZooKeeper *zk, int type, int state, const string &path)
+ { // ZooKeeper event callback; any state/type combination not matched below is fatal.
+ string znode = "/home/nexus/master";
+ string dirname = "/home/nexus";
+ string delimiter = "/";
+ string contents = "";
+ // ret holds each ZooKeeper call's return code; result is create()'s output parameter.
+ int ret;
+ string result;
+ // Session (re)established: ensure parent znodes exist, then register our znode.
+ if ((state == ZOO_CONNECTED_STATE) &&
+ (type == ZOO_SESSION_EVENT)) {
+ // Create each ancestor of dirname ("/home", then "/home/nexus"); ZNODEEXISTS is fine.
+ size_t index = dirname.find(delimiter, 0);
+ while (index < string::npos) {
+ index = dirname.find(delimiter, index+1);
+ string prefix = dirname.substr(0, index); // index is npos on the last pass => whole dirname.
+ ret = zk->create(prefix, contents, ZOO_CREATOR_ALL_ACL,
+ 0, &result); // No ZOO_EPHEMERAL flag, unlike the master znode below.
+ if (ret != ZOK && ret != ZNODEEXISTS)
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ }
+ // The master znode is created ZOO_EPHEMERAL with our PID as its contents.
+ // Now create znode.
+ ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
+ ZOO_EPHEMERAL, &result);
+ // ZNODEEXISTS most likely means a stale node left behind by a previous master session.
+ // If it already exists, set a watch (exists(..., true, ...)) and wait for ZOO_DELETED_EVENT.
+ if (ret == ZNODEEXISTS)
+ ret = zk->exists(znode, true, NULL);
+ // NOTE(review): if the node vanishes between create() and exists(), ret is presumably ZNONODE (not ZOK) and we die below; retrying create() would be safer.
+ if (ret != ZOK)
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ } else if ((state == ZOO_CONNECTED_STATE) &&
+ (type == ZOO_DELETED_EVENT) &&
+ (path.compare(znode) == 0)) {
+ // The watched master znode was deleted: try to (re)create it with our PID.
+ ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
+ ZOO_EPHEMERAL, &result);
+ // NOTE(review): ZNODEEXISTS here (another process recreated the node first) is treated as fatal.
+ if (ret != ZOK)
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ // NOTE(review): create() above already passed our PID as the node contents, so this set looks redundant.
+ // And set the value to our pid.
+ ret = zk->set(znode, master->getPID(), -1);
+ // NOTE(review): the message below says "create" although this is the set() failure path.
+ if (ret != ZOK)
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ } else if ((state == ZOO_EXPIRED_SESSION_STATE) &&
+ (type == ZOO_SESSION_EVENT)) {
+ // TODO(benh): Reconnect if session expires. Note the ZooKeeper
+ // C library retries in the case of connection timeouts,
+ // connection loss, etc. Only expired sessions require
+ // explicitly reconnecting.
+ fatal("connection to ZooKeeper expired!");
+ } else if ((state == ZOO_CONNECTING_STATE) &&
+ (type == ZOO_SESSION_EVENT)) {
+ // The client library automatically reconnects, taking into
+ // account failed servers in the connection string,
+ // appropriately handling the "herd effect", etc.
+ LOG(INFO) << "Lost Zookeeper connection. Retrying (automagically).";
+ } else {
+ fatal("unhandled ZooKeeper event!");
+ }
+ }
+ // NOTE(review): single-argument constructor; consider marking it explicit.
+ MasterWatcher(Master *_master) : master(_master) {}
+};
+#endif
+
// A process that periodically pings the master to check filter expiries, etc
class AllocatorTimer : public Tuple<Process>
{
@@ -244,6 +329,12 @@ void Master::operator () ()
link(spawn(new AllocatorTimer(self())));
//link(spawn(new SharesPrinter(self())));
+#ifdef USING_ZOOKEEPER
+ ZooKeeper *zk;
+ if (!zookeeper.empty())
+ zk = new ZooKeeper(zookeeper, 10000, new MasterWatcher(this));
+#endif
+
while (true) {
switch (receive()) {
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131727&r1=1131726&r2=1131727&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 05:02:18 2011
@@ -48,12 +48,11 @@ private:
public:
void process(ZooKeeper *zk, int type, int state, const string &path)
{
+ string znode = "/home/nexus/master";
+ int ret;
+
if ((state == ZOO_CONNECTED_STATE) &&
((type == ZOO_SESSION_EVENT) || (type == ZOO_CREATED_EVENT))) {
- // Lookup master PID.
- string znode = "/home/nexus/master";
- int ret;
-
// Check if znode exists, if not, just return and wait.
ret = zk->exists(znode, true, NULL);
@@ -79,6 +78,10 @@ public:
slave->master = master;
Process::post(slave->getPID(), S2S_GOT_MASTER);
+ } else if ((state == ZOO_CONNECTED_STATE) &&
+ (type == ZOO_DELETED_EVENT) &&
+ (path.compare(znode) == 0)) {
+ // Master gone, we should get a PROCESS_EXIT and commit suicide.
} else {
fatal("unhandled ZooKeeper event!");
}