You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:02:18 UTC

svn commit: r1131727 - in /incubator/mesos/trunk/src: master.cpp slave.cpp

Author: benh
Date: Sun Jun  5 05:02:18 2011
New Revision: 1131727

URL: http://svn.apache.org/viewvc?rev=1131727&view=rev
Log:
Fixed a ZooKeeper bug that occurs when a master restarts so quickly that the ephemeral znode left by its previous session has not yet been cleaned up.

Modified:
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/slave.cpp

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131727&r1=1131726&r2=1131727&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 05:02:18 2011
@@ -1,10 +1,15 @@
-#include "allocator.hpp"
-#include "master.hpp"
-#include "master_webui.hpp"
-#include "allocator_factory.hpp"
+#include "config.hpp" // Must be included first so that USING_ZOOKEEPER is defined.
 
 #include <glog/logging.h>
 
+#ifdef USING_ZOOKEEPER
+#include <zookeeper.hpp>
+#endif
+
+#include "allocator.hpp"
+#include "allocator_factory.hpp"
+#include "master.hpp"
+#include "master_webui.hpp"
 
 using std::endl;
 using std::max;
@@ -26,8 +31,88 @@ using namespace nexus::internal;
 using namespace nexus::internal::master;
 
 
+/* List of ZooKeeper host:port pairs (from master_main.cpp/local.cpp). */
+extern string zookeeper;
+
 namespace {
 
+#ifdef USING_ZOOKEEPER
+class MasterWatcher : public Watcher
+{
+private:
+  Master *master;
+
+public:
+  void process(ZooKeeper *zk, int type, int state, const string &path)
+  {
+    string znode = "/home/nexus/master";
+    string dirname = "/home/nexus";
+    string delimiter = "/";
+    string contents = "";
+
+    int ret;
+    string result;
+
+    if ((state == ZOO_CONNECTED_STATE) &&
+	(type == ZOO_SESSION_EVENT)) {
+      // Create directory path znodes as necessary.
+      size_t index = dirname.find(delimiter, 0);
+      while (index < string::npos) {
+	index = dirname.find(delimiter, index+1);
+	string prefix = dirname.substr(0, index);
+	ret = zk->create(prefix, contents, ZOO_CREATOR_ALL_ACL,
+			 0, &result);
+	if (ret != ZOK && ret != ZNODEEXISTS)
+	  fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+      }
+
+      // Now create znode.
+      ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
+		       ZOO_EPHEMERAL, &result);
+
+      // If the node already exists, wait for it to get deleted.
+      if (ret == ZNODEEXISTS)
+	ret = zk->exists(znode, true, NULL);
+
+      if (ret != ZOK)
+	fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+    } else if ((state == ZOO_CONNECTED_STATE) &&
+	       (type == ZOO_DELETED_EVENT) &&
+	       (path.compare(znode) == 0)) {
+      // Now (re)create znode.
+      ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
+		       ZOO_EPHEMERAL, &result);
+
+      if (ret != ZOK)
+	fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+
+      // And set the value to our pid.
+      ret = zk->set(znode, master->getPID(), -1);
+
+      if (ret != ZOK)
+	fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+    } else if ((state == ZOO_EXPIRED_SESSION_STATE) &&
+	       (type == ZOO_SESSION_EVENT)) {
+      // TODO(benh): Reconnect if session expires. Note the ZooKeeper
+      // C library retries in the case of connection timeouts,
+      // connection loss, etc. Only expired sessions require
+      // explicitly reconnecting.
+	fatal("connection to ZooKeeper expired!");
+    } else if ((state == ZOO_CONNECTING_STATE) &&
+	       (type == ZOO_SESSION_EVENT)) {
+      // The client library automatically reconnects, taking into
+      // account failed servers in the connection string,
+      // appropriately handling the "herd effect", etc.
+      LOG(INFO) << "Lost Zookeeper connection. Retrying (automagically).";
+    } else {
+      fatal("unhandled ZooKeeper event!");
+    }
+  }
+
+  MasterWatcher(Master *_master) : master(_master) {}
+};
+#endif
+
 // A process that periodically pings the master to check filter expiries, etc
 class AllocatorTimer : public Tuple<Process>
 {
@@ -244,6 +329,12 @@ void Master::operator () ()
   link(spawn(new AllocatorTimer(self())));
   //link(spawn(new SharesPrinter(self())));
 
+#ifdef USING_ZOOKEEPER
+  ZooKeeper *zk;
+  if (!zookeeper.empty())
+    zk = new ZooKeeper(zookeeper, 10000, new MasterWatcher(this));
+#endif
+
   while (true) {
     switch (receive()) {
 

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131727&r1=1131726&r2=1131727&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 05:02:18 2011
@@ -48,12 +48,11 @@ private:
 public:
   void process(ZooKeeper *zk, int type, int state, const string &path)
   {
+    string znode = "/home/nexus/master";
+    int ret;
+
     if ((state == ZOO_CONNECTED_STATE) &&
 	((type == ZOO_SESSION_EVENT) || (type == ZOO_CREATED_EVENT))) {
-      // Lookup master PID.
-      string znode = "/home/nexus/master";
-      int ret;
-
       // Check if znode exists, if not, just return and wait.
       ret = zk->exists(znode, true, NULL);
 
@@ -79,6 +78,10 @@ public:
       slave->master = master;
 
       Process::post(slave->getPID(), S2S_GOT_MASTER);
+    } else if ((state == ZOO_CONNECTED_STATE) &&
+	       (type == ZOO_DELETED_EVENT) &&
+	       (path.compare(znode) == 0)) {
+      // Master gone, we should get a PROCESS_EXIT and commit suicide.
     } else {
       fatal("unhandled ZooKeeper event!");
     }