You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/11/04 02:29:47 UTC
svn commit: r1405464 -
/incubator/mesos/branches/0.10.0/src/detector/detector.cpp
Author: benh
Date: Sun Nov 4 01:29:47 2012
New Revision: 1405464
URL: http://svn.apache.org/viewvc?rev=1405464&view=rev
Log:
*** MODIFIED FOR 0.10.0 ***
Changed the ZooKeeper master detector to "time out" a session for
instances that are being used to contend for leadership (and send a
NoMasterDetectedMessage and expire the session after a reconnect does
occur) (https://reviews.apache.org/r/7295).
Modified:
incubator/mesos/branches/0.10.0/src/detector/detector.cpp
Modified: incubator/mesos/branches/0.10.0/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.0/src/detector/detector.cpp?rev=1405464&r1=1405463&r2=1405464&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.0/src/detector/detector.cpp (original)
+++ incubator/mesos/branches/0.10.0/src/detector/detector.cpp Sun Nov 4 01:29:47 2012
@@ -19,11 +19,15 @@
#include <glog/logging.h>
#include <fstream>
+#include <ios>
#include <vector>
#include <boost/lexical_cast.hpp>
+#include <process/delay.hpp>
+#include <process/process.hpp>
#include <process/protobuf.hpp>
+#include <process/timer.hpp>
#include <stout/foreach.hpp>
#include <stout/option.hpp>
@@ -45,6 +49,7 @@ using namespace mesos::internal;
using boost::lexical_cast;
using process::Process;
+using process::Timer;
using process::UPID;
using std::pair;
@@ -79,6 +84,11 @@ public:
void deleted(const string& path);
private:
+ // Handles reconnecting "timeouts" by prematurely expiring a session
+ // (only used for contending instances). TODO(benh): Remove 'const
+ // &' after fixing libprocess.
+ void timedout(const int64_t& sessionId);
+
// Attempts to detect a master.
void detectMaster();
@@ -91,6 +101,9 @@ private:
Watcher* watcher;
ZooKeeper* zk;
+ bool expire;
+ Option<Timer> timer;
+
// Our sequence string if contending to be a master.
Option<string> mySeq;
@@ -258,7 +271,8 @@ ZooKeeperMasterDetectorProcess::ZooKeepe
pid(_pid),
contend(_contend),
watcher(NULL),
- zk(NULL)
+ zk(NULL),
+ expire(false)
{
// Set verbosity level for underlying ZooKeeper library logging.
// TODO(benh): Put this in the C++ API.
@@ -357,11 +371,35 @@ void ZooKeeperMasterDetectorProcess::con
} else {
LOG(INFO) << "Master detector reconnected ...";
- // Either we were the master and we're still the master (because
- // we haven't yet gotten a session expiration), or someone else
- // was the master and they're still the master, or someone else
- // was the master and someone else still is now the master. Either
- // way, run the leader detector.
+ // Cancel and cleanup the reconnect timer (if necessary).
+ if (timer.isSome()) {
+ process::timers::cancel(timer.get());
+ timer = Option<Timer>::none();
+ }
+
+ // If we decided to expire the session, make sure we delete the
+ // ZooKeeper instance so the session actually expires. We also
+ // create a new ZooKeeper instance for clients that want to
+ // continue detecting and/or contending (which is likely given
+ // that this code is getting executed).
+ if (expire) {
+ LOG(WARNING) << "Cleaning up after expired ZooKeeper session";
+
+ delete CHECK_NOTNULL(zk);
+ delete CHECK_NOTNULL(watcher);
+
+ watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
+ zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
+
+ expire = false;
+ return;
+ }
+
+ // We've reconnected and we didn't prematurely expire the session,
+ // but the master might have changed, so we should run an
+ // election. TODO(benh): Determine if this is really necessary or
+ // if the watch set via 'ZooKeeper::getChildren' in 'detectMaster'
+ // is sufficient (it should be).
detectMaster();
}
}
@@ -371,6 +409,21 @@ void ZooKeeperMasterDetectorProcess::rec
{
LOG(INFO) << "Master detector lost connection to ZooKeeper, "
<< "attempting to reconnect ...";
+
+ // ZooKeeper won't tell us of a session expiration until we
+ // reconnect, which could occur much much later than the session was
+ // actually expired. This can lead to a prolonged split-brain
+ // scenario when network partitions occur. Rather than wait for a
+ // reconnection to occur (i.e., a network partition to be repaired)
+ // we create a local timer and "expire" our session prematurely if
+ // we haven't reconnected within the session expiration time
+ // out. Later, when we eventually do reconnect we can force the
+ // session to be expired if we decided locally to expire.
+ timer = process::delay(
+ ZOOKEEPER_SESSION_TIMEOUT.value,
+ self(),
+ &Self::timedout,
+ zk->getSessionId());
}
@@ -378,10 +431,19 @@ void ZooKeeperMasterDetectorProcess::exp
{
LOG(WARNING) << "Master detector ZooKeeper session expired!";
- CHECK(zk != NULL);
- delete zk;
+ // Cancel and cleanup the reconnect timer (if necessary).
+ if (timer.isSome()) {
+ process::timers::cancel(timer.get());
+ timer = Option<Timer>::none();
+ }
+
+ delete CHECK_NOTNULL(zk);
+ delete CHECK_NOTNULL(watcher);
+ watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
+
+ expire = false;
}
@@ -405,6 +467,19 @@ void ZooKeeperMasterDetectorProcess::del
}
+void ZooKeeperMasterDetectorProcess::timedout(const int64_t& sessionId)
+{
+ CHECK_NOTNULL(zk);
+ if (timer.isSome() && zk->getSessionId() == sessionId) {
+ LOG(WARNING) << "Timed out waiting to reconnect to ZooKeeper "
+ << "(sessionId=" << std::hex << sessionId << ")";
+ timer = Option<Timer>::none();
+ expire = true;
+ process::post(pid, NoMasterDetectedMessage());
+ }
+}
+
+
void ZooKeeperMasterDetectorProcess::detectMaster()
{
vector<string> results;