You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/11/04 02:29:47 UTC

svn commit: r1405464 - /incubator/mesos/branches/0.10.0/src/detector/detector.cpp

Author: benh
Date: Sun Nov  4 01:29:47 2012
New Revision: 1405464

URL: http://svn.apache.org/viewvc?rev=1405464&view=rev
Log:
*** MODIFIED FOR 0.10.0 ***
Changed the ZooKeeper master detector to "time out" a session for
instances that are being used to contend for leadership (sending a
NoMasterDetectedMessage on timeout and expiring the session once a
reconnect does occur) (https://reviews.apache.org/r/7295).

Modified:
    incubator/mesos/branches/0.10.0/src/detector/detector.cpp

Modified: incubator/mesos/branches/0.10.0/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.0/src/detector/detector.cpp?rev=1405464&r1=1405463&r2=1405464&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.0/src/detector/detector.cpp (original)
+++ incubator/mesos/branches/0.10.0/src/detector/detector.cpp Sun Nov  4 01:29:47 2012
@@ -19,11 +19,15 @@
 #include <glog/logging.h>
 
 #include <fstream>
+#include <ios>
 #include <vector>
 
 #include <boost/lexical_cast.hpp>
 
+#include <process/delay.hpp>
+#include <process/process.hpp>
 #include <process/protobuf.hpp>
+#include <process/timer.hpp>
 
 #include <stout/foreach.hpp>
 #include <stout/option.hpp>
@@ -45,6 +49,7 @@ using namespace mesos::internal;
 using boost::lexical_cast;
 
 using process::Process;
+using process::Timer;
 using process::UPID;
 
 using std::pair;
@@ -79,6 +84,11 @@ public:
   void deleted(const string& path);
 
 private:
+  // Handles reconnecting "timeouts" by prematurely expiring a session
+  // (only used for contending instances). TODO(benh): Remove 'const
+  // &' after fixing libprocess.
+  void timedout(const int64_t& sessionId);
+
   // Attempts to detect a master.
   void detectMaster();
 
@@ -91,6 +101,9 @@ private:
   Watcher* watcher;
   ZooKeeper* zk;
 
+  bool expire;
+  Option<Timer> timer;
+
   // Our sequence string if contending to be a master.
   Option<string> mySeq;
 
@@ -258,7 +271,8 @@ ZooKeeperMasterDetectorProcess::ZooKeepe
     pid(_pid),
     contend(_contend),
     watcher(NULL),
-    zk(NULL)
+    zk(NULL),
+    expire(false)
 {
   // Set verbosity level for underlying ZooKeeper library logging.
   // TODO(benh): Put this in the C++ API.
@@ -357,11 +371,35 @@ void ZooKeeperMasterDetectorProcess::con
   } else {
     LOG(INFO) << "Master detector reconnected ...";
 
-    // Either we were the master and we're still the master (because
-    // we haven't yet gotten a session expiration), or someone else
-    // was the master and they're still the master, or someone else
-    // was the master and someone else still is now the master. Either
-    // way, run the leader detector.
+    // Cancel and cleanup the reconnect timer (if necessary).
+    if (timer.isSome()) {
+      process::timers::cancel(timer.get());
+      timer = Option<Timer>::none();
+    }
+
+    // If we decided to expire the session, make sure we delete the
+    // ZooKeeper instance so the session actually expires. We also
+    // create a new ZooKeeper instance for clients that want to
+    // continue detecting and/or contending (which is likely given
+    // that this code is getting executed).
+    if (expire) {
+      LOG(WARNING) << "Cleaning up after expired ZooKeeper session";
+
+      delete CHECK_NOTNULL(zk);
+      delete CHECK_NOTNULL(watcher);
+
+      watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
+      zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
+
+      expire = false;
+      return;
+    }
+
+    // We've reconnected and we didn't prematurely expire the session,
+    // but the master might have changed, so we should run an
+    // election. TODO(benh): Determine if this is really necessary or
+    // if the watch set via 'ZooKeeper::getChildren' in 'detectMaster'
+    // is sufficient (it should be).
     detectMaster();
   }
 }
@@ -371,6 +409,21 @@ void ZooKeeperMasterDetectorProcess::rec
 {
   LOG(INFO) << "Master detector lost connection to ZooKeeper, "
 	    << "attempting to reconnect ...";
+
+  // ZooKeeper won't tell us of a session expiration until we
+  // reconnect, which could occur much, much later than when the
+  // session actually expired. This can lead to a prolonged
+  // split-brain scenario when network partitions occur. Rather than
+  // wait for a reconnection to occur (i.e., a network partition to be
+  // repaired) we create a local timer and "expire" our session
+  // prematurely if we haven't reconnected within the session
+  // timeout. Later, when we eventually do reconnect, we can force
+  // the session to be expired if we decided locally to expire.
+  timer = process::delay(
+      ZOOKEEPER_SESSION_TIMEOUT.value,
+      self(),
+      &Self::timedout,
+      zk->getSessionId());
 }
 
 
@@ -378,10 +431,19 @@ void ZooKeeperMasterDetectorProcess::exp
 {
   LOG(WARNING) << "Master detector ZooKeeper session expired!";
 
-  CHECK(zk != NULL);
-  delete zk;
+  // Cancel and cleanup the reconnect timer (if necessary).
+  if (timer.isSome()) {
+    process::timers::cancel(timer.get());
+    timer = Option<Timer>::none();
+  }
+
+  delete CHECK_NOTNULL(zk);
+  delete CHECK_NOTNULL(watcher);
 
+  watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
   zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
+
+  expire = false;
 }
 
 
@@ -405,6 +467,19 @@ void ZooKeeperMasterDetectorProcess::del
 }
 
 
+void ZooKeeperMasterDetectorProcess::timedout(const int64_t& sessionId)
+{
+  CHECK_NOTNULL(zk);
+  if (timer.isSome() && zk->getSessionId() == sessionId) {
+    LOG(WARNING) << "Timed out waiting to reconnect to ZooKeeper "
+                 << "(sessionId=" << std::hex << sessionId << ")";
+    timer = Option<Timer>::none();
+    expire = true;
+    process::post(pid, NoMasterDetectedMessage());
+  }
+}
+
+
 void ZooKeeperMasterDetectorProcess::detectMaster()
 {
   vector<string> results;