You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:21:44 UTC
svn commit: r1132300 - in /incubator/mesos/trunk/src: common/ detector/ master/ messaging/ sched/ slave/
Author: benh
Date: Sun Jun 5 09:21:44 2011
New Revision: 1132300
URL: http://svn.apache.org/viewvc?rev=1132300&view=rev
Log:
Updates to support ZooKeeper session expiration for the master detector.
Modified:
incubator/mesos/trunk/src/common/zookeeper.cpp
incubator/mesos/trunk/src/common/zookeeper.hpp
incubator/mesos/trunk/src/detector/detector.cpp
incubator/mesos/trunk/src/detector/detector.hpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/slaves_manager.cpp
incubator/mesos/trunk/src/messaging/messages.cpp
incubator/mesos/trunk/src/messaging/messages.hpp
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/slave.cpp
Modified: incubator/mesos/trunk/src/common/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/zookeeper.cpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/common/zookeeper.cpp Sun Jun 5 09:21:44 2011
@@ -43,16 +43,18 @@ WatcherProcessManager* manager;
// Watcher, so the ZooKeeperImpl won't end up calling into an object
// that has been deleted. In the worst case, the ZooKeeperImpl will
// dispatch to a dead WatcherProcess, which will just get dropped on
-// the floor. We wanted to keep the Watcher interface clean and
-// simple, so rather than add a member in Watcher that points to a
-// WatcherProcess instance (or points to a WatcherImpl), we choose to
-// create a WatcherProcessManager that stores the Watcher and
-// WatcherProcess associations. The WatcherProcessManager is akin to
-// having a shared dictionary or hashtable and using locks to access
-// it rather then sending and receiving messages. Their is probably a
-// performance hit here, but it would be interesting to see how bad
-// the perforamnce is across a range of low and high-contention
-// states.
+// the floor. In addition, the callbacks in the Watcher can manipulate
+// the ZooKeeper object freely, calling delete on it if necessary
+// (e.g., after a session expiration). We wanted to keep the Watcher
+// interface clean and simple, so rather than add a member in Watcher
+// that points to a WatcherProcess instance (or points to a
+// WatcherImpl), we choose to create a WatcherProcessManager that
+// stores the Watcher and WatcherProcess associations. The
+// WatcherProcessManager is akin to having a shared dictionary or
+// hashtable and using locks to access it rather than sending and
+// receiving messages. There is probably a performance hit here, but
+// it would be interesting to see how bad the performance is across a
+// range of low and high-contention states.
class WatcherProcess : public Process<WatcherProcess>
{
public:
@@ -143,9 +145,9 @@ class ZooKeeperImpl
#endif // USE_THREADED_ZOOKEEPER
{
public:
- ZooKeeperImpl(ZooKeeper* zk, const string& hosts, int timeout,
+ ZooKeeperImpl(ZooKeeper* zk, const string& servers, int timeout,
Watcher* watcher)
- : zk(zk), hosts(hosts), timeout(timeout), watcher(watcher)
+ : zk(zk), servers(servers), timeout(timeout), watcher(watcher)
{
if (watcher == NULL) {
fatalerror("cannot instantiate ZooKeeper with NULL watcher");
@@ -160,7 +162,7 @@ public:
// TODO(benh): Link with WatcherProcess PID?
- zh = zookeeper_init(hosts.c_str(), event, timeout, NULL, this, 0);
+ zh = zookeeper_init(servers.c_str(), event, timeout, NULL, this, 0);
if (zh == NULL) {
fatalerror("failed to create ZooKeeper (zookeeper_init)");
}
@@ -481,7 +483,7 @@ private:
private:
friend class ZooKeeper;
- const string hosts; // ZooKeeper host:port pairs.
+ const string servers; // ZooKeeper host:port pairs.
const int timeout; // ZooKeeper session timeout.
ZooKeeper* zk; // ZooKeeper instance.
@@ -492,9 +494,9 @@ private:
};
-ZooKeeper::ZooKeeper(const string& hosts, int timeout, Watcher* watcher)
+ZooKeeper::ZooKeeper(const string& servers, int timeout, Watcher* watcher)
{
- impl = new ZooKeeperImpl(this, hosts, timeout, watcher);
+ impl = new ZooKeeperImpl(this, servers, timeout, watcher);
#ifndef USE_THREADED_ZOOKEEPER
process::spawn(impl);
#endif // USE_THREADED_ZOOKEEPER
@@ -524,7 +526,8 @@ int ZooKeeper::create(const string& path
return process::call(impl->self(), &ZooKeeperImpl::create,
cref(path), cref(data), cref(acl), flags, result);
#else
- return Future<int>(&impl->create(path, data, acl, flags, result)).get();
+ Promise<int> promise = impl->create(path, data, acl, flags, result);
+ return Future<int>(&promise).get();
#endif // USE_THREADED_ZOOKEEPER
}
@@ -535,7 +538,8 @@ int ZooKeeper::remove(const string& path
return process::call(impl->self(), &ZooKeeperImpl::remove,
cref(path), version);
#else
- return Future<int>(&impl->remove(path, version)).get();
+ Promise<int> promise = impl->remove(path, version);
+ return Future<int>(&promise).get();
#endif // USE_THREADED_ZOOKEEPER
}
@@ -546,7 +550,8 @@ int ZooKeeper::exists(const string& path
return process::call(impl->self(), &ZooKeeperImpl::exists,
cref(path), watch, stat);
#else
- return Future<int>(&impl->exists(path, watch, stat)).get();
+ Promise<int> promise = impl->exists(path, watch, stat);
+ return Future<int>(&promise).get();
#endif // USE_THREADED_ZOOKEEPER
}
@@ -557,7 +562,8 @@ int ZooKeeper::get(const string& path, b
return process::call(impl->self(), &ZooKeeperImpl::get,
cref(path), watch, result, stat);
#else
- return Future<int>(&impl->get(path, watch, result, stat)).get();
+ Promise<int> promise = impl->get(path, watch, result, stat);
+ return Future<int>(&promise).get();
#endif // USE_THREADED_ZOOKEEPER
}
@@ -569,7 +575,8 @@ int ZooKeeper::getChildren(const string&
return process::call(impl->self(), &ZooKeeperImpl::getChildren,
cref(path), watch, results);
#else
- return Future<int>(&impl->getChildren(path, watch, results)).get();
+ Promise<int> promise = impl->getChildren(path, watch, results);
+ return Future<int>(&promise).get();
#endif // USE_THREADED_ZOOKEEPER
}
@@ -580,7 +587,8 @@ int ZooKeeper::set(const string& path, c
return process::call(impl->self(), &ZooKeeperImpl::set,
cref(path), cref(data), version);
#else
- return Future<int>(&impl->set(path, data, version)).get();
+ Promise<int> promise = impl->set(path, data, version);
+ return Future<int>(&promise).get();
#endif // USE_THREADED_ZOOKEEPER
}
Modified: incubator/mesos/trunk/src/common/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/zookeeper.hpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/zookeeper.hpp (original)
+++ incubator/mesos/trunk/src/common/zookeeper.hpp Sun Jun 5 09:21:44 2011
@@ -101,13 +101,13 @@ public:
* establishment is asynchronous, meaning that the session should
* not be considered established until (and unless) an event of
* state ZOO_CONNECTED_STATE is received.
- * \param hosts comma separated host:port pairs, each corresponding
+ * \param servers comma separated host:port pairs, each corresponding
* to a ZooKeeper server. e.g. "127.0.0.1:3000,127.0.0.1:3001"
* \param watcher the instance of Watcher that receives event
* callbacks. When notifications are triggered the Watcher::process
* method will be invoked.
*/
- ZooKeeper(const std::string &hosts, int timeout, Watcher *watcher);
+ ZooKeeper(const std::string &servers, int timeout, Watcher *watcher);
~ZooKeeper();
Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Sun Jun 5 09:21:44 2011
@@ -33,27 +33,6 @@ using std::vector;
#ifdef WITH_ZOOKEEPER
-class ZooKeeperReconnectTimer : public Process<ZooKeeperReconnectTimer>
-{
-public:
- ZooKeeperReconnectTimer(const UPID& _pid) : pid(_pid) {}
-
-protected:
- virtual void operator () ()
- {
- receive(120);
- if (name() == process::TIMEOUT) {
- LOG(ERROR) << "Have not heard back from ZooKeeper after trying to "
- << "(automagically) reconnect";
- process::post(pid, MASTER_DETECTION_FAILURE);
- }
- }
-
-private:
- const UPID pid;
-};
-
-
class ZooKeeperMasterDetector : public MasterDetector, public Watcher
{
public:
@@ -84,6 +63,12 @@ public:
virtual void process(ZooKeeper *zk, int type, int state, const string &path);
private:
+ void connected();
+ void reconnecting();
+ void reconnected();
+ void expired();
+ void updated(const string& path);
+
/**
* @param s sequence id
*/
@@ -105,9 +90,9 @@ private:
*/
UPID lookupMasterPID(const string &seq) const;
- string servers;
- string znode;
- UPID pid;
+ const string servers;
+ const string znode;
+ const UPID pid;
bool contend;
bool reconnect;
@@ -118,11 +103,8 @@ private:
string currentMasterSeq;
UPID currentMasterPID;
-
- // Reconnect timer.
- ZooKeeperReconnectTimer *timer;
};
-#endif // #ifdef WITH_ZOOKEEPER
+#endif // WITH_ZOOKEEPER
MasterDetector::~MasterDetector() {}
@@ -167,7 +149,7 @@ MasterDetector* MasterDetector::create(c
#else
fatal("Cannot detect masters with 'zoo://', "
"ZooKeeper is not supported in this build");
-#endif // #ifdef WITH_ZOOKEEPER
+#endif // WITH_ZOOKEEPER
break;
}
@@ -282,13 +264,13 @@ BasicMasterDetector::~BasicMasterDetecto
#ifdef WITH_ZOOKEEPER
-ZooKeeperMasterDetector::ZooKeeperMasterDetector(const string& _servers,
- const string& _znode,
- const UPID& _pid,
- bool _contend,
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const string& servers,
+ const string& znode,
+ const UPID& pid,
+ bool contend,
bool quiet)
- : servers(_servers), znode(_znode), pid(_pid),
- contend(_contend), reconnect(false), timer(NULL)
+ : servers(servers), znode(znode), pid(pid),
+ contend(contend), reconnect(false)
{
// Set verbosity level for underlying ZooKeeper library logging.
// TODO(benh): Put this in the C++ API.
@@ -301,146 +283,171 @@ ZooKeeperMasterDetector::ZooKeeperMaster
ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
{
- // Kill the timer (if running), and then the actual ZooKeeper instance.
- if (timer != NULL) {
- process::post(timer->self(), process::TERMINATE);
- process::wait(timer->self());
- delete timer;
- timer = NULL;
- }
-
if (zk != NULL) {
delete zk;
- zk = NULL;
}
}
-void ZooKeeperMasterDetector::process(ZooKeeper* zk, int type, int state,
- const string &path)
+void ZooKeeperMasterDetector::connected()
{
+ LOG(INFO) << "Master detector connected to ZooKeeper ...";
+
int ret;
string result;
static const string delimiter = "/";
- if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
- // Check if this is a reconnect.
- if (!reconnect) {
- LOG(INFO) << "Connected to ZooKeeper";
+ // Assume the znode that was created does not end with a "/".
+ CHECK(znode.at(znode.length() - 1) != '/');
- // Assume the znode that was created does not end with a "/".
- CHECK(znode.at(znode.length() - 1) != '/');
+ // Create directory path znodes as necessary.
+ size_t index = znode.find(delimiter, 0);
- // Create directory path znodes as necessary.
- size_t index = znode.find(delimiter, 0);
+ while (index < string::npos) {
+ // Get out the prefix to create.
+ index = znode.find(delimiter, index + 1);
+ string prefix = znode.substr(0, index);
- while (index < string::npos) {
- // Get out the prefix to create.
- index = znode.find(delimiter, index + 1);
- string prefix = znode.substr(0, index);
+ LOG(INFO) << "Trying to create znode '" << prefix << "' in ZooKeeper";
- LOG(INFO) << "Trying to create znode '" << prefix << "' in ZooKeeper";
+ // Create the node (even if it already exists).
+ ret = zk->create(prefix, "", ZOO_OPEN_ACL_UNSAFE,
+ // ZOO_CREATOR_ALL_ACL, // needs authentication
+ 0, &result);
- // Create the node (even if it already exists).
- ret = zk->create(prefix, "", ZOO_OPEN_ACL_UNSAFE,
- // ZOO_CREATOR_ALL_ACL, // needs authentication
- 0, &result);
+ if (ret != ZOK && ret != ZNODEEXISTS) {
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ }
+ }
- if (ret != ZOK && ret != ZNODEEXISTS) {
- fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
- }
- }
+ // Weirdness in ZooKeeper timing, let's check that everything is created.
+ ret = zk->get(znode, false, &result, NULL);
- // Wierdness in ZooKeeper timing, let's check that everything is created.
- ret = zk->get(znode, false, &result, NULL);
+ if (ret != ZOK) {
+ fatal("ZooKeeper not responding correctly (%s). "
+ "Make sure ZooKeeper is running on: %s",
+ zk->error(ret), servers.c_str());
+ }
- if (ret != ZOK) {
- fatal("ZooKeeper not responding correctly (%s). "
- "Make sure ZooKeeper is running on: %s",
- zk->error(ret), servers.c_str());
- }
+ if (contend) {
+ // We contend with the pid given in constructor.
+ ret = zk->create(znode + "/", pid, ZOO_OPEN_ACL_UNSAFE,
+ // ZOO_CREATOR_ALL_ACL, // needs authentication
+ ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
- if (contend) {
- // We contend with the pid given in constructor.
- ret = zk->create(znode + "/", pid, ZOO_OPEN_ACL_UNSAFE,
- // ZOO_CREATOR_ALL_ACL, // needs authentication
- ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
-
- if (ret != ZOK) {
- fatal("ZooKeeper not responding correctly (%s). "
- "Make sure ZooKeeper is running on: %s",
- zk->error(ret), servers.c_str());
- }
-
- setId(result);
- LOG(INFO) << "Created ephemeral/sequence:" << getId();
-
- MSG<GOT_MASTER_TOKEN> msg;
- msg.set_token(getId());
- MesosProcess<class T>::post(pid, msg);
- }
+ if (ret != ZOK) {
+ fatal("ZooKeeper not responding correctly (%s). "
+ "Make sure ZooKeeper is running on: %s",
+ zk->error(ret), servers.c_str());
+ }
- // Now determine who the master is (it may be us).
- detectMaster();
- } else {
- // Reconnected.
- LOG(INFO) << "Reconnected to Zookeeper ...";
+ setId(result);
+ LOG(INFO) << "Created ephemeral/sequence:" << getId();
- // Kill the reconnect timer.
- if (timer != NULL) {
- process::post(timer->self(), process::TERMINATE);
- process::wait(timer->self());
- delete timer;
- timer = NULL;
- }
+ MSG<GOT_MASTER_TOKEN> msg;
+ msg.set_token(getId());
+ MesosProcess<class T>::post(pid, msg);
+ }
- if (contend) {
- // Contending for master, confirm our ephemeral sequence znode exists.
- ret = zk->get(znode + "/" + mySeq, false, &result, NULL);
+ // Now determine who the master is (it may be us).
+ detectMaster();
+}
- // We might no longer be the master! Commit suicide for now
- // (hoping another master is on standbye), but in the future
- // it would be nice if we could go back on standbye.
- if (ret == ZNONODE)
- fatal("failed to reconnect to ZooKeeper quickly enough "
- "(our ephemeral sequence znode is gone), commiting suicide!");
-
- if (ret != ZOK)
- fatal("ZooKeeper not responding correctly (%s). "
- "Make sure ZooKeeper is running on: %s",
- zk->error(ret), servers.c_str());
- // We are still the master!
- LOG(INFO) << "Still acting as master";
- } else {
- // Reconnected, but maybe the master changed?
- LOG(INFO) << "Not sure if still master";
- detectMaster();
- }
+void ZooKeeperMasterDetector::reconnecting()
+{
+ LOG(INFO) << "Master detector lost connection to ZooKeeper, "
+ << "attempting to reconnect ...";
+}
- reconnect = false;
+
+void ZooKeeperMasterDetector::reconnected()
+{
+ LOG(INFO) << "Master detector reconnected ...";
+
+ int ret;
+ string result;
+
+ static const string delimiter = "/";
+
+ if (contend) {
+ // Contending for master, confirm our ephemeral sequence znode exists.
+ ret = zk->get(znode + "/" + mySeq, false, &result, NULL);
+
+ // We might no longer be the master! Commit suicide for now
+ // (hoping another master is on standby), but in the future
+ // it would be nice if we could go back on standby.
+ if (ret == ZNONODE) {
+ fatal("failed to reconnect to ZooKeeper quickly enough "
+ "(our ephemeral sequence znode is gone), commiting suicide!");
+ }
+
+ if (ret != ZOK) {
+ fatal("ZooKeeper not responding correctly (%s). "
+ "Make sure ZooKeeper is running on: %s",
+ zk->error(ret), servers.c_str());
}
- } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
- // A new master might have showed up and created a sequence
- // identifier or a master may have died, determine who the master is now!
+
+ // We are still the master!
+ LOG(INFO) << "Still acting as master";
+ } else {
+ // Reconnected, but maybe the master changed?
detectMaster();
+ }
+}
+
+
+void ZooKeeperMasterDetector::expired()
+{
+ LOG(WARNING) << "Master detector ZooKeeper session expired!";
+
+ CHECK(zk != NULL);
+ delete zk;
+
+ zk = new ZooKeeper(servers, 10000, this);
+}
+
+
+void ZooKeeperMasterDetector::updated(const string& path)
+{
+ // A new master might have showed up and created a sequence
+ // identifier or a master may have died, determine who the master is now!
+ detectMaster();
+}
+
+
+void ZooKeeperMasterDetector::process(ZooKeeper* zk, int type, int state,
+ const string &path)
+{
+ if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // Check if this is a reconnect.
+ if (!reconnect) {
+ // Initial connect.
+ connected();
+ } else {
+ // Reconnected.
+ reconnected();
+ }
} else if ((state == ZOO_CONNECTING_STATE) && (type == ZOO_SESSION_EVENT)) {
// The client library automatically reconnects, taking into
// account failed servers in the connection string,
// appropriately handling the "herd effect", etc.
- LOG(INFO) << "Lost Zookeeper connection, retrying (automagically)";
-
- // Create a reconnect timer so that we commit suicide if we
- // haven't heard back from ZooKeeper after a certain period of
- // time.
- timer = new ZooKeeperReconnectTimer(pid);
- process::spawn(timer);
-
reconnect = true;
+ reconnecting();
+ } else if ((state == ZOO_EXPIRED_SESSION_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // Session expiration. Let the manager take care of it.
+ expired();
+
+ // If this watcher is reused, the next connect won't be a reconnect.
+ reconnect = false;
+ } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
+ updated(path);
+ } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
+ updated(path);
} else {
LOG(WARNING) << "Unimplemented watch event: (state is "
- << state << " and type is " << type << ")";
+ << state << " and type is " << type << ")";
}
}
@@ -469,10 +476,13 @@ void ZooKeeperMasterDetector::detectMast
int ret = zk->getChildren(znode, true, &results);
- if (ret != ZOK)
- LOG(ERROR) << "Failed to get masters: " << zk->error(ret);
- else
- LOG(INFO) << "Found " << results.size() << " registered masters";
+ if (ret != ZOK) {
+ LOG(ERROR) << "Master detector failed to get masters: "
+ << zk->error(ret);
+ } else {
+ LOG(INFO) << "Master detector found " << results.size()
+ << " registered masters";
+ }
string masterSeq;
long min = LONG_MAX;
@@ -513,13 +523,14 @@ UPID ZooKeeperMasterDetector::lookupMast
ret = zk->get(znode + "/" + seq, false, &result, NULL);
- if (ret != ZOK)
- LOG(ERROR) << "Failed to fetch new master pid: " << zk->error(ret);
- else
- LOG(INFO) << "Got new master pid: " << result;
+ if (ret != ZOK) {
+ LOG(ERROR) << "Master detector failed to fetch new master pid: "
+ << zk->error(ret);
+ } else {
+ LOG(INFO) << "Master detector got new master pid: " << result;
+ }
- // TODO(benh): Automatic cast!
- return UPID(result);
+ return result;
}
-#endif // #ifdef WITH_ZOOKEEPER
+#endif // WITH_ZOOKEEPER
Modified: incubator/mesos/trunk/src/detector/detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.hpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.hpp (original)
+++ incubator/mesos/trunk/src/detector/detector.hpp Sun Jun 5 09:21:44 2011
@@ -30,14 +30,15 @@ public:
*
* @param url string possibly containing zoo://, zoofile://, mesos://
* @param pid libprocess pid to both receive our messages and be
- * used if we should contend
+ * used if we should contend
* @param contend true if should contend to be master
+ * @param quiet true to limit log output
* @return instance of MasterDetector
*/
- static MasterDetector * create(const std::string& url,
- const process::UPID& pid,
- bool contend = false,
- bool quiet = true);
+ static MasterDetector* create(const std::string& url,
+ const process::UPID& pid,
+ bool contend = false,
+ bool quiet = true);
/**
* Cleans up and deallocates the detector.
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun 5 09:21:44 2011
@@ -445,8 +445,6 @@ void Master::initialize()
install(NO_MASTER_DETECTED, &Master::noMasterDetected);
- install(MASTER_DETECTION_FAILURE, &Master::masterDetectionFailure);
-
install(F2M_REGISTER_FRAMEWORK, &Master::registerFramework,
&RegisterFrameworkMessage::framework);
@@ -543,12 +541,6 @@ void Master::noMasterDetected()
}
-void Master::masterDetectionFailure()
-{
- LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
-}
-
-
void Master::registerFramework(const FrameworkInfo& frameworkInfo)
{
Framework* framework =
Modified: incubator/mesos/trunk/src/master/slaves_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/slaves_manager.cpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/slaves_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/slaves_manager.cpp Sun Jun 5 09:21:44 2011
@@ -95,6 +95,12 @@ public:
// Reconnected.
process::dispatch(pid, &ZooKeeperSlavesManagerStorage::reconnected);
}
+ } else if ((state == ZOO_CONNECTING_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // The client library automatically reconnects, taking into
+ // account failed servers in the connection string,
+ // appropriately handling the "herd effect", etc.
+ reconnect = true;
+ process::dispatch(pid, &ZooKeeperSlavesManagerStorage::reconnecting);
} else if ((state == ZOO_EXPIRED_SESSION_STATE) && (type == ZOO_SESSION_EVENT)) {
// Session expiration. Let the manager take care of it.
process::dispatch(pid, &ZooKeeperSlavesManagerStorage::expired);
@@ -104,12 +110,6 @@ public:
} else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
// Let the manager deal with file changes.
process::dispatch(pid, &ZooKeeperSlavesManagerStorage::updated, path);
- } else if ((state == ZOO_CONNECTING_STATE) && (type == ZOO_SESSION_EVENT)) {
- // The client library automatically reconnects, taking into
- // account failed servers in the connection string,
- // appropriately handling the "herd effect", etc.
- reconnect = true;
- process::dispatch(pid, &ZooKeeperSlavesManagerStorage::reconnecting);
} else {
LOG(WARNING) << "Unimplemented watch event: (state is "
<< state << " and type is " << type << ")";
@@ -150,7 +150,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->get(znode, true, &result, &stat);
if (ret != ZOK) {
- LOG(WARNING) << "Failed to get '" << znode
+ LOG(WARNING) << "Slaves manager storage failed to get '" << znode
<< "' in ZooKeeper! (" << zk->error(ret) << ")";
return false;
}
@@ -168,7 +168,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
size_t index = result.find(active);
if (index == string::npos) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not find 'active='";
return false;
}
@@ -184,7 +184,8 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->set(znode, result, stat.version);
if (ret != ZOK) {
- LOG(WARNING) << "Could not add slave " << hostname << ":" << port
+ LOG(WARNING) << "Slaves manager storage could not add slave "
+ << hostname << ":" << port
<< " to '" << znode << "' in ZooKeeper! ("
<< zk->error(ret) << ")";
return false;
@@ -204,7 +205,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->get(znode, true, &result, &stat);
if (ret != ZOK) {
- LOG(WARNING) << "Failed to get '" << znode
+ LOG(WARNING) << "Slaves manager storage failed to get '" << znode
<< "' in ZooKeeper! (" << zk->error(ret) << ")";
return false;
}
@@ -215,7 +216,8 @@ Promise<bool> ZooKeeperSlavesManagerStor
size_t index = result.find(out.str());
if (index == string::npos) {
- LOG(WARNING) << "Could not remove slave " << hostname << ":" << port
+ LOG(WARNING) << "Slaves manager storage could not remove slave "
+ << hostname << ":" << port
<< " because not currently active or inactive";
return false;
} else if (index == 0) {
@@ -237,7 +239,8 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->set(znode, result, stat.version);
if (ret != ZOK) {
- LOG(WARNING) << "Could not remove slave " << hostname << ":" << port
+ LOG(WARNING) << "Slaves manager storage could not remove slave "
+ << hostname << ":" << port
<< " from '" << znode << "' in ZooKeeper! ("
<< zk->error(ret) << ")";
return false;
@@ -257,7 +260,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->get(znode, true, &result, &stat);
if (ret != ZOK) {
- LOG(WARNING) << "Failed to get '" << znode
+ LOG(WARNING) << "Slaves manager storage failed to get '" << znode
<< "' in ZooKeeper! (" << zk->error(ret) << ")";
return false;
}
@@ -270,7 +273,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
size_t index = result.find(inactive);
if (index == string::npos) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not find 'inactive='";
return false;
}
@@ -278,11 +281,12 @@ Promise<bool> ZooKeeperSlavesManagerStor
index = result.find(out.str(), index);
if (index == string::npos) {
- LOG(WARNING) << "Could not activate slave " << hostname << ":" << port
+ LOG(WARNING) << "Slaves manager storage could not activate slave "
+ << hostname << ":" << port
<< " because not currently inactive";
return false;
} else if (index == 0) {
- LOG(WARNING) << "Bad data in '" << znode;
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode;
return false;
}
@@ -301,7 +305,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
index = result.find(active);
if (index == string::npos) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not find 'active='";
return false;
}
@@ -316,7 +320,8 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->set(znode, result, stat.version);
if (ret != ZOK) {
- LOG(WARNING) << "Could not activate slave " << hostname << ":" << port
+ LOG(WARNING) << "Slaves manager storage could not activate slave "
+ << hostname << ":" << port
<< " in '" << znode << "' in ZooKeeper! ("
<< zk->error(ret) << ")";
return false;
@@ -336,7 +341,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->get(znode, true, &result, &stat);
if (ret != ZOK) {
- LOG(WARNING) << "Failed to get '" << znode
+ LOG(WARNING) << "Slaves manager storage failed to get '" << znode
<< "' in ZooKeeper! (" << zk->error(ret) << ")";
return false;
}
@@ -349,7 +354,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
size_t index = result.find(active);
if (index == string::npos) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not find 'active='";
return false;
}
@@ -357,11 +362,12 @@ Promise<bool> ZooKeeperSlavesManagerStor
index = result.find(out.str(), index);
if (index == string::npos) {
- LOG(WARNING) << "Could not deactivate slave " << hostname << ":" << port
+ LOG(WARNING) << "Slaves manager storage could not deactivate slave "
+ << hostname << ":" << port
<< " because not currently active";
return false;
} else if (index == 0) {
- LOG(WARNING) << "Bad data in '" << znode;
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode;
return false;
}
@@ -380,7 +386,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
index = result.find(inactive);
if (index == string::npos) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not find 'inactive='";
return false;
}
@@ -395,7 +401,8 @@ Promise<bool> ZooKeeperSlavesManagerStor
ret = zk->set(znode, result, stat.version);
if (ret != ZOK) {
- LOG(WARNING) << "Could not activate slave " << hostname << ":" << port
+ LOG(WARNING) << "Slaves manager storage could not activate slave "
+ << hostname << ":" << port
<< " in '" << znode << "' in ZooKeeper! ("
<< zk->error(ret) << ")";
return false;
@@ -429,7 +436,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
// Okay, consider this a failure (maybe we lost our connection
// to ZooKeeper), increment the failure count, log the issue,
// and perhaps try again when ZooKeeper issues get sorted out.
- LOG(WARNING) << "Failed to create '" << znode
+ LOG(WARNING) << "Slaves manager storage failed to create '" << znode
<< "' in ZooKeeper! (" << zk->error(ret) << ")";
return false;
}
@@ -443,14 +450,15 @@ Promise<bool> ZooKeeperSlavesManagerStor
Promise<bool> ZooKeeperSlavesManagerStorage::reconnecting()
{
- LOG(INFO) << "ZooKeeperSlavesManagerStorage is attempting to reconnect";
+ LOG(INFO) << "Slaves manager storage lost connection to ZooKeeper, "
+ << "attempting to reconnect ...";
return true;
}
Promise<bool> ZooKeeperSlavesManagerStorage::reconnected()
{
- LOG(INFO) << "ZooKeeperSlavesManagerStorage has reconnected";
+ LOG(INFO) << "Slaves manager storage has reconnected ...";
// Reconcile what's in the znodes versus what we have in memory
// (this also puts watches on these znodes).
@@ -460,7 +468,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
Promise<bool> ZooKeeperSlavesManagerStorage::expired()
{
- LOG(WARNING) << "ZooKeeperSlavesManagerStorage session expired!";
+ LOG(WARNING) << "Slaves manager storage session expired!";
CHECK(zk != NULL);
delete zk;
@@ -480,13 +488,13 @@ Promise<bool> ZooKeeperSlavesManagerStor
string result;
if (path == znode) {
- LOG(INFO) << "Slave information in ZooKeeper has been updated "
+ LOG(INFO) << "Slaves manager storage found updates in ZooKeeper "
<< "... propogating changes";
ret = zk->get(znode, true, &result, NULL);
if (ret != ZOK) {
- LOG(WARNING) << "Failed to get '" << znode
+ LOG(WARNING) << "Slaves manager storage failed to get '" << znode
<< "' in ZooKeeper! (" << zk->error(ret) << ")";
return false;
}
@@ -502,7 +510,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
foreach (const string& token, tokens) {
const vector<string>& pairs = tokenize::split(token, ":");
if (pairs.size() != 2) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not parse " << token;
return false;
}
@@ -510,7 +518,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
try {
active.insert(pairs[0], lexical_cast<uint16_t>(pairs[1]));
} catch (const bad_lexical_cast&) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not parse " << token;
return false;
}
@@ -528,7 +536,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
foreach (const string& token, tokens) {
const vector<string>& pairs = tokenize::split(token, ":");
if (pairs.size() != 2) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not parse " << token;
return false;
}
@@ -536,7 +544,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
try {
inactive.insert(pairs[0], lexical_cast<uint16_t>(pairs[1]));
} catch (const bad_lexical_cast&) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not parse " << token;
return false;
}
@@ -545,7 +553,7 @@ Promise<bool> ZooKeeperSlavesManagerStor
process::dispatch(slavesManager, &SlavesManager::updateInactive, inactive);
}
} else {
- LOG(WARNING) << "Not expecting changes to path '"
+ LOG(WARNING) << "Slaves manager stoage not expecting changes to path '"
<< path << "' in ZooKeeper";
return false;
}
@@ -558,14 +566,14 @@ string ZooKeeperSlavesManagerStorage::pa
{
size_t begin = s.find(key);
if (begin == string::npos) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', could not find '" << key << "'";
return "";
}
size_t end = s.find("\n", begin);
if (end == string::npos) {
- LOG(WARNING) << "Bad data in '" << znode
+ LOG(WARNING) << "Slaves manager storage found bad data in '" << znode
<< "', missing LF after '" << key << "'";
return "";
}
@@ -801,18 +809,13 @@ Promise<HttpResponse> SlavesManager::add
map<string, vector<string> > pairs =
tokenize::pairs(request.query, ',', '=');
- if (pairs.size() != 2) {
- LOG(WARNING) << "Malformed query string when trying to add a slave";
- return HttpNotFoundResponse();
- }
-
// Make sure there is at least a 'hostname=' and 'port='.
if (pairs.count("hostname") == 0) {
- LOG(WARNING) << "Missing 'hostname' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'hostname' in query string"
<< " when trying to add a slave";
return HttpNotFoundResponse();
} else if (pairs.count("port") == 0) {
- LOG(WARNING) << "Missing 'port' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'port' in query string"
<< " when trying to add a slave";
return HttpNotFoundResponse();
}
@@ -823,12 +826,14 @@ Promise<HttpResponse> SlavesManager::add
try {
port = lexical_cast<uint16_t>(pairs["port"].front());
} catch (const bad_lexical_cast&) {
- LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+ LOG(WARNING) << "Slaves manager failed to parse 'port = "
+ << pairs["port"].front()
<< "' when trying to add a slave";
return HttpNotFoundResponse();
}
- LOG(INFO) << "Asked to add slave at " << hostname << ":" << port;
+ LOG(INFO) << "Slaves manager received HTTP request to add slave at "
+ << hostname << ":" << port;
if (add(hostname, port)) {
return HttpOKResponse();
@@ -847,18 +852,13 @@ Promise<HttpResponse> SlavesManager::rem
map<string, vector<string> > pairs =
tokenize::pairs(request.query, ',', '=');
- if (pairs.size() != 2) {
- LOG(WARNING) << "Malformed query string when trying to remove a slave";
- return HttpNotFoundResponse();
- }
-
// Make sure there is at least a 'hostname=' and 'port='.
if (pairs.count("hostname") == 0) {
- LOG(WARNING) << "Missing 'hostname' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'hostname' in query string"
<< " when trying to remove a slave";
return HttpNotFoundResponse();
} else if (pairs.count("port") == 0) {
- LOG(WARNING) << "Missing 'port' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'port' in query string"
<< " when trying to remove a slave";
return HttpNotFoundResponse();
}
@@ -869,12 +869,14 @@ Promise<HttpResponse> SlavesManager::rem
try {
port = lexical_cast<uint16_t>(pairs["port"].front());
} catch (const bad_lexical_cast&) {
- LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+ LOG(WARNING) << "Slaves manager failed to parse 'port = "
+ << pairs["port"].front()
<< "' when trying to remove a slave";
return HttpNotFoundResponse();
}
- LOG(INFO) << "Asked to remove slave at " << hostname << ":" << port;
+ LOG(INFO) << "Slaves manager received HTTP request to remove slave at "
+ << hostname << ":" << port;
if (remove(hostname, port)) {
return HttpOKResponse();
@@ -893,18 +895,13 @@ Promise<HttpResponse> SlavesManager::act
map<string, vector<string> > pairs =
tokenize::pairs(request.query, ',', '=');
- if (pairs.size() != 2) {
- LOG(WARNING) << "Malformed query string when trying to activate a slave";
- return HttpNotFoundResponse();
- }
-
// Make sure there is at least a 'hostname=' and 'port='.
if (pairs.count("hostname") == 0) {
- LOG(WARNING) << "Missing 'hostname' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'hostname' in query string"
<< " when trying to activate a slave";
return HttpNotFoundResponse();
} else if (pairs.count("port") == 0) {
- LOG(WARNING) << "Missing 'port' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'port' in query string"
<< " when trying to activate a slave";
return HttpNotFoundResponse();
}
@@ -915,12 +912,13 @@ Promise<HttpResponse> SlavesManager::act
try {
port = lexical_cast<uint16_t>(pairs["port"].front());
} catch (const bad_lexical_cast&) {
- LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+ LOG(WARNING) << "Slaves manager failed to parse 'port = "
+ << pairs["port"].front()
<< "' when trying to activate a slave";
return HttpNotFoundResponse();
}
- LOG(INFO) << "HTTP request to activate slave at "
+ LOG(INFO) << "Slaves manager received HTTP request to activate slave at "
<< hostname << ":" << port;
if (activate(hostname, port)) {
@@ -940,18 +938,13 @@ Promise<HttpResponse> SlavesManager::dea
map<string, vector<string> > pairs =
tokenize::pairs(request.query, ',', '=');
- if (pairs.size() != 2) {
- LOG(WARNING) << "Malformed query string when trying to deactivate a slave";
- return HttpNotFoundResponse();
- }
-
// Make sure there is at least a 'hostname=' and 'port='.
if (pairs.count("hostname") == 0) {
- LOG(WARNING) << "Missing 'hostname' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'hostname' in query string"
<< " when trying to deactivate a slave";
return HttpNotFoundResponse();
} else if (pairs.count("port") == 0) {
- LOG(WARNING) << "Missing 'port' in query string"
+ LOG(WARNING) << "Slaves manager expecting 'port' in query string"
<< " when trying to deactivate a slave";
return HttpNotFoundResponse();
}
@@ -962,12 +955,13 @@ Promise<HttpResponse> SlavesManager::dea
try {
port = lexical_cast<uint16_t>(pairs["port"].front());
} catch (const bad_lexical_cast&) {
- LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+ LOG(WARNING) << "Slaves manager failed to parse 'port = "
+ << pairs["port"].front()
<< "' when trying to deactivate a slave";
return HttpNotFoundResponse();
}
- LOG(INFO) << "HTTP request to deactivate slave at "
+ LOG(INFO) << "Slaves manager received HTTP request to deactivate slave at "
<< hostname << ":" << port;
if (deactivate(hostname, port)) {
Modified: incubator/mesos/trunk/src/messaging/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.cpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.cpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.cpp Sun Jun 5 09:21:44 2011
@@ -71,7 +71,6 @@ ALLOCATE_MESSAGE(S2PD_KILL_ALL);
ALLOCATE_MESSAGE(GOT_MASTER_TOKEN);
ALLOCATE_MESSAGE(NEW_MASTER_DETECTED);
ALLOCATE_MESSAGE(NO_MASTER_DETECTED);
-ALLOCATE_MESSAGE(MASTER_DETECTION_FAILURE);
// Generic messages.
ALLOCATE_MESSAGE(PING);
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 09:21:44 2011
@@ -111,7 +111,6 @@ MESSAGE(S2PD_KILL_ALL);
MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
MESSAGE(NO_MASTER_DETECTED);
-MESSAGE(MASTER_DETECTION_FAILURE);
// Generic messages.
MESSAGE(PING);
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 09:21:44 2011
@@ -78,8 +78,6 @@ public:
install(NO_MASTER_DETECTED, &SchedulerProcess::noMasterDetected);
- install(MASTER_DETECTION_FAILURE, &SchedulerProcess::masterDetectionFailure);
-
install(M2F_REGISTER_REPLY, &SchedulerProcess::registerReply,
&FrameworkRegisteredMessage::framework_id);
@@ -169,17 +167,6 @@ protected:
active = false;
}
- void masterDetectionFailure()
- {
- VLOG(1) << "Master detection failed";
- active = false;
- // TODO(benh): Better error codes/messages!
- int32_t code = 1;
- const string& message = "Failed to detect master(s)";
- process::invoke(bind(&Scheduler::error, sched, driver, code,
- cref(message)));
- }
-
void registerReply(const FrameworkID& frameworkId)
{
VLOG(1) << "Framework registered with " << frameworkId;
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132300&r1=1132299&r2=1132300&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 09:21:44 2011
@@ -200,8 +200,6 @@ void Slave::initialize()
install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
- install(MASTER_DETECTION_FAILURE, &Slave::masterDetectionFailure);
-
install(M2S_REGISTER_REPLY, &Slave::registerReply,
&SlaveRegisteredMessage::slave_id);
@@ -291,12 +289,6 @@ void Slave::noMasterDetected()
}
-void Slave::masterDetectionFailure()
-{
- LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
-}
-
-
void Slave::registerReply(const SlaveID& slaveId)
{
LOG(INFO) << "Registered with master; given slave ID " << slaveId;