You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/04 01:37:24 UTC
svn commit: r1393816 - /incubator/mesos/trunk/src/detector/detector.cpp
Author: benh
Date: Wed Oct 3 23:37:23 2012
New Revision: 1393816
URL: http://svn.apache.org/viewvc?rev=1393816&view=rev
Log:
Refactored underlying ZooKeeper master detector into a libprocess
process (https://reviews.apache.org/r/7290).
Modified:
incubator/mesos/trunk/src/detector/detector.cpp
Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1393816&r1=1393815&r2=1393816&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Wed Oct 3 23:37:23 2012
@@ -26,6 +26,7 @@
#include <process/protobuf.hpp>
#include <stout/foreach.hpp>
+#include <stout/option.hpp>
#include "detector/detector.hpp"
@@ -35,6 +36,7 @@
#include "zookeeper/authentication.hpp"
#include "zookeeper/url.hpp"
+#include "zookeeper/watcher.hpp"
#include "zookeeper/zookeeper.hpp"
using namespace mesos;
@@ -49,8 +51,55 @@ using std::pair;
using std::string;
using std::vector;
+// TODO(benh): Make this value configurable via flags and verify that
+// it is always LESS THAN the slave heartbeat timeout.
+const Seconds ZOOKEEPER_SESSION_TIMEOUT(10.0);
-class ZooKeeperMasterDetector : public MasterDetector, public Watcher
+
+class ZooKeeperMasterDetectorProcess
+ : public Process<ZooKeeperMasterDetectorProcess>
+{
+public:
+ ZooKeeperMasterDetectorProcess(
+ const zookeeper::URL& url,
+ const UPID& pid,
+ bool contend,
+ bool quiet);
+
+ virtual ~ZooKeeperMasterDetectorProcess();
+
+ virtual void initialize();
+
+ // ZooKeeper events.
+ void connected(bool reconnect);
+ void reconnecting();
+ void expired();
+ void updated(const string& path);
+ void created(const string& path);
+ void deleted(const string& path);
+
+private:
+ // Attempts to detect a master.
+ void detectMaster();
+
+ const zookeeper::URL url;
+ const ACL_vector acl;
+
+ const UPID pid;
+ bool contend;
+
+ Watcher* watcher;
+ ZooKeeper* zk;
+
+ // Our sequence string if contending to be a master.
+ Option<string> mySeq;
+
+ string currentMasterSeq;
+ UPID currentMasterPID;
+};
+
+
+class ZooKeeperMasterDetector : public MasterDetector
{
public:
/**
@@ -72,38 +121,8 @@ public:
virtual ~ZooKeeperMasterDetector();
- /**
- * ZooKeeper watcher callback.
- */
- virtual void process(ZooKeeper *zk, int type, int state, const string &path);
-
private:
- void connected();
- void reconnecting();
- void reconnected();
- void expired();
- void updated(const string& path);
-
- /**
- * Attempts to detect a master.
- */
- void detectMaster();
-
- const zookeeper::URL url;
- const UPID pid;
- bool contend;
-
- bool reconnect;
-
- ACL_vector acl;
-
- ZooKeeper* zk;
-
- // Our sequence string if contending to be a master.
- string mySeq;
-
- string currentMasterSeq;
- UPID currentMasterPID;
+ ZooKeeperMasterDetectorProcess* process;
};
@@ -254,144 +273,150 @@ BasicMasterDetector::BasicMasterDetector
BasicMasterDetector::~BasicMasterDetector() {}
-ZooKeeperMasterDetector::ZooKeeperMasterDetector(
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
const zookeeper::URL& _url,
const UPID& _pid,
bool _contend,
bool quiet)
: url(_url),
+ acl(url.authentication.isSome()
+ ? zookeeper::EVERYONE_READ_CREATOR_ALL
+ : ZOO_OPEN_ACL_UNSAFE),
pid(_pid),
contend(_contend),
- reconnect(false)
+ watcher(NULL),
+ zk(NULL)
{
// Set verbosity level for underlying ZooKeeper library logging.
// TODO(benh): Put this in the C++ API.
zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
+}
- acl = url.authentication.isSome()
- ? zookeeper::EVERYONE_READ_CREATOR_ALL
- : ZOO_OPEN_ACL_UNSAFE;
- // Start up the ZooKeeper connection!
- zk = new ZooKeeper(url.servers, Milliseconds(10000), this);
+ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
+{
+ delete zk;
+ delete watcher;
}
-ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+
+void ZooKeeperMasterDetectorProcess::initialize()
{
- delete zk;
+ // Doing initialization here allows to avoid the race between
+ // instantiating the ZooKeeper instance and being spawned ourself.
+ watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
+ zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
}
-void ZooKeeperMasterDetector::connected()
+void ZooKeeperMasterDetectorProcess::connected(bool reconnect)
{
- LOG(INFO) << "Master detector connected to ZooKeeper ...";
+ if (!reconnect) {
+ LOG(INFO) << "Master detector connected to ZooKeeper ...";
- int code;
- if (url.authentication.isSome()) {
- const std::string& scheme = url.authentication.get().scheme;
- const std::string& credentials = url.authentication.get().credentials;
- LOG(INFO) << "Authenticating to ZooKeeper using scheme '" << scheme << "'";
- code = zk->authenticate(scheme, credentials);
- if (code != ZOK) {
- LOG(FATAL) << "Failed to authenticate with ZooKeeper: "
- << zk->message(code);
+ int code;
+ if (url.authentication.isSome()) {
+ const std::string& scheme = url.authentication.get().scheme;
+ const std::string& credentials = url.authentication.get().credentials;
+ LOG(INFO) << "Authenticating to ZooKeeper using scheme '" << scheme << "'";
+ code = zk->authenticate(scheme, credentials);
+ if (code != ZOK) {
+ LOG(FATAL) << "Failed to authenticate with ZooKeeper: "
+ << zk->message(code);
+ }
}
- }
- string result;
+ string result;
- static const string delimiter = "/";
+ static const string delimiter = "/";
- // Assume the path (chroot) being used does not end with a "/".
- CHECK(url.path.at(url.path.length() - 1) != '/');
+ // Assume the path (chroot) being used does not end with a "/".
+ CHECK(url.path.at(url.path.length() - 1) != '/');
- // Create znodes as necessary.
- size_t index = url.path.find(delimiter, 0);
+ // Create znodes as necessary.
+ size_t index = url.path.find(delimiter, 0);
- while (index < string::npos) {
- // Get out the prefix to create.
- index = url.path.find(delimiter, index + 1);
- string prefix = url.path.substr(0, index);
+ while (index < string::npos) {
+ // Get out the prefix to create.
+ index = url.path.find(delimiter, index + 1);
+ string prefix = url.path.substr(0, index);
- LOG(INFO) << "Trying to create znode '" << prefix << "' in ZooKeeper";
+ LOG(INFO) << "Trying to create znode '" << prefix << "' in ZooKeeper";
- // Create the node (even if it already exists).
- code = zk->create(prefix, "", acl, 0, &result);
+ // Create the node (even if it already exists).
+ code = zk->create(prefix, "", acl, 0, &result);
- if (code != ZOK && code != ZNODEEXISTS) {
- LOG(FATAL) << "Failed to create ZooKeeper znode: " << zk->message(code);
+ if (code != ZOK && code != ZNODEEXISTS) {
+ LOG(FATAL) << "Failed to create ZooKeeper znode: " << zk->message(code);
+ }
}
- }
- // Wierdness in ZooKeeper timing, let's check that everything is created.
- code = zk->get(url.path, false, &result, NULL);
-
- if (code != ZOK) {
- LOG(FATAL) << "Unexpected ZooKeeper failure: " << zk->message(code);
- }
-
- if (contend) {
- // We contend with the pid given in constructor.
- code = zk->create(url.path + "/", pid, acl,
- ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+ // Wierdness in ZooKeeper timing, let's check that everything is created.
+ code = zk->get(url.path, false, &result, NULL);
if (code != ZOK) {
- LOG(FATAL) << "Unexpected ZooKeeper failure: %s" << zk->message(code);
+ LOG(FATAL) << "Unexpected ZooKeeper failure: " << zk->message(code);
}
- // Save the sequence id but only grab the basename, e.g.,
- // "/path/to/znode/000000131" => "000000131".
- size_t index;
- if ((index = result.find_last_of('/')) != string::npos) {
- mySeq = result.erase(0, index + 1);
- } else {
- mySeq = result;
+ if (contend) {
+ // We contend with the pid given in constructor.
+ code = zk->create(url.path + "/", pid, acl,
+ ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+
+ if (code != ZOK) {
+ LOG(FATAL) << "Unexpected ZooKeeper failure: %s" << zk->message(code);
+ }
+
+ // Save the sequence id but only grab the basename, e.g.,
+ // "/path/to/znode/000000131" => "000000131".
+ size_t index;
+ if ((index = result.find_last_of('/')) != string::npos) {
+ mySeq = result.erase(0, index + 1);
+ } else {
+ mySeq = result;
+ }
+
+ LOG(INFO) << "Created ephemeral/sequence:" << mySeq.get();
+
+ GotMasterTokenMessage message;
+ message.set_token(mySeq.get());
+ process::post(pid, message);
}
- LOG(INFO) << "Created ephemeral/sequence:" << mySeq;
+ // Now determine who the master is (it may be us).
+ detectMaster();
+ } else {
+ LOG(INFO) << "Master detector reconnected ...";
- GotMasterTokenMessage message;
- message.set_token(mySeq);
- process::post(pid, message);
+ // Either we were the master and we're still the master (because
+ // we haven't yet gotten a session expiration), or someone else
+ // was the master and they're still the master, or someone else
+ // was the master and someone else still is now the master. Either
+ // way, run the leader detector.
+ detectMaster();
}
-
- // Now determine who the master is (it may be us).
- detectMaster();
}
-void ZooKeeperMasterDetector::reconnecting()
+void ZooKeeperMasterDetectorProcess::reconnecting()
{
LOG(INFO) << "Master detector lost connection to ZooKeeper, "
<< "attempting to reconnect ...";
}
-void ZooKeeperMasterDetector::reconnected()
-{
- LOG(INFO) << "Master detector reconnected ...";
-
- // Either we were the master and we're still the master (because we
- // haven't yet gotten a session expiration), or someone else was the
- // master and they're still the master, or someone else was the
- // master and someone else still is now the master. Either way, run
- // the leader detector.
- detectMaster();
-}
-
-
-void ZooKeeperMasterDetector::expired()
+void ZooKeeperMasterDetectorProcess::expired()
{
LOG(WARNING) << "Master detector ZooKeeper session expired!";
CHECK(zk != NULL);
delete zk;
- zk = new ZooKeeper(url.servers, Milliseconds(10000), this);
+ zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
}
-void ZooKeeperMasterDetector::updated(const string& path)
+void ZooKeeperMasterDetectorProcess::updated(const string& path)
{
// A new master might have showed up and created a sequence
// identifier or a master may have died, determine who the master is now!
@@ -399,42 +424,19 @@ void ZooKeeperMasterDetector::updated(co
}
-void ZooKeeperMasterDetector::process(ZooKeeper* zk, int type, int state,
- const string& path)
+void ZooKeeperMasterDetectorProcess::created(const string& path)
{
- if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
- // Check if this is a reconnect.
- if (!reconnect) {
- // Initial connect.
- connected();
- } else {
- // Reconnected.
- reconnected();
- }
- } else if ((state == ZOO_CONNECTING_STATE) && (type == ZOO_SESSION_EVENT)) {
- // The client library automatically reconnects, taking into
- // account failed servers in the connection string,
- // appropriately handling the "herd effect", etc.
- reconnect = true;
- reconnecting();
- } else if ((state == ZOO_EXPIRED_SESSION_STATE) && (type == ZOO_SESSION_EVENT)) {
- // Session expiration. Let the manager take care of it.
- expired();
-
- // If this watcher is reused, the next connect won't be a reconnect.
- reconnect = false;
- } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
- updated(path);
- } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
- updated(path);
- } else {
- LOG(WARNING) << "Unimplemented watch event: (state is "
- << state << " and type is " << type << ")";
- }
+ LOG(FATAL) << "Unexpected ZooKeeper event (created) for '" << path << "'";
+}
+
+
+void ZooKeeperMasterDetectorProcess::deleted(const string& path)
+{
+ LOG(FATAL) << "Unexpected ZooKeeper event (deleted) for '" << path << "'";
}
-void ZooKeeperMasterDetector::detectMaster()
+void ZooKeeperMasterDetectorProcess::detectMaster()
{
vector<string> results;
@@ -495,3 +497,22 @@ void ZooKeeperMasterDetector::detectMast
}
}
}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(
+ const zookeeper::URL& url,
+ const UPID& pid,
+ bool contend,
+ bool quiet)
+{
+ process = new ZooKeeperMasterDetectorProcess(url, pid, contend, quiet);
+ spawn(process);
+}
+
+
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}