You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/12/11 01:03:50 UTC
svn commit: r1419937 - in /incubator/mesos/branches/0.10.x/src:
detector/detector.cpp detector/detector.hpp slave/slave.cpp
tests/zookeeper_server.hpp tests/zookeeper_tests.cpp
Author: benh
Date: Tue Dec 11 00:03:49 2012
New Revision: 1419937
URL: http://svn.apache.org/viewvc?rev=1419937&view=rev
Log:
*** MODIFIED FOR 0.10.0 ***
Fixed the master detector to only send a NoMasterDetected message to a
leading master when the detector gets partitioned and we decide to
manually expire the session.
From: Vinod Kone <vi...@gmail.com>
Review: https://reviews.apache.org/r/7746
Modified:
incubator/mesos/branches/0.10.x/src/detector/detector.cpp
incubator/mesos/branches/0.10.x/src/detector/detector.hpp
incubator/mesos/branches/0.10.x/src/slave/slave.cpp
incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp
incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp
Modified: incubator/mesos/branches/0.10.x/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/detector/detector.cpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/detector/detector.cpp (original)
+++ incubator/mesos/branches/0.10.x/src/detector/detector.cpp Tue Dec 11 00:03:49 2012
@@ -23,6 +23,8 @@
#include <vector>
#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
#include <process/timer.hpp>
@@ -43,9 +45,7 @@
#include "zookeeper/watcher.hpp"
#include "zookeeper/zookeeper.hpp"
-using namespace mesos;
-using namespace mesos::internal;
-
+using process::Future;
using process::Process;
using process::Timer;
using process::UPID;
@@ -55,6 +55,9 @@ using std::pair;
using std::string;
using std::vector;
+namespace mesos {
+namespace internal {
+
// TODO(benh): Make this value configurable via flags and verify that
// it is always LESS THAN the slave heartbeat timeout.
const seconds ZOOKEEPER_SESSION_TIMEOUT(10.0);
@@ -74,6 +77,9 @@ public:
virtual void initialize();
+ // ZooKeeperMasterDetector implementation.
+ int64_t session();
+
// ZooKeeper events.
void connected(bool reconnect);
void reconnecting();
@@ -108,33 +114,6 @@ private:
};
-class ZooKeeperMasterDetector : public MasterDetector
-{
-public:
- /**
- * Uses ZooKeeper for both detecting masters and contending to be a
- * master.
- *
- * @param server comma separated list of server host:port pairs
- * @param znode top-level "ZooKeeper node" (directory) to use
- * @param pid libprocess pid to send messages/updates to (and to
- * use for contending to be a master)
- * @param contend true if should contend to be master and false otherwise (not
- * needed for slaves and frameworks)
- * @param quiet verbosity logging level for underlying ZooKeeper library
- */
- ZooKeeperMasterDetector(const zookeeper::URL& url,
- const UPID& pid,
- bool contend,
- bool quiet);
-
- virtual ~ZooKeeperMasterDetector();
-
-private:
- ZooKeeperMasterDetectorProcess* process;
-};
-
-
MasterDetector::~MasterDetector() {}
@@ -292,6 +271,13 @@ void ZooKeeperMasterDetectorProcess::ini
}
+int64_t ZooKeeperMasterDetectorProcess::session()
+{
+ CHECK_NOTNULL(zk);
+ return zk->getSessionId();
+}
+
+
void ZooKeeperMasterDetectorProcess::connected(bool reconnect)
{
if (!reconnect) {
@@ -387,7 +373,7 @@ void ZooKeeperMasterDetectorProcess::con
void ZooKeeperMasterDetectorProcess::reconnecting()
{
LOG(INFO) << "Master detector lost connection to ZooKeeper, "
- << "attempting to reconnect ...";
+ << "attempting to reconnect ...";
// ZooKeeper won't tell us of a session expiration until we
// reconnect, which could occur much much later than the session was
@@ -454,7 +440,28 @@ void ZooKeeperMasterDetectorProcess::tim
<< "(sessionId=" << std::hex << sessionId << ")";
timer = Option<Timer>::none();
expire = true;
- process::post(pid, NoMasterDetectedMessage());
+
+ // We only send a NoMasterDetectedMessage if we are a
+ // contending detector AND ALSO the current leader.
+ // This is because:
+ // 1) If we are a non-contending detector (e.g. slave), a zk session
+ // expiration doesn't necessarily mean a new leader (master) is elected
+ // (e.g. the slave is partitioned from the zk server). If the leading
+ // master stays the same (i.e., no leader election), then the
+ // slave should still accept a ShutDownMessage from the master.
+ // If a new master does get elected, the slave would know about it
+ // because it would do a leader detection after it connects/re-connects.
+ // 2) If we are a contender but not the leader (e.g. non-leading master),
+ // sending a NoMasterDetectedMessage() is bad because a partitioned
+ // non-leading master would never learn about a leading master that
+ // stays the same (i.e., no leader election), even after it
+ // connects/reconnects to ZooKeeper. This is because the
+ // master detection code (detectMaster()) will not send a
+ // NewMasterDetectedMessage to the non-leading master as there is no
+ // change in the currentMasterSeq.
+ if (contend && currentMasterPID == pid) {
+ process::post(pid, NoMasterDetectedMessage());
+ }
}
}
@@ -466,8 +473,18 @@ void ZooKeeperMasterDetectorProcess::det
int code = zk->getChildren(url.path, true, &results);
if (code != ZOK) {
- LOG(ERROR) << "Master detector failed to get masters: "
- << zk->message(code);
+ if (zk->retryable(code)) {
+ // NOTE: We don't expect a ZNONODE here because 'url.path' is always
+ // created in the connected() call. Despite that, we don't do a
+ // CHECK (code != ZNONODE), just to be safe in case the zk client library
+ // does return the code unexpectedly.
+ LOG(ERROR) << "Master detector failed to get masters: "
+ << zk->message(code);
+ return; // Try again when we reconnect.
+ } else {
+ LOG(FATAL) << "Non-retryable ZooKeeper error while getting masters: "
+ << zk->message(code);
+ }
} else {
LOG(INFO) << "Master detector found " << results.size()
<< " registered masters";
@@ -490,6 +507,7 @@ void ZooKeeperMasterDetectorProcess::det
// No master present (lost or possibly hasn't come up yet).
if (masterSeq.empty()) {
+ LOG(INFO) << "Master detector of " << pid << " couldn't find any masters";
process::post(pid, NoMasterDetectedMessage());
} else if (masterSeq != currentMasterSeq) {
// Okay, let's fetch the master pid from ZooKeeper.
@@ -499,9 +517,19 @@ void ZooKeeperMasterDetectorProcess::det
if (code != ZOK) {
// This is possible because the master might have failed since
// the invocation of ZooKeeper::getChildren above.
- LOG(ERROR) << "Master detector failed to fetch new master pid: "
- << zk->message(code);
- process::post(pid, NoMasterDetectedMessage());
+ // It is fine to not send a NoMasterDetectedMessage here because,
+ // 1) If this is due to a connection loss or session expiration,
+ // connected() or expired() will be called and the leader detection
+ // code (detectMaster()) will be re-tried.
+ // 2) If this is due to no masters present (i.e., code == ZNONODE),
+ // updated() will be called and the detectMaster() will be re-tried.
+ if (zk->retryable(code)) {
+ LOG(ERROR) << "Master detector failed to fetch new master pid: "
+ << zk->message(code);
+ } else {
+ LOG(FATAL) << "Non-retryable ZooKeeper error while fetching "
+ << "new master pid: " << zk->message(code);
+ }
} else {
// Now let's parse what we fetched from ZooKeeper.
LOG(INFO) << "Master detector got new master pid: " << result;
@@ -509,18 +537,18 @@ void ZooKeeperMasterDetectorProcess::det
UPID masterPid = result;
if (masterPid == UPID()) {
- // TODO(benh): Maybe we should try again then!?!? Parsing
- // might have failed because of DNS, and whoever is using the
- // detector might sit "unconnected" indefinitely!
- LOG(ERROR) << "Failed to parse new master pid!";
- process::post(pid, NoMasterDetectedMessage());
+ // TODO(benh): Maybe we should try again then!?!? Parsing
+ // might have failed because of DNS, and whoever is using the
+ // detector might sit "unconnected" indefinitely!
+ LOG(ERROR) << "Failed to parse new master pid!";
+ process::post(pid, NoMasterDetectedMessage());
} else {
- currentMasterSeq = masterSeq;
- currentMasterPID = masterPid;
+ currentMasterSeq = masterSeq;
+ currentMasterPID = masterPid;
NewMasterDetectedMessage message;
- message.set_pid(currentMasterPID);
- process::post(pid, message);
+ message.set_pid(currentMasterPID);
+ process::post(pid, message);
}
}
}
@@ -544,3 +572,12 @@ ZooKeeperMasterDetector::~ZooKeeperMaste
wait(process);
delete process;
}
+
+
+Future<int64_t> ZooKeeperMasterDetector::session()
+{
+ return dispatch(process, &ZooKeeperMasterDetectorProcess::session);
+}
+
+} // namespace internal {
+} // namespace mesos {
Modified: incubator/mesos/branches/0.10.x/src/detector/detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/detector/detector.hpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/detector/detector.hpp (original)
+++ incubator/mesos/branches/0.10.x/src/detector/detector.hpp Tue Dec 11 00:03:49 2012
@@ -25,13 +25,20 @@
#include <climits>
#include <cstdlib>
+#include <process/future.hpp>
+#include <process/pid.hpp>
#include <process/process.hpp>
#include <stout/try.hpp>
+#include "zookeeper/url.hpp"
+
namespace mesos {
namespace internal {
+class ZooKeeperMasterDetectorProcess; // Forward declaration.
+
+
/**
* Implements functionality for:
* a) detecting masters
@@ -104,8 +111,38 @@ private:
const process::UPID master;
};
+
+class ZooKeeperMasterDetector : public MasterDetector
+{
+public:
+ /**
+ * Uses ZooKeeper for both detecting masters and contending to be a
+ * master.
+ *
+ * @param url ZooKeeper URL (server list plus znode path) to use
+ * @param pid libprocess pid to send messages/updates to (and to
+ * use for contending to be a master)
+ * @param contend true if should contend to be master and false otherwise (not
+ * needed for slaves and frameworks)
+ * @param quiet verbosity logging level for underlying ZooKeeper library
+ */
+ ZooKeeperMasterDetector(const zookeeper::URL& url,
+ const process::UPID& pid,
+ bool contend,
+ bool quiet);
+
+ virtual ~ZooKeeperMasterDetector();
+
+ /**
+ * Returns the ZooKeeper session ID associated with this detector.
+ */
+ process::Future<int64_t> session();
+
+private:
+ ZooKeeperMasterDetectorProcess* process;
+};
+
} // namespace internal {
} // namespace mesos {
#endif // __MASTER_DETECTOR_HPP__
-
Modified: incubator/mesos/branches/0.10.x/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/slave/slave.cpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/slave/slave.cpp (original)
+++ incubator/mesos/branches/0.10.x/src/slave/slave.cpp Tue Dec 11 00:03:49 2012
@@ -336,7 +336,7 @@ void Slave::shutdown()
{
if (from != master) {
LOG(WARNING) << "Ignoring shutdown message from " << from
- << "because it is not from the registered master ("
+ << " because it is not from the registered master ("
<< master << ")";
return;
}
Modified: incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp (original)
+++ incubator/mesos/branches/0.10.x/src/tests/zookeeper_server.hpp Tue Dec 11 00:03:49 2012
@@ -53,7 +53,16 @@ public:
// the first call and re-uses the port on subsequent calls.
int startNetwork();
- // Forces the server to expire the given session immediately.
+ // Forces the server to expire the given session.
+ // Note that there is a delay (~3s) for the corresponding client to receive
+ // a session expiration event notification from the ZooKeeper server.
+ // TODO(vinod): Fix this so that the notification is immediate.
+ // One possible solution is suggested at:
+ // http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
+ // But according to
+ // http://comments.gmane.org/gmane.comp.java.hadoop.zookeeper.user/4489
+ // the C binding for ZooKeeper does not yet support multiple
+ // clients with the same session id.
void expireSession(int64_t sessionId);
private:
Modified: incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp?rev=1419937&r1=1419936&r2=1419937&view=diff
==============================================================================
--- incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp (original)
+++ incubator/mesos/branches/0.10.x/src/tests/zookeeper_tests.cpp Tue Dec 11 00:03:49 2012
@@ -39,6 +39,7 @@
#include "zookeeper/authentication.hpp"
#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
using namespace mesos::internal;
using namespace mesos::internal::test;
@@ -46,6 +47,7 @@ using namespace mesos::internal::test;
using process::Clock;
using testing::_;
+using testing::AtMost;
using testing::Return;
@@ -275,7 +277,7 @@ TEST_F(ZooKeeperTest, MasterDetectorShut
zks->startNetwork();
- WAIT_FOR(newMasterDetectedCall2, seconds(5.0));
+ WAIT_FOR(newMasterDetectedCall2, seconds(5.0)); // ZooKeeper needs extra time.
MasterDetector::destroy(detector.get());
@@ -286,6 +288,70 @@ TEST_F(ZooKeeperTest, MasterDetectorShut
}
+TEST_F(ZooKeeperTest, MasterDetectorExpireZKSession)
+{
+ // Simulate a leading master.
+ MockMasterDetectorListenerProcess leader;
+
+ trigger newMasterDetectedCall1, newMasterDetectedCall2;
+ EXPECT_CALL(leader, newMasterDetected(_))
+ .WillOnce(Trigger(&newMasterDetectedCall1))
+ .WillOnce(Trigger(&newMasterDetectedCall2));
+
+ EXPECT_CALL(leader, noMasterDetected())
+ .Times(0);
+
+ process::spawn(leader);
+
+ std::string znode = "zk://" + zks->connectString() + "/mesos";
+
+ Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
+ ASSERT_TRUE(url.isSome());
+
+ // Leader's detector.
+ ZooKeeperMasterDetector leaderDetector(url.get(), leader.self(), true, true);
+
+ WAIT_UNTIL(newMasterDetectedCall1);
+
+ // Simulate a following master.
+ MockMasterDetectorListenerProcess follower;
+
+ trigger newMasterDetectedCall3;
+ EXPECT_CALL(follower, newMasterDetected(_))
+ .WillOnce(Trigger(&newMasterDetectedCall3))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(follower, noMasterDetected())
+ .Times(0);
+
+ process::spawn(follower);
+
+ // Follower's detector.
+ ZooKeeperMasterDetector followerDetector(
+ url.get(), follower.self(), true, true);
+
+ WAIT_UNTIL(newMasterDetectedCall3);
+
+ // Now expire the leader's zk session.
+ process::Future<int64_t> session = leaderDetector.session();
+ session.await();
+ ASSERT_FALSE(session.isFailed()) << session.failure();
+ ASSERT_FALSE(session.isDiscarded());
+ ASSERT_TRUE(session.isReady());
+
+ zks->expireSession(session.get());
+
+ // Wait for session expiration and ensure we receive a newMasterDetected call.
+ WAIT_FOR(newMasterDetectedCall2, seconds(5.0)); // ZooKeeper needs extra time.
+
+ process::terminate(follower);
+ process::wait(follower);
+
+ process::terminate(leader);
+ process::wait(leader);
+}
+
+
TEST_F(ZooKeeperTest, Group)
{
zookeeper::Group group(zks->connectString(), NO_TIMEOUT, "/test/");