You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/01/23 00:59:44 UTC
[1/4] git commit: Fixed MESOS-935: Group should tell MasterDetector
"no memberships detected" when it locally times out.
Updated Branches:
refs/heads/master 7321c385f -> 586e7eb65
Fixed MESOS-935: Group should tell MasterDetector "no memberships
detected" when it locally times out.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/17156
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42946ed5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42946ed5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42946ed5
Branch: refs/heads/master
Commit: 42946ed574e117c51b3f5f0a4be7636b96404d18
Parents: 7321c38
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jan 22 15:38:45 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:10 2014 -0800
----------------------------------------------------------------------
src/master/detector.hpp | 3 +
src/master/master.hpp | 7 +-
src/tests/master_contender_detector_tests.cpp | 49 +++++----
src/tests/master_tests.cpp | 112 +++++++++++++++++++++
src/tests/zookeeper_tests.cpp | 28 ++++--
src/zookeeper/detector.hpp | 3 +-
src/zookeeper/group.cpp | 16 ++-
7 files changed, 183 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/master/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector.hpp b/src/master/detector.hpp
index 8df2d16..277c9d9 100644
--- a/src/master/detector.hpp
+++ b/src/master/detector.hpp
@@ -107,6 +107,9 @@ public:
virtual ~ZooKeeperMasterDetector();
// MasterDetector implementation.
+ // The detector transparently tries to recover from retryable
+ // errors until the group session expires, in which case the Future
+ // returns None.
virtual process::Future<Option<process::UPID> > detect(
const Option<process::UPID>& previous = None());
http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 18a6cc4..99b8181 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -177,6 +177,10 @@ public:
// Made public for testing purposes.
void detected(const Future<Option<UPID> >& pid);
+ // Invoked when the contender has lost the candidacy.
+ // Made public for testing purposes.
+ void lostCandidacy(const Future<Nothing>& lost);
+
protected:
virtual void initialize();
virtual void finalize();
@@ -201,9 +205,6 @@ protected:
// Invoked when the contender has entered the contest.
void contended(const Future<Future<Nothing> >& candidacy);
- // Invoked when the contender has lost the candidacy.
- void lostCandidacy(const Future<Nothing>& lost);
-
// Process a launch tasks request (for a non-cancelled offer) by
// launching the desired tasks (if the offer contains a valid set of
// tasks) and reporting any unused resources to the allocator.
http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index 9cd576f..a739f89 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -418,7 +418,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
// We may need to advance multiple times because we could have
// advanced the clock before the timer in Group starts.
- while (lostCandidacy.isPending()) {
+ while (lostCandidacy.isPending() || leader.isPending()) {
Clock::advance(MASTER_CONTENDER_ZK_SESSION_TIMEOUT);
Clock::settle();
}
@@ -426,9 +426,11 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
// Local timeout does not fail the future but rather deems the
// session has timed out and the candidacy is lost.
EXPECT_TRUE(lostCandidacy.isReady());
+ EXPECT_NONE(leader.get());
- // Re-contend (and continue detecting).
+ // Re-contend and re-detect.
contended = contender.contend();
+ leader = detector.detect(leader.get());
// Things will not change until the server restarts.
Clock::advance(Minutes(1));
@@ -540,19 +542,23 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
Clock::pause();
- // We know when the groups have timed out when the contenders are
- // informed of the candidacy loss.
// We may need to advance multiple times because we could have
// advanced the clock before the timer in Group starts.
- while (leaderLostCandidacy.isPending() || followerLostCandidacy.isPending()) {
+ while (leaderDetected.isPending() ||
+ followerDetected.isPending() ||
+ nonContenderDetected.isPending() ||
+ leaderLostCandidacy.isPending() ||
+ followerLostCandidacy.isPending()) {
Clock::advance(sessionTimeout);
Clock::settle();
}
- // Detection is not interrupted.
- EXPECT_TRUE(leaderDetected.isPending());
- EXPECT_TRUE(followerDetected.isPending());
- EXPECT_TRUE(nonContenderDetected.isPending());
+ EXPECT_NONE(leaderDetected.get());
+ EXPECT_NONE(followerDetected.get());
+ EXPECT_NONE(nonContenderDetected.get());
+
+ EXPECT_TRUE(leaderLostCandidacy.isReady());
+ EXPECT_TRUE(followerLostCandidacy.isReady());
Clock::resume();
}
@@ -670,18 +676,18 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
Future<Option<int64_t> > session = group->session();
AWAIT_READY(session);
- Future<Nothing> connected = FUTURE_DISPATCH(
- group->process->self(),
- &GroupProcess::connected);
-
server->expireSession(session.get().get());
- // When connected is called, the leader has already expired and
- // reconnected.
- AWAIT_READY(connected);
+ // Session expiration causes the detector to assume all memberships
+ // are lost.
+ AWAIT_READY(detected);
+ EXPECT_NONE(detected.get());
+
+ detected = slaveDetector.detect(detected.get());
- // Still pending because there is no leader change.
- EXPECT_TRUE(detected.isPending());
+ // The detector is able to re-detect the master.
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(master, detected.get());
}
@@ -758,9 +764,14 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
server->expireSession(slaveSession.get().get());
server->expireSession(masterSession.get().get());
- // Wait for session expiration and ensure a new master is detected.
+ // Wait for session expiration and the detector will first receive
+ // a "no master detected" event.
AWAIT_READY(detected);
+ EXPECT_NONE(detected.get());
+ // nonContenderDetector can now re-detect the new master.
+ detected = nonContenderDetector.detect(detected.get());
+ AWAIT_READY(detected);
EXPECT_SOME_EQ(follower, detected.get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index d34450b..f1486ce 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -49,6 +49,10 @@
#include "tests/isolator.hpp"
#include "tests/mesos.hpp"
+#ifdef MESOS_HAS_JAVA
+#include "tests/zookeeper.hpp"
+#endif
+
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::tests;
@@ -1023,6 +1027,7 @@ TEST_F(MasterTest, MasterLost)
Shutdown();
}
+
// Test sends different state than current and expects an update with
// the current state of task.
//
@@ -1099,3 +1104,110 @@ TEST_F(MasterTest, ReconcileTaskTest)
Shutdown(); // Must shutdown before 'isolator' gets deallocated.
}
+
+
+#ifdef MESOS_HAS_JAVA
+class MasterZooKeeperTest : public MesosTest
+{
+public:
+ static void SetUpTestCase()
+ {
+ // Make sure the JVM is created.
+ ZooKeeperTest::SetUpTestCase();
+
+ // Launch the ZooKeeper test server.
+ server = new ZooKeeperTestServer();
+ server->startNetwork();
+
+ Try<zookeeper::URL> parse = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/znode");
+ ASSERT_SOME(parse);
+
+ url = parse.get();
+ }
+
+ static void TearDownTestCase()
+ {
+ delete server;
+ server = NULL;
+ }
+
+protected:
+ MasterZooKeeperTest() : MesosTest(url) {}
+
+ static ZooKeeperTestServer* server;
+ static Option<zookeeper::URL> url;
+};
+
+
+ZooKeeperTestServer* MasterZooKeeperTest::server = NULL;
+
+
+Option<zookeeper::URL> MasterZooKeeperTest::url;
+
+
+// This test verifies that when the ZooKeeper cluster is lost,
+// master, slave & scheduler all get informed.
+TEST_F(MasterZooKeeperTest, LostZooKeeperCluster)
+{
+ ASSERT_SOME(StartMaster());
+ ASSERT_SOME(StartSlave());
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, stringify(url.get()), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(Return()); // Ignore offers.
+
+ Future<process::Message> frameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+ Future<process::Message> slaveRegisteredMessage =
+ FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _);
+
+ driver.start();
+
+ // Wait for the "registered" messages so that we know the master is
+ // detected by everyone.
+ AWAIT_READY(frameworkRegisteredMessage);
+ AWAIT_READY(slaveRegisteredMessage);
+
+ Future<Nothing> schedulerDisconnected;
+ EXPECT_CALL(sched, disconnected(&driver))
+ .WillOnce(FutureSatisfy(&schedulerDisconnected));
+
+ // Need to drop these two dispatches because otherwise the master
+ // will EXIT.
+ Future<Nothing> masterDetected = DROP_DISPATCH(_, &Master::detected);
+ DROP_DISPATCH(_, &Master::lostCandidacy);
+
+ Future<Nothing> slaveDetected = FUTURE_DISPATCH(_, &Slave::detected);
+
+ server->shutdownNetwork();
+
+ Clock::pause();
+
+ while (schedulerDisconnected.isPending() ||
+ masterDetected.isPending() ||
+ slaveDetected.isPending()) {
+ Clock::advance(MASTER_CONTENDER_ZK_SESSION_TIMEOUT);
+ Clock::settle();
+ }
+
+ Clock::resume();
+
+ // Master, slave and scheduler all lose the leading master.
+ AWAIT_READY(schedulerDisconnected);
+ AWAIT_READY(masterDetected);
+ AWAIT_READY(slaveDetected);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+#endif // MESOS_HAS_JAVA
http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index 656afde..c7fa750 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -203,7 +203,6 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
Future<Group::Membership> membership1 = group.join("member 1");
AWAIT_READY(membership1);
- Future<bool> cancelled = membership1.get().cancelled();
Future<Option<Group::Membership> > leader = detector.detect();
@@ -218,26 +217,33 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
// We may need to advance multiple times because we could have
// advanced the clock before the timer in Group starts.
- while (cancelled.isPending()) {
+ while (leader.isPending()) {
Clock::advance(timeout);
Clock::settle();
}
Clock::resume();
- // The detect operation times out but the group internally
- // recreates a new ZooKeeper client and hides the error from the
- // detector.
- EXPECT_TRUE(leader.isPending());
+ // The detect operation times out.
+ EXPECT_NONE(leader.get());
+
+ // Re-detect.
+ leader = detector.detect(leader.get());
Future<Nothing> connected = FUTURE_DISPATCH(
group.process->self(),
&GroupProcess::connected);
server->startNetwork();
- // When the service is restored, all sessions/memberships are gone.
AWAIT_READY(connected);
AWAIT_READY(leader);
- EXPECT_TRUE(leader.get().isNone());
+ EXPECT_SOME(leader.get());
+
+ // Wait until the old membership expires on ZK and re-detect.
+ // (Restarting network doesn't delete old ZNode automatically).
+ AWAIT_READY(leader.get().get().cancelled());
+ leader = detector.detect(leader.get());
+ AWAIT_READY(leader);
+ EXPECT_NONE(leader.get());
AWAIT_READY(group.join("member 1"));
@@ -246,10 +252,12 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
EXPECT_SOME(leader.get());
// Cancel the member and join another.
- AWAIT_READY(group.cancel(leader.get().get()));
+ Future<bool> cancelled = group.cancel(leader.get().get());
+ AWAIT_READY(cancelled);
+ EXPECT_TRUE(cancelled.get());
leader = detector.detect(leader.get());
AWAIT_READY(leader);
- EXPECT_TRUE(leader.get().isNone());
+ EXPECT_NONE(leader.get());
AWAIT_READY(group.join("member 2"));
http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/zookeeper/detector.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.hpp b/src/zookeeper/detector.hpp
index 00fcf5e..73235c0 100644
--- a/src/zookeeper/detector.hpp
+++ b/src/zookeeper/detector.hpp
@@ -29,7 +29,8 @@ public:
// A failed future is returned if the detector is unable to detect
// the leading master due to a non-retryable error.
// Note that the detector transparently tries to recover from
- // retryable errors.
+ // retryable errors until the group session expires, in which case
+ // the Future returns None.
// The future is never discarded unless it stays pending when the
// detector destructs.
//
http://git-wip-us.apache.org/repos/asf/mesos/blob/42946ed5/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index e21dc6f..72ebe69 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -430,7 +430,7 @@ void GroupProcess::timedout(int64_t sessionId)
// The timer can be reset or replaced and 'zk' can be replaced
// since this method was dispatched.
LOG(WARNING) << "Timed out waiting to reconnect to ZooKeeper "
- << "(sessionId=" << std::hex << sessionId << ")";
+ << "(sessionId=" << std::hex << sessionId << ")";
// Locally determine that the current session has expired.
expired();
@@ -450,7 +450,19 @@ void GroupProcess::expired()
timer = None();
}
- // Invalidate the cache.
+ // From the group's local perspective all the memberships are
+ // gone so we need to update the watches.
+ // If the memberships still exist on ZooKeeper, they will be
+ // restored in group after the group reconnects to ZK.
+ // This is a precaution against the possibility that ZK connection
+ // is lost right after we recreate the ZK instance below or the
+ // entire ZK cluster goes down. The outage can last for a long time
+ // but the clients watching the group should be informed sooner.
+ memberships = set<Group::Membership>();
+ update();
+
+ // Invalidate the cache so that we'll sync with ZK after
+ // reconnection.
memberships = None();
// Set all owned memberships as cancelled.
[2/4] git commit: Improved Group to take label as an option.
Posted by vi...@apache.org.
Improved Group to take label as an option.
Review: https://reviews.apache.org/r/17171
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/326172ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/326172ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/326172ec
Branch: refs/heads/master
Commit: 326172ec4571de0952278f005d6cba64d4dfc120
Parents: 42946ed
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 16:18:32 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800
----------------------------------------------------------------------
src/tests/group_tests.cpp | 34 ++++++++++++++
src/zookeeper/group.cpp | 99 ++++++++++++++++++++++++++--------------
src/zookeeper/group.hpp | 43 ++++++++++++-----
src/zookeeper/zookeeper.hpp | 29 ++++++------
4 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index ac1942b..5d4240c 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -372,3 +372,37 @@ TEST_F(GroupTest, RetryableErrors)
AWAIT_READY(cancellation);
AWAIT_READY(connected);
}
+
+
+TEST_F(GroupTest, LabelledGroup)
+{
+ Group group(server->connectString(), NO_TIMEOUT, "/test/");
+
+ // Join a group with label.
+ Future<Group::Membership> membership = group.join(
+ "hello world", std::string("testlabel"));
+
+ AWAIT_READY(membership);
+
+ Future<std::set<Group::Membership> > memberships = group.watch();
+
+ AWAIT_READY(memberships);
+ EXPECT_EQ(1u, memberships.get().size());
+ EXPECT_EQ(1u, memberships.get().count(membership.get()));
+
+ Future<std::string> data = group.data(membership.get());
+
+ AWAIT_EXPECT_EQ("hello world", data);
+
+ Future<bool> cancellation = group.cancel(membership.get());
+
+ AWAIT_EXPECT_EQ(true, cancellation);
+
+ memberships = group.watch(memberships.get());
+
+ AWAIT_READY(memberships);
+ EXPECT_EQ(0u, memberships.get().size());
+
+ ASSERT_TRUE(membership.get().cancelled().isReady());
+ ASSERT_TRUE(membership.get().cancelled().get());
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 72ebe69..a50da22 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -13,6 +13,7 @@
#include <stout/none.hpp>
#include <stout/numify.hpp>
#include <stout/os.hpp>
+#include <stout/path.hpp>
#include <stout/result.hpp>
#include <stout/some.hpp>
#include <stout/strings.hpp>
@@ -127,12 +128,14 @@ void GroupProcess::initialize()
}
-Future<Group::Membership> GroupProcess::join(const string& data)
+Future<Group::Membership> GroupProcess::join(
+ const string& data,
+ const Option<string>& label)
{
if (error.isSome()) {
return Failure(error.get());
} else if (state != READY) {
- Join* join = new Join(data);
+ Join* join = new Join(data, label);
pending.joins.push(join);
return join->promise.future();
}
@@ -145,14 +148,14 @@ Future<Group::Membership> GroupProcess::join(const string& data)
// client can assume a happens-before ordering of operations (i.e.,
// the first request will happen before the second, etc).
- Result<Group::Membership> membership = doJoin(data);
+ Result<Group::Membership> membership = doJoin(data, label);
if (membership.isNone()) { // Try again later.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
}
- Join* join = new Join(data);
+ Join* join = new Join(data, label);
pending.joins.push(join);
return join->promise.future();
} else if (membership.isError()) {
@@ -526,7 +529,9 @@ void GroupProcess::deleted(const string& path)
}
-Result<Group::Membership> GroupProcess::doJoin(const string& data)
+Result<Group::Membership> GroupProcess::doJoin(
+ const string& data,
+ const Option<string>& label)
{
CHECK_EQ(state, READY);
@@ -534,8 +539,12 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
// the specified data as it's contents.
string result;
- int code = zk->create(znode + "/", data, acl,
- ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+ int code = zk->create(
+ znode + "/" + (label.isSome() ? (label.get() + "_") : ""),
+ data,
+ acl,
+ ZOO_SEQUENCE | ZOO_EPHEMERAL,
+ &result);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
@@ -551,19 +560,24 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
memberships = None();
// Save the sequence number but only grab the basename. Example:
- // "/path/to/znode/0000000131" => "0000000131".
+ // "/path/to/znode/label_0000000131" => "0000000131".
Try<string> basename = os::basename(result);
if (basename.isError()) {
return Error("Failed to get the sequence number: " + basename.error());
}
- Try<int32_t> sequence = numify<int32_t>(basename.get());
+ // Strip the label before grabbing the sequence number.
+ string node = label.isSome()
+ ? strings::remove(basename.get(), label.get() + "_")
+ : basename.get();
+
+ Try<int32_t> sequence = numify<int32_t>(node);
CHECK_SOME(sequence);
Promise<bool>* cancelled = new Promise<bool>();
owned[sequence.get()] = cancelled;
- return Group::Membership(sequence.get(), cancelled->future());
+ return Group::Membership(sequence.get(), label, cancelled->future());
}
@@ -571,11 +585,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
{
CHECK_EQ(state, READY);
- Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
- CHECK_SOME(sequence);
-
- string path = znode + "/" + sequence.get();
+ string path = path::join(znode, zkBasename(membership));
LOG(INFO) << "Trying to remove '" << path << "' in ZooKeeper";
@@ -614,11 +624,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
{
CHECK_EQ(state, READY);
- Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
- CHECK_SOME(sequence);
-
- string path = znode + "/" + sequence.get();
+ string path = path::join(znode, zkBasename(membership));
LOG(INFO) << "Trying to get '" << path << "' in ZooKeeper";
@@ -654,15 +660,21 @@ Try<bool> GroupProcess::cache()
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return false;
} else if (code != ZOK) {
- return Error("Non-retryable error attempting to get children of '" + znode +
- "' in ZooKeeper: " + zk->message(code));
+ return Error("Non-retryable error attempting to get children of '" + znode
+ + "' in ZooKeeper: " + zk->message(code));
}
- // Convert results to sequence numbers.
- set<int32_t> sequences;
+ // Convert results to sequence numbers and (optionally) labels.
+ hashmap<int32_t, Option<string> > sequences;
foreach (const string& result, results) {
- Try<int32_t> sequence = numify<int32_t>(result);
+ vector<string> tokens = strings::tokenize(result, "_");
+ Option<string> label = None();
+ if (tokens.size() > 1) {
+ label = tokens[0];
+ }
+
+ Try<int32_t> sequence = numify<int32_t>(tokens.back());
// Skip it if it couldn't be converted to a number.
if (sequence.isError()) {
@@ -671,39 +683,43 @@ Try<bool> GroupProcess::cache()
continue;
}
- sequences.insert(sequence.get());
+ sequences[sequence.get()] = label;
}
// Cache current memberships, cancelling those that are now missing.
set<Group::Membership> current;
foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
- if (sequences.count(sequence) == 0) {
+ if (!sequences.contains(sequence)) {
cancelled->set(false);
owned.erase(sequence); // Okay since iterating over a copy.
delete cancelled;
} else {
- current.insert(Group::Membership(sequence, cancelled->future()));
+ current.insert(Group::Membership(
+ sequence, sequences[sequence], cancelled->future()));
+
sequences.erase(sequence);
}
}
foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(unowned)) {
- if (sequences.count(sequence) == 0) {
+ if (!sequences.contains(sequence)) {
cancelled->set(false);
unowned.erase(sequence); // Okay since iterating over a copy.
delete cancelled;
} else {
- current.insert(Group::Membership(sequence, cancelled->future()));
+ current.insert(Group::Membership(
+ sequence, sequences[sequence], cancelled->future()));
+
sequences.erase(sequence);
}
}
// Add any remaining (i.e., unexpected) sequences.
- foreach (int32_t sequence, sequences) {
+ foreachpair (int32_t sequence, const Option<string>& label, sequences) {
Promise<bool>* cancelled = new Promise<bool>();
unowned[sequence] = cancelled;
- current.insert(Group::Membership(sequence, cancelled->future()));
+ current.insert(Group::Membership(sequence, label, cancelled->future()));
}
memberships = current;
@@ -759,7 +775,7 @@ Try<bool> GroupProcess::sync()
// Do joins.
while (!pending.joins.empty()) {
Join* join = pending.joins.front();
- Result<Group::Membership> membership = doJoin(join->data);
+ Result<Group::Membership> membership = doJoin(join->data, join->label);
if (membership.isNone()) {
return false; // Try again later.
} else if (membership.isError()) {
@@ -877,6 +893,17 @@ void GroupProcess::abort(const string& message)
}
+string GroupProcess::zkBasename(const Group::Membership& membership)
+{
+ Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
+ CHECK_SOME(sequence);
+
+ return membership.label.isSome()
+ ? (membership.label.get() + "_" + sequence.get())
+ : sequence.get();
+}
+
+
Group::Group(const string& servers,
const Duration& timeout,
const string& znode,
@@ -903,9 +930,11 @@ Group::~Group()
}
-Future<Group::Membership> Group::join(const string& data)
+Future<Group::Membership> Group::join(
+ const string& data,
+ const Option<string>& label)
{
- return dispatch(process, &GroupProcess::join, data);
+ return dispatch(process, &GroupProcess::join, data, label);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 354229f..1ce1519 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -3,13 +3,16 @@
#include <map>
#include <set>
+#include <string>
#include "process/future.hpp"
#include "process/timer.hpp"
+#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
+#include <stout/try.hpp>
#include "zookeeper/authentication.hpp"
#include "zookeeper/url.hpp"
@@ -85,10 +88,13 @@ public:
private:
friend class GroupProcess; // Creates and manages memberships.
- Membership(int32_t _sequence, const process::Future<bool>& cancelled)
- : sequence(_sequence), cancelled_(cancelled) {}
+ Membership(int32_t _sequence,
+ const Option<std::string>& _label,
+ const process::Future<bool>& cancelled)
+ : sequence(_sequence), label(_label), cancelled_(cancelled) {}
const int32_t sequence;
+ const Option<std::string> label;
process::Future<bool> cancelled_;
};
@@ -103,13 +109,16 @@ public:
~Group();
- // Returns the result of trying to join a "group" in ZooKeeper. If
- // succesful, an "owned" membership will be returned whose
- // retrievable data will be a copy of the specified parameter. A
- // membership is not "renewed" in the event of a ZooKeeper session
- // expiration. Instead, a client should watch the group memberships
- // and rejoin the group as appropriate.
- process::Future<Membership> join(const std::string& data);
+ // Returns the result of trying to join a "group" in ZooKeeper.
+ // If "label" is provided the newly created znode contains "label_"
+ // as the prefix. If join is successful, an "owned" membership will
+ // be returned whose retrievable data will be a copy of the
+ // specified parameter. A membership is not "renewed" in the event
+ // of a ZooKeeper session expiration. Instead, a client should watch
+ // the group memberships and rejoin the group as appropriate.
+ process::Future<Membership> join(
+ const std::string& data,
+ const Option<std::string>& label = None());
// Returns the result of trying to cancel a membership. Note that
// only memberships that are "owned" (see join) can be canceled.
@@ -150,8 +159,14 @@ public:
static const Duration RETRY_INTERVAL;
+ // Helper function that returns the basename of the znode of
+ // the membership.
+ static std::string zkBasename(const Group::Membership& membership);
+
// Group implementation.
- process::Future<Group::Membership> join(const std::string& data);
+ process::Future<Group::Membership> join(
+ const std::string& data,
+ const Option<std::string>& label);
process::Future<bool> cancel(const Group::Membership& membership);
process::Future<std::string> data(const Group::Membership& membership);
process::Future<std::set<Group::Membership> > watch(
@@ -167,7 +182,9 @@ public:
void deleted(const std::string& path);
private:
- Result<Group::Membership> doJoin(const std::string& data);
+ Result<Group::Membership> doJoin(
+ const std::string& data,
+ const Option<std::string>& label);
Result<bool> doCancel(const Group::Membership& membership);
Result<std::string> doData(const Group::Membership& membership);
@@ -234,8 +251,10 @@ private:
struct Join
{
- Join(const std::string& _data) : data(_data) {}
+ Join(const std::string& _data, const Option<std::string>& _label)
+ : data(_data), label(_label) {}
std::string data;
+ const Option<std::string> label;
process::Promise<Group::Membership> promise;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.hpp b/src/zookeeper/zookeeper.hpp
index 1b4e2ed..f50aca6 100644
--- a/src/zookeeper/zookeeper.hpp
+++ b/src/zookeeper/zookeeper.hpp
@@ -187,12 +187,13 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int create(const std::string &path,
- const std::string &data,
- const ACL_vector &acl,
- int flags,
- std::string *result,
- bool recursive = false);
+ int create(
+ const std::string &path,
+ const std::string &data,
+ const ACL_vector &acl,
+ int flags,
+ std::string *result,
+ bool recursive = false);
/**
* \brief delete a node in zookeeper synchronously.
@@ -253,10 +254,11 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int get(const std::string &path,
- bool watch,
- std::string *result,
- Stat *stat);
+ int get(
+ const std::string &path,
+ bool watch,
+ std::string *result,
+ Stat *stat);
/**
* \brief lists the children of a node synchronously.
@@ -274,9 +276,10 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int getChildren(const std::string &path,
- bool watch,
- std::vector<std::string> *results);
+ int getChildren(
+ const std::string &path,
+ bool watch,
+ std::vector<std::string> *results);
/**
* \brief sets the data associated with a node.
[3/4] git commit: Updated master and contender to write znode(s) in
the new (labelled) format.
Posted by vi...@apache.org.
Updated master and contender to write znode(s) in the new (labelled)
format.
Review: https://reviews.apache.org/r/17173
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/586e7eb6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/586e7eb6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/586e7eb6
Branch: refs/heads/master
Commit: 586e7eb65d6d376743725f64a481150171d44916
Parents: 4382a0f
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 20:12:46 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800
----------------------------------------------------------------------
src/master/contender.cpp | 28 ++++++++-------
src/master/contender.hpp | 12 ++++---
src/master/master.cpp | 3 +-
src/tests/master_contender_detector_tests.cpp | 41 ++++++++++++++--------
src/tests/zookeeper_tests.cpp | 12 ++++---
src/zookeeper/contender.cpp | 29 +++++++++------
src/zookeeper/contender.hpp | 8 +++--
7 files changed, 84 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender.cpp b/src/master/contender.cpp
index 89e368b..e3b0737 100644
--- a/src/master/contender.cpp
+++ b/src/master/contender.cpp
@@ -22,6 +22,7 @@
#include <stout/check.hpp>
#include <stout/lambda.hpp>
+#include "master/constants.hpp"
#include "master/contender.hpp"
#include "master/master.hpp"
@@ -54,7 +55,7 @@ public:
// Explicitly use 'initialize' since we're overloading below.
using process::ProcessBase::initialize;
- void initialize(const PID<Master>& master);
+ void initialize(const MasterInfo& masterInfo);
// MasterContender implementation.
virtual Future<Future<Nothing> > contend();
@@ -64,7 +65,7 @@ private:
LeaderContender* contender;
// The master this contender contends on behalf of.
- Option<PID<Master> > master;
+ Option<MasterInfo> masterInfo;
Option<Future<Future<Nothing> > > candidacy;
};
@@ -109,8 +110,7 @@ StandaloneMasterContender::~StandaloneMasterContender()
}
-void StandaloneMasterContender::initialize(
- const PID<master::Master>& master)
+void StandaloneMasterContender::initialize(const MasterInfo& masterInfo)
{
// We don't really need to store the master in this basic
// implementation so we just store an 'initialized' flag to make
@@ -161,10 +161,9 @@ ZooKeeperMasterContender::~ZooKeeperMasterContender()
}
-void ZooKeeperMasterContender::initialize(
- const PID<master::Master>& master)
+void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo)
{
- process->initialize(master);
+ process->initialize(masterInfo);
}
@@ -191,16 +190,15 @@ ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
delete contender;
}
-void ZooKeeperMasterContenderProcess::initialize(
- const PID<Master>& _master)
+void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo)
{
- master = _master;
+ masterInfo = _masterInfo;
}
Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
{
- if (master.isNone()) {
+ if (masterInfo.isNone()) {
return Failure("Initialize the contender first");
}
@@ -214,7 +212,13 @@ Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
delete contender;
}
- contender = new LeaderContender(group.get(), master.get());
+ // Serialize the MasterInfo to string.
+ string data;
+ if (!masterInfo.get().SerializeToString(&data)) {
+ return Failure("Failed to serialize data to MasterInfo");
+ }
+
+ contender = new LeaderContender(group.get(), data, master::MASTER_INFO_LABEL);
candidacy = contender->contend();
return candidacy.get();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender.hpp b/src/master/contender.hpp
index 2a7e7c4..0048ee0 100644
--- a/src/master/contender.hpp
+++ b/src/master/contender.hpp
@@ -27,6 +27,8 @@
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
+#include "messages/messages.hpp"
+
#include "zookeeper/contender.hpp"
#include "zookeeper/group.hpp"
#include "zookeeper/url.hpp"
@@ -63,9 +65,9 @@ public:
// to be cancelled during destruction.
virtual ~MasterContender() = 0;
- // Initializes the contender with the PID of the master it contends
- // on behalf of.
- virtual void initialize(const process::PID<master::Master>& master) = 0;
+ // Initializes the contender with the MasterInfo of the master it
+ // contends on behalf of.
+ virtual void initialize(const MasterInfo& masterInfo) = 0;
// Returns a Future<Nothing> once the contender has entered the
// contest (by obtaining a membership) and an error otherwise.
@@ -94,7 +96,7 @@ public:
virtual ~StandaloneMasterContender();
// MasterContender implementation.
- virtual void initialize(const process::PID<master::Master>& master);
+ virtual void initialize(const MasterInfo& masterInfo);
// In this basic implementation the outer Future directly returns
// and inner Future stays pending because there is only one
@@ -118,7 +120,7 @@ public:
virtual ~ZooKeeperMasterContender();
// MasterContender implementation.
- virtual void initialize(const process::PID<master::Master>& master);
+ virtual void initialize(const MasterInfo& masterInfo);
virtual process::Future<process::Future<Nothing> > contend();
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 008033e..c7d9186 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -294,6 +294,7 @@ void Master::initialize()
info.set_id(id.get());
info.set_ip(self().ip);
info.set_port(self().port);
+ info.set_pid(self());
LOG(INFO) << "Master ID: " << info.id();
@@ -543,7 +544,7 @@ void Master::initialize()
}
}
- contender->initialize(self());
+ contender->initialize(info);
// Start contending to be a leading master and detecting the current
// leader.
http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index a739f89..5223200 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -80,6 +80,19 @@ using testing::AtMost;
using testing::Return;
+// Helper function that creates a MasterInfo from PID<Master>.
+static MasterInfo createMasterInfo(const PID<Master>& master)
+{
+ MasterInfo masterInfo;
+ masterInfo.set_id(UUID::random().toString());
+ masterInfo.set_ip(master.ip);
+ masterInfo.set_port(master.port);
+ masterInfo.set_pid(master);
+
+ return masterInfo;
+}
+
+
class MasterContenderDetectorTest : public MesosTest {};
@@ -133,7 +146,7 @@ TEST(BasicMasterContenderDetectorTest, Contender)
MasterContender* contender = new StandaloneMasterContender();
- contender->initialize(master);
+ contender->initialize(createMasterInfo(master));
Future<Future<Nothing> > contended = contender->contend();
AWAIT_READY(contended);
@@ -190,7 +203,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
master.ip = 10000000;
master.port = 10000;
- contender->initialize(master);
+ contender->initialize(createMasterInfo(master));
Future<Future<Nothing> > contended = contender->contend();
AWAIT_READY(contended);
@@ -228,7 +241,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderPendingElection)
master.ip = 10000000;
master.port = 10000;
- contender.initialize(master);
+ contender.initialize(createMasterInfo(master));
// Drop Group::join so that 'contended' will stay pending.
Future<Nothing> join = DROP_DISPATCH(_, &GroupProcess::join);
@@ -281,7 +294,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
master1.ip = 10000000;
master1.port = 10000;
- contender1->initialize(master1);
+ contender1->initialize(createMasterInfo(master1));
Future<Future<Nothing> > contended1 = contender1->contend();
AWAIT_READY(contended1);
@@ -298,7 +311,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
master2.ip = 10000001;
master2.port = 10001;
- contender2.initialize(master2);
+ contender2.initialize(createMasterInfo(master2));
Future<Future<Nothing> > contended2 = contender2.contend();
AWAIT_READY(contended2);
@@ -343,7 +356,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors)
"/mesos",
zookeeper::Authentication("digest", "member:wrongpass")));
ZooKeeperMasterContender contender(group2);
- contender.initialize(master);
+ contender.initialize(createMasterInfo(master));
// Fails due to authentication error.
AWAIT_FAILED(contender.contend());
@@ -399,7 +412,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
master.ip = 10000000;
master.port = 10000;
- contender.initialize(master);
+ contender.initialize(createMasterInfo(master));
Future<Future<Nothing> > contended = contender.contend();
AWAIT_READY(contended);
@@ -474,7 +487,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
leader.ip = 10000000;
leader.port = 10000;
- leaderContender.initialize(leader);
+ leaderContender.initialize(createMasterInfo(leader));
Future<Future<Nothing> > contended = leaderContender.contend();
AWAIT_READY(contended);
@@ -494,7 +507,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
follower.ip = 10000001;
follower.port = 10001;
- followerContender.initialize(follower);
+ followerContender.initialize(createMasterInfo(follower));
contended = followerContender.contend();
AWAIT_READY(contended);
@@ -586,7 +599,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ZooKeeperMasterContender leaderContender(group);
- leaderContender.initialize(leader);
+ leaderContender.initialize(createMasterInfo(leader));
Future<Future<Nothing> > leaderContended = leaderContender.contend();
AWAIT_READY(leaderContended);
@@ -610,7 +623,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ZooKeeperMasterDetector followerDetector(url.get());
ZooKeeperMasterContender followerContender(url.get());
- followerContender.initialize(follower);
+ followerContender.initialize(createMasterInfo(follower));
Future<Future<Nothing> > followerContended = followerContender.contend();
AWAIT_READY(followerContended);
@@ -655,7 +668,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
master.port = 10000;
ZooKeeperMasterContender masterContender(url.get());
- masterContender.initialize(master);
+ masterContender.initialize(createMasterInfo(master));
Future<Future<Nothing> > leaderContended = masterContender.contend();
AWAIT_READY(leaderContended);
@@ -714,7 +727,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
leader.ip = 10000000;
leader.port = 10000;
- leaderContender.initialize(leader);
+ leaderContender.initialize(createMasterInfo(leader));
Future<Future<Nothing> > contended = leaderContender.contend();
AWAIT_READY(contended);
@@ -733,7 +746,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
follower.ip = 10000001;
follower.port = 10001;
- followerContender.initialize(follower);
+ followerContender.initialize(createMasterInfo(follower));
contended = followerContender.contend();
AWAIT_READY(contended);
http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index c7fa750..615338a 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -29,6 +29,8 @@
#include <stout/gtest.hpp>
#include <stout/strings.hpp>
+#include "master/constants.hpp"
+
#include "zookeeper/authentication.hpp"
#include "zookeeper/contender.hpp"
#include "zookeeper/detector.hpp"
@@ -274,7 +276,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
Group group(server->connectString(), timeout, "/test/");
Owned<LeaderContender> contender(
- new LeaderContender(&group, "candidate 1"));
+ new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
// Calling withdraw before contending returns 'false' because there
// is nothing to withdraw.
@@ -292,7 +294,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
// Normal workflow.
contender = Owned<LeaderContender>(
- new LeaderContender(&group, "candidate 1"));
+ new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
Future<Future<Nothing> > candidated = contender->contend();
AWAIT_READY(candidated);
@@ -320,7 +322,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
// Contend again.
contender = Owned<LeaderContender>(
- new LeaderContender(&group, "candidate 1"));
+ new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
candidated = contender->contend();
AWAIT_READY(connected);
@@ -344,7 +346,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
// Contend (3) and shutdown the network this time.
contender = Owned<LeaderContender>(
- new LeaderContender(&group, "candidate 1"));
+ new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
candidated = contender->contend();
AWAIT_READY(candidated);
lostCandidacy = candidated.get();
@@ -369,7 +371,7 @@ TEST_F(ZooKeeperTest, LeaderContender)
// Contend again (4).
contender = Owned<LeaderContender>(
- new LeaderContender(&group, "candidate 1"));
+ new LeaderContender(&group, "candidate 1", master::MASTER_INFO_LABEL));
candidated = contender->contend();
AWAIT_READY(candidated);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/zookeeper/contender.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.cpp b/src/zookeeper/contender.cpp
index d8a5201..6710da4 100644
--- a/src/zookeeper/contender.cpp
+++ b/src/zookeeper/contender.cpp
@@ -8,6 +8,7 @@
#include <stout/check.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
+#include <stout/some.hpp>
#include "zookeeper/contender.hpp"
#include "zookeeper/detector.hpp"
@@ -23,7 +24,11 @@ namespace zookeeper {
class LeaderContenderProcess : public Process<LeaderContenderProcess>
{
public:
- LeaderContenderProcess(Group* group, const std::string& data);
+ LeaderContenderProcess(
+ Group* group,
+ const string& data,
+ const Option<string>& label);
+
virtual ~LeaderContenderProcess();
// LeaderContender implementation.
@@ -45,6 +50,7 @@ private:
Group* group;
const string data;
+ const Option<string> label;
// The contender's state transitions from contending -> watching ->
// withdrawing or contending -> withdrawing. Each state is
@@ -70,9 +76,9 @@ private:
LeaderContenderProcess::LeaderContenderProcess(
Group* _group,
- const string& _data)
- : group(_group),
- data(_data) {}
+ const string& _data,
+ const Option<string>& _label)
+ : group(_group), data(_data), label(_label) {}
LeaderContenderProcess::~LeaderContenderProcess()
@@ -118,8 +124,8 @@ Future<Future<Nothing> > LeaderContenderProcess::contend()
return Failure("Cannot contend more than once");
}
- LOG(INFO) << "Joining the ZK group with data: '" << data << "'";
- candidacy = group->join(data);
+ LOG(INFO) << "Joining the ZK group";
+ candidacy = group->join(data, label);
candidacy
.onAny(defer(self(), &Self::joined));
@@ -234,8 +240,8 @@ void LeaderContenderProcess::joined()
return;
}
- LOG(INFO) << "New candidate (id='" << candidacy.get().id() << "', data='"
- << data << "') has entered the contest for leadership";
+ LOG(INFO) << "New candidate (id='" << candidacy.get().id()
+ << "') has entered the contest for leadership";
// Transition to 'watching' state.
watching = new Promise<Nothing>();
@@ -250,9 +256,12 @@ void LeaderContenderProcess::joined()
}
-LeaderContender::LeaderContender(Group* group, const string& data)
+LeaderContender::LeaderContender(
+ Group* group,
+ const string& data,
+ const Option<string>& label)
{
- process = new LeaderContenderProcess(group, data);
+ process = new LeaderContenderProcess(group, data, label);
spawn(process);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/586e7eb6/src/zookeeper/contender.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.hpp b/src/zookeeper/contender.hpp
index e526e2d..6529245 100644
--- a/src/zookeeper/contender.hpp
+++ b/src/zookeeper/contender.hpp
@@ -6,6 +6,7 @@
#include <process/future.hpp>
#include <stout/nothing.hpp>
+#include <stout/option.hpp>
#include "zookeeper/group.hpp"
@@ -25,8 +26,11 @@ class LeaderContender
public:
// The specified 'group' is expected to outlive the contender. The
// specified 'data' is associated with the group membership created
- // by this contender.
- LeaderContender(Group* group, const std::string& data);
+ // by this contender. 'label' indicates the label for the znode that
+ // stores the 'data'.
+ LeaderContender(Group* group,
+ const std::string& data,
+ const Option<std::string>& label);
// Note that the contender's membership, if obtained, is scheduled
// to be cancelled during destruction.
[4/4] git commit: Updated detector to understand old (unlabelled) and
new (labelled) master znode formats.
Posted by vi...@apache.org.
Updated detector to understand old (unlabelled) and new (labelled)
master znode formats.
Review: https://reviews.apache.org/r/17172
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4382a0f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4382a0f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4382a0f6
Branch: refs/heads/master
Commit: 4382a0f63c29145bc540e6abb1f9f289fb8b1311
Parents: 326172e
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 20:11:25 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800
----------------------------------------------------------------------
include/mesos/mesos.proto | 1 +
src/master/constants.cpp | 3 +++
src/master/constants.hpp | 5 +++++
src/master/detector.cpp | 41 +++++++++++++++++++++++++++++++++++------
src/zookeeper/detector.cpp | 2 +-
src/zookeeper/group.cpp | 4 ++--
src/zookeeper/group.hpp | 9 +++++++--
7 files changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 655f867..1503e73 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -160,6 +160,7 @@ message MasterInfo {
required string id = 1;
required uint32 ip = 2;
required uint32 port = 3 [default = 5050];
+ optional string pid = 4;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/constants.cpp
----------------------------------------------------------------------
diff --git a/src/master/constants.cpp b/src/master/constants.cpp
index 0b7c9f7..8a48bbb 100644
--- a/src/master/constants.cpp
+++ b/src/master/constants.cpp
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+#include <string>
+
#include <stout/bytes.hpp>
#include "master/constants.hpp"
@@ -33,6 +35,7 @@ const uint32_t MAX_COMPLETED_FRAMEWORKS = 50;
const uint32_t MAX_COMPLETED_TASKS_PER_FRAMEWORK = 1000;
const Duration WHITELIST_WATCH_INTERVAL = Seconds(5);
const uint32_t TASK_LIMIT = 100;
+const std::string MASTER_INFO_LABEL = "info";
} // namespace master {
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/constants.hpp
----------------------------------------------------------------------
diff --git a/src/master/constants.hpp b/src/master/constants.hpp
index 8498c9b..cdaaad0 100644
--- a/src/master/constants.hpp
+++ b/src/master/constants.hpp
@@ -21,6 +21,8 @@
#include <stdint.h>
+#include <string>
+
#include <stout/duration.hpp>
namespace mesos {
@@ -65,6 +67,9 @@ extern const Duration WHITELIST_WATCH_INTERVAL;
// Default number of tasks (limit) for /master/tasks.json endpoint
extern const uint32_t TASK_LIMIT;
+// Label used by the Leader Contender and Detector.
+extern const std::string MASTER_INFO_LABEL;
+
} // namespace master {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index cf337cf..2b169c5 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -29,8 +29,11 @@
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
+#include "master/constants.hpp"
#include "master/detector.hpp"
+#include "messages/messages.hpp"
+
#include "zookeeper/detector.hpp"
#include "zookeeper/group.hpp"
#include "zookeeper/url.hpp"
@@ -46,7 +49,6 @@ namespace internal {
const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
-
class StandaloneMasterDetectorProcess
: public Process<StandaloneMasterDetectorProcess>
{
@@ -81,7 +83,7 @@ private:
void detected(const Future<Option<Group::Membership> >& leader);
// Invoked when we have fetched the data associated with the leader.
- void fetched(const Future<string>& data);
+ void fetched(const Group::Membership& membership, const Future<string>& data);
Owned<Group> group;
LeaderDetector detector;
@@ -294,7 +296,7 @@ void ZooKeeperMasterDetectorProcess::detected(
} else {
// Fetch the data associated with the leader.
group->data(_leader.get().get())
- .onAny(defer(self(), &Self::fetched, lambda::_1));
+ .onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1));
}
// Keep trying to detect leadership changes.
@@ -303,7 +305,9 @@ void ZooKeeperMasterDetectorProcess::detected(
}
-void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
+void ZooKeeperMasterDetectorProcess::fetched(
+ const Group::Membership& membership,
+ const Future<string>& data)
{
CHECK(!data.isDiscarded());
@@ -317,8 +321,33 @@ void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
return;
}
- // Cache the master for subsequent requests.
- leader = UPID(data.get());
+ // Parse the data based on the membership label and cache the
+ // leader for subsequent requests.
+ Option<string> label = membership.label();
+ if (label.isNone()) {
+ leader = UPID(data.get());
+ } else if (label.isSome() && label.get() == master::MASTER_INFO_LABEL) {
+ MasterInfo info;
+ if (!info.ParseFromString(data.get())) {
+ leader = None();
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->fail("Failed to parse data into MasterInfo");
+ delete promise;
+ }
+ promises.clear();
+ return;
+ }
+ leader = UPID(info.pid());
+ } else {
+ leader = None();
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->fail("Failed to parse data of unknown label " + label.get());
+ delete promise;
+ }
+ promises.clear();
+ return;
+ }
+
LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
foreach (Promise<Option<UPID> >* promise, promises) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/detector.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.cpp b/src/zookeeper/detector.cpp
index 2759a2f..e186e51 100644
--- a/src/zookeeper/detector.cpp
+++ b/src/zookeeper/detector.cpp
@@ -133,7 +133,7 @@ void LeaderDetectorProcess::watched(const Future<set<Group::Membership> >& membe
if (current != leader) {
LOG(INFO) << "Detected a new leader: "
<< (current.isSome()
- ? "'(id='" + stringify(current.get().id()) + "')"
+ ? "(id='" + stringify(current.get().id()) + "')"
: "None");
foreach (Promise<Option<Group::Membership> >* promise, promises) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index a50da22..ecb6c00 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -898,8 +898,8 @@ string GroupProcess::zkBasename(const Group::Membership& membership)
Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
CHECK_SOME(sequence);
- return membership.label.isSome()
- ? (membership.label.get() + "_" + sequence.get())
+ return membership.label_.isSome()
+ ? (membership.label_.get() + "_" + sequence.get())
: sequence.get();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4382a0f6/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 1ce1519..e51ebb2 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -74,6 +74,11 @@ public:
return sequence;
}
+ Option<std::string> label() const
+ {
+ return label_;
+ }
+
// Returns a future that is only satisfied once this membership
// has been cancelled. In which case, the value of the future is
// true if you own this membership and cancelled it by invoking
@@ -91,10 +96,10 @@ public:
Membership(int32_t _sequence,
const Option<std::string>& _label,
const process::Future<bool>& cancelled)
- : sequence(_sequence), label(_label), cancelled_(cancelled) {}
+ : sequence(_sequence), label_(_label), cancelled_(cancelled) {}
const int32_t sequence;
- const Option<std::string> label;
+ const Option<std::string> label_;
process::Future<bool> cancelled_;
};