You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/12/20 01:32:03 UTC
[6/7] git commit: Changed master detector to return Future<Option>
Changed master detector to return Future<Option> instead of
Future<Result>.
Now that local session timeouts are transparent to the clients of the
master detector, they are not supposed to retry anymore. Thus passing
"Result<UPID> previous" back into MasterDetector::detect() is no
longer correct.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/16291
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1767e38
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1767e38
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1767e38
Branch: refs/heads/master
Commit: e1767e384db6ab187e69f5b2ed0f56995103337b
Parents: aa75551
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Dec 19 16:09:41 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Dec 19 16:09:41 2013 -0800
----------------------------------------------------------------------
src/cli/resolve.cpp | 8 +-
src/master/detector.cpp | 116 +++++++++------------
src/master/detector.hpp | 43 +++-----
src/master/master.cpp | 12 +--
src/master/master.hpp | 4 +-
src/sched/sched.cpp | 19 ++--
src/slave/slave.cpp | 23 ++--
src/slave/slave.hpp | 4 +-
src/tests/master_contender_detector_tests.cpp | 85 ++++++++++++---
src/tests/zookeeper_tests.cpp | 4 +-
src/zookeeper/detector.cpp | 69 +++++-------
src/zookeeper/detector.hpp | 15 +--
12 files changed, 208 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/cli/resolve.cpp
----------------------------------------------------------------------
diff --git a/src/cli/resolve.cpp b/src/cli/resolve.cpp
index dddadfc..0ebb0c6 100644
--- a/src/cli/resolve.cpp
+++ b/src/cli/resolve.cpp
@@ -124,16 +124,16 @@ int main(int argc, char** argv)
return -1;
}
- Future<Result<UPID> > pid = detector.get()->detect();
+ Future<Option<UPID> > pid = detector.get()->detect();
if (!pid.await(timeout)) {
cerr << "Failed to detect master from '" << master.get()
<< "' within " << timeout << endl;
return -1;
} else {
- // Not expecting detect() to fail or discard the future.
- CHECK(pid.isReady());
- if (pid.get().isError()) {
+ CHECK(!pid.isDiscarded());
+
+ if (pid.isFailed()) {
cerr << "Failed to detect master from '" << master.get()
<< "': " << pid.failure() << endl;
return -1;
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index 2f73f66..7b437ac 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -54,20 +54,17 @@ class StandaloneMasterDetectorProcess
: public Process<StandaloneMasterDetectorProcess>
{
public:
- StandaloneMasterDetectorProcess() : leader(None()) {}
+ StandaloneMasterDetectorProcess() {}
StandaloneMasterDetectorProcess(const UPID& _leader)
: leader(_leader) {}
~StandaloneMasterDetectorProcess();
- void appoint(const Result<UPID>& leader);
- Future<Result<UPID> > detect(const Result<UPID>& previous = None());
+ void appoint(const Option<UPID>& leader);
+ Future<Option<UPID> > detect(const Option<UPID>& previous = None());
private:
- // The leading master that's directly 'appoint()'ed.
- Result<UPID> leader;
-
- // Promises for the detection result.
- set<Promise<Result<UPID> >*> promises;
+ Option<UPID> leader; // The appointed master.
+ set<Promise<Option<UPID> >*> promises;
};
@@ -80,13 +77,11 @@ public:
~ZooKeeperMasterDetectorProcess();
virtual void initialize();
-
- // ZooKeeperMasterDetector implementation.
- Future<Result<UPID> > detect(const Result<UPID>& previous);
+ Future<Option<UPID> > detect(const Option<UPID>& previous);
private:
// Invoked when the group leadership has changed.
- void detected(Future<Result<Group::Membership> > leader);
+ void detected(Future<Option<Group::Membership> > leader);
// Invoked when we have fetched the data associated with the leader.
void fetched(const Future<string>& data);
@@ -95,8 +90,8 @@ private:
LeaderDetector detector;
// The leading Master.
- Result<UPID> leader;
- set<Promise<Result<UPID> >*> promises;
+ Option<UPID> leader;
+ set<Promise<Option<UPID> >*> promises;
};
@@ -143,8 +138,8 @@ MasterDetector::~MasterDetector() {}
StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
{
- foreach (Promise<Result<UPID> >* promise, promises) {
- promise->set(Result<UPID>::error("MasterDetector is being destructed"));
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->future().discard();
delete promise;
}
promises.clear();
@@ -152,11 +147,11 @@ StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
void StandaloneMasterDetectorProcess::appoint(
- const Result<process::UPID>& _leader)
+ const Option<process::UPID>& _leader)
{
leader = _leader;
- foreach (Promise<Result<UPID> >* promise, promises) {
+ foreach (Promise<Option<UPID> >* promise, promises) {
promise->set(leader);
delete promise;
}
@@ -164,21 +159,14 @@ void StandaloneMasterDetectorProcess::appoint(
}
-Future<Result<UPID> > StandaloneMasterDetectorProcess::detect(
- const Result<UPID>& previous)
+Future<Option<UPID> > StandaloneMasterDetectorProcess::detect(
+ const Option<UPID>& previous)
{
- // Directly return the current leader is not the
- // same as the previous one.
- if (leader.isError() != previous.isError() ||
- leader.isNone() != previous.isNone() ||
- leader.isSome() != previous.isSome()) {
- return leader; // State change.
- } else if (leader.isSome() && previous.isSome() &&
- leader.get() != previous.get()) {
- return leader; // Leadership change.
+ if (leader != previous) {
+ return leader;
}
- Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+ Promise<Option<UPID> >* promise = new Promise<Option<UPID> >();
promises.insert(promise);
return promise->future();
}
@@ -206,14 +194,14 @@ StandaloneMasterDetector::~StandaloneMasterDetector()
}
-void StandaloneMasterDetector::appoint(const Result<process::UPID>& leader)
+void StandaloneMasterDetector::appoint(const Option<process::UPID>& leader)
{
return dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
}
-Future<Result<UPID> > StandaloneMasterDetector::detect(
- const Result<UPID>& previous)
+Future<Option<UPID> > StandaloneMasterDetector::detect(
+ const Option<UPID>& previous)
{
return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
}
@@ -240,8 +228,8 @@ ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
{
- foreach (Promise<Result<UPID> >* promise, promises) {
- promise->set(Result<UPID>::error("No longer detecting a master"));
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->future().discard();
delete promise;
}
promises.clear();
@@ -250,43 +238,43 @@ ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
void ZooKeeperMasterDetectorProcess::initialize()
{
- detector.detect(None())
+ detector.detect()
.onAny(defer(self(), &Self::detected, lambda::_1));
}
-Future<Result<UPID> > ZooKeeperMasterDetectorProcess::detect(
- const Result<UPID>& previous)
+Future<Option<UPID> > ZooKeeperMasterDetectorProcess::detect(
+ const Option<UPID>& previous)
{
- // Directly return when the current leader and previous are not the
- // same.
- if (leader.isError() != previous.isError() ||
- leader.isNone() != previous.isNone() ||
- leader.isSome() != previous.isSome()) {
- return leader; // State change.
- } else if (leader.isSome() && previous.isSome() &&
- leader.get() != previous.get()) {
- return leader; // Leadership change.
+ if (leader != previous) {
+ return leader;
}
- Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+ Promise<Option<UPID> >* promise = new Promise<Option<UPID> >();
promises.insert(promise);
return promise->future();
}
void ZooKeeperMasterDetectorProcess::detected(
- Future<Result<Group::Membership> > _leader)
+ Future<Option<Group::Membership> > _leader)
{
- CHECK(_leader.isReady())
- << "Not expecting LeaderDetector to fail or discard futures";
+ CHECK(!_leader.isDiscarded());
- if (!_leader.get().isSome()) {
- leader = _leader.get().isError()
- ? Result<UPID>::error(_leader.get().error())
- : Result<UPID>::none();
+ if (_leader.isFailed()) {
+ leader = None();
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->fail(_leader.failure());
+ delete promise;
+ }
+ promises.clear();
+ return;
+ }
+
+ if (_leader.get().isNone()) {
+ leader = None();
- foreach (Promise<Result<UPID> >* promise, promises) {
+ foreach (Promise<Option<UPID> >* promise, promises) {
promise->set(leader);
delete promise;
}
@@ -305,23 +293,23 @@ void ZooKeeperMasterDetectorProcess::detected(
void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
{
+ CHECK(!data.isDiscarded());
+
if (data.isFailed()) {
- leader = Error(data.failure());
- foreach (Promise<Result<UPID> >* promise, promises) {
- promise->set(leader);
+ leader = None();
+ foreach (Promise<Option<UPID> >* promise, promises) {
+ promise->fail(data.failure());
delete promise;
}
promises.clear();
return;
}
- CHECK(data.isReady()); // Not expecting Group to discard futures.
-
// Cache the master for subsequent requests.
leader = UPID(data.get());
LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
- foreach (Promise<Result<UPID> >* promise, promises) {
+ foreach (Promise<Option<UPID> >* promise, promises) {
promise->set(leader);
delete promise;
}
@@ -351,8 +339,8 @@ ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
}
-Future<Result<UPID> > ZooKeeperMasterDetector::detect(
- const Result<UPID>& previous)
+Future<Option<UPID> > ZooKeeperMasterDetector::detect(
+ const Option<UPID>& previous)
{
return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector.hpp b/src/master/detector.hpp
index 6e7a4c4..8df2d16 100644
--- a/src/master/detector.hpp
+++ b/src/master/detector.hpp
@@ -23,7 +23,7 @@
#include <process/owned.hpp>
#include <process/pid.hpp>
-#include <stout/result.hpp>
+#include <stout/option.hpp>
#include <stout/try.hpp>
#include "zookeeper/detector.hpp"
@@ -58,18 +58,18 @@ public:
// Returns some PID after an election has occurred and the elected
// PID is different than that specified (if any), or NONE if an
// election occurs and no PID is elected (e.g., all PIDs are lost).
- // The result is an error if the detector is not able to detect the
- // leading master, possibly due to network disconnection.
- //
- // The future fails when the detector is destructed, it is never
- // discarded.
+ // A failed future is returned if the detector is unable to detect
+ // the leading master due to a non-retryable error.
+ // Note that the detector transparently tries to recover from
+ // retryable errors.
+ // The future is never discarded unless it stays pending when the
+ // detector destructs.
//
// The 'previous' result (if any) should be passed back if this
// method is called repeatedly so the detector only returns when it
- // gets a different result, either because an error is recovered or
- // the elected membership is different from the 'previous'.
- virtual process::Future<Result<process::UPID> > detect(
- const Result<process::UPID>& previous = None()) = 0;
+ // gets a different result.
+ virtual process::Future<Option<process::UPID> > detect(
+ const Option<process::UPID>& previous = None()) = 0;
};
@@ -85,20 +85,11 @@ public:
StandaloneMasterDetector(const process::UPID& leader);
virtual ~StandaloneMasterDetector();
- // Appoint the leading master so it can be *detected* by default.
- // The leader can be NONE or ERROR.
- // This method is used only by this basic implementation and not
- // needed by child classes with other detection mechanisms such as
- // Zookeeper.
- //
- // When used by Master, this method is called during its
- // initialization process; when used by Slave and SchedulerDriver,
- // the MasterDetector needs to have the leader installed prior to
- // injection.
- void appoint(const Result<process::UPID>& leader);
+ // Appoint the leading master so it can be *detected*.
+ void appoint(const Option<process::UPID>& leader);
- virtual process::Future<Result<process::UPID> > detect(
- const Result<process::UPID>& previous = None());
+ virtual process::Future<Option<process::UPID> > detect(
+ const Option<process::UPID>& previous = None());
private:
StandaloneMasterDetectorProcess* process;
@@ -111,13 +102,13 @@ public:
// Creates a detector which uses ZooKeeper to determine (i.e.,
// elect) a leading master.
ZooKeeperMasterDetector(const zookeeper::URL& url);
- // A constructor overload for testing purposes.
+ // Used for testing purposes.
ZooKeeperMasterDetector(process::Owned<zookeeper::Group> group);
virtual ~ZooKeeperMasterDetector();
// MasterDetector implementation.
- virtual process::Future<Result<process::UPID> > detect(
- const Result<process::UPID>& previous = None());
+ virtual process::Future<Option<process::UPID> > detect(
+ const Option<process::UPID>& previous = None());
private:
ZooKeeperMasterDetectorProcess* process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 78dff2f..38c5532 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -195,7 +195,6 @@ Master::Master(
: ProcessBase("master"),
http(*this),
flags(_flags),
- leader(None()),
allocator(_allocator),
registrar(_registrar),
files(_files),
@@ -717,8 +716,10 @@ void Master::lostCandidacy(const Future<Nothing>& lost)
}
-void Master::detected(const Future<Result<UPID> >& _leader)
+void Master::detected(const Future<Option<UPID> >& _leader)
{
+ CHECK(!_leader.isDiscarded());
+
if (_leader.isFailed()) {
EXIT(1) << "Failed to detect the leading master: " << _leader.failure()
<< "; committing suicide!";
@@ -727,13 +728,8 @@ void Master::detected(const Future<Result<UPID> >& _leader)
bool wasElected = elected();
leader = _leader.get();
- if (leader.isError()) {
- EXIT(1) << "Failed to detect the leading master: " << leader.error()
- << "; committing suicide!";
- }
-
LOG(INFO) << "The newly elected leader is "
- << (leader.isSome() ? leader.get() : "NONE");
+ << (leader.isSome() ? leader.get() : "None");
if (wasElected && !elected()) {
EXIT(1) << "Lost leadership... committing suicide!";
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2e921c1..95b9cec 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -202,7 +202,7 @@ protected:
void lostCandidacy(const Future<Nothing>& lost);
// Invoked when there is a newly elected leading master.
- void detected(const Future<Result<UPID> >& pid);
+ void detected(const Future<Option<UPID> >& pid);
// Process a launch tasks request (for a non-cancelled offer) by
// launching the desired tasks (if the offer contains a valid set of
@@ -315,7 +315,7 @@ private:
const Flags flags;
- Result<UPID> leader; // Current leading master.
+ Option<UPID> leader; // Current leading master.
// Whether we are the current leading master.
bool elected() const { return leader.isSome() && leader.get() == self(); }
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 54610f5..c01e564 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -108,7 +108,6 @@ public:
mutex(_mutex),
cond(_cond),
failover(_framework.has_id() && !framework.id().value().empty()),
- master(None()),
connected(false),
aborted(false),
detector(_detector),
@@ -173,7 +172,7 @@ protected:
.onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
}
- void detected(const Future<Result<UPID> >& pid)
+ void detected(const Future<Option<UPID> >& pid)
{
if (aborted) {
VLOG(1) << "Ignoring the master change because the driver is aborted!";
@@ -221,12 +220,10 @@ protected:
doReliableRegistration();
}
- } else if (master.isNone()) {
+ } else {
// In this case, we don't actually invoke Scheduler::error
// since we might get reconnected to a master imminently.
VLOG(1) << "No master detected";
- } else {
- EXIT(1) << "Failed to detect master: " << master.error();
}
// Keep detecting masters.
@@ -245,7 +242,7 @@ protected:
authenticated = false;
- if (!master.isSome()) {
+ if (master.isNone()) {
return;
}
@@ -303,7 +300,7 @@ protected:
CHECK_SOME(authenticating);
const Future<bool>& future = authenticating.get();
- if (!master.isSome()) {
+ if (master.isNone()) {
LOG(INFO) << "Ignoring _authenticate because the master is lost";
authenticating = None();
// Set it to false because we do not want further retries until
@@ -377,7 +374,7 @@ protected:
return;
}
- if (!master.isSome() || from != master.get()) {
+ if (master != from) {
VLOG(1) << "Ignoring framework registered message because it was sent "
<< "from '" << from << "' instead of the leading master '"
<< (master.isSome() ? master.get() : UPID()) << "'";
@@ -418,7 +415,7 @@ protected:
return;
}
- if (!master.isSome() || from != master.get()) {
+ if (master != from) {
VLOG(1) << "Ignoring framework re-registered message because it was sent "
<< "from '" << from << "' instead of the leading master '"
<< (master.isSome() ? master.get() : UPID()) << "'";
@@ -444,7 +441,7 @@ protected:
void doReliableRegistration()
{
- if (connected || !master.isSome()) {
+ if (connected || master.isNone()) {
return;
}
@@ -983,7 +980,7 @@ private:
pthread_mutex_t* mutex;
pthread_cond_t* cond;
bool failover;
- Result<UPID> master;
+ Option<UPID> master;
bool connected; // Flag to indicate if framework is registered.
volatile bool aborted; // Flag to indicate if the driver is aborted.
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8f77731..396293b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -81,7 +81,6 @@ Slave::Slave(const slave::Flags& _flags,
http(*this),
flags(_flags),
local(_local),
- master(None()),
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
detector(_detector),
isolator(_isolator),
@@ -429,7 +428,7 @@ void Slave::shutdown(const UPID& from)
// Allow shutdown message only if
// 1) Its a message received from the registered master or
// 2) If its called locally (e.g tests)
- if (from && (!master.isSome() || from != master.get())) {
+ if (from && master != from) {
LOG(WARNING) << "Ignoring shutdown message from " << from
<< " because it is not from the registered master: "
<< (master.isSome() ? master.get() : "None");
@@ -479,7 +478,7 @@ Nothing Slave::detachFile(const string& path)
}
-void Slave::detected(const Future<Result<UPID> >& pid)
+void Slave::detected(const Future<Option<UPID> >& pid)
{
CHECK(state == DISCONNECTED ||
state == RUNNING ||
@@ -518,10 +517,8 @@ void Slave::detected(const Future<Result<UPID> >& pid)
}
doReliableRegistration();
- } else if (master.isNone()) {
- LOG(INFO) << "Lost leading master";
} else {
- EXIT(1) << "Failed to detect a master: " << master.error();
+ LOG(INFO) << "Lost leading master";
}
// Keep detecting masters.
@@ -533,7 +530,7 @@ void Slave::detected(const Future<Result<UPID> >& pid)
void Slave::registered(const UPID& from, const SlaveID& slaveId)
{
- if (!master.isSome() || from != master.get()) {
+ if (master != from) {
LOG(WARNING) << "Ignoring registration message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? master.get() : "None");
@@ -592,7 +589,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
{
- if (!master.isSome() || from != master.get()) {
+ if (master != from) {
LOG(WARNING) << "Ignoring re-registration message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? master.get() : "None");
@@ -638,7 +635,7 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
void Slave::doReliableRegistration()
{
- if (!master.isSome()) {
+ if (master.isNone()) {
LOG(INFO) << "Skipping registration because no master present";
return;
}
@@ -730,7 +727,7 @@ void Slave::runTask(
const string& pid,
const TaskInfo& task)
{
- if (!master.isSome() || from != master.get()) {
+ if (master != from) {
LOG(WARNING) << "Ignoring run task message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? master.get() : "None");
@@ -1007,7 +1004,7 @@ void Slave::killTask(
const FrameworkID& frameworkId,
const TaskID& taskId)
{
- if (!master.isSome() || from != master.get()) {
+ if (master != from) {
LOG(WARNING) << "Ignoring kill task message from " << from
<< " because it is not the expected master: "
<< (master.isSome() ? master.get() : "None");
@@ -1135,7 +1132,7 @@ void Slave::shutdownFramework(
// Allow shutdownFramework() only if
// its called directly (e.g. Slave::finalize()) or
// its a message from the currently registered master.
- if (from && (!master.isSome() || from != master.get())) {
+ if (from && master != from) {
LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
<< " from " << from
<< " because it is not from the registered master ("
@@ -1968,7 +1965,7 @@ void Slave::exited(const UPID& pid)
{
LOG(INFO) << pid << " exited";
- if (!master.isSome() || master.get() == pid) {
+ if (master.isNone() || master.get() == pid) {
LOG(WARNING) << "Master disconnected!"
<< " Waiting for a new master to be elected";
// TODO(benh): After so long waiting for a master, commit suicide.
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 71fa4f0..b00f970 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -223,7 +223,7 @@ protected:
Nothing detachFile(const std::string& path);
// Invoked whenever the detector detects a change in masters.
- void detected(const Future<Result<UPID> >& pid);
+ void detected(const Future<Option<UPID> >& pid);
// Helper routine to lookup a framework.
Framework* getFramework(const FrameworkID& frameworkId);
@@ -318,7 +318,7 @@ private:
SlaveInfo info;
- Result<UPID> master;
+ Option<UPID> master;
Resources resources;
Attributes attributes;
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index d496641..d532c17 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -158,7 +158,7 @@ TEST(BasicMasterContenderDetectorTest, Detector)
StandaloneMasterDetector detector;
- Future<Result<UPID> > detected = detector.detect();
+ Future<Option<UPID> > detected = detector.detect();
// No one has appointed the leader so we are pending.
EXPECT_TRUE(detected.isPending());
@@ -196,7 +196,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
ZooKeeperMasterDetector detector(url.get());
- Future<Result<UPID> > leader = detector.detect();
+ Future<Option<UPID> > leader = detector.detect();
EXPECT_SOME_EQ(master, leader.get());
Future<Nothing> lostCandidacy = contended.get();
leader = detector.detect(leader.get());
@@ -236,7 +236,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
ZooKeeperMasterDetector detector1(url.get());
- Future<Result<UPID> > leader1 = detector1.detect();
+ Future<Option<UPID> > leader1 = detector1.detect();
AWAIT_READY(leader1);
EXPECT_SOME_EQ(master1, leader1.get());
@@ -252,7 +252,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
AWAIT_READY(contended2);
ZooKeeperMasterDetector detector2(url.get());
- Future<Result<UPID> > leader2 = detector2.detect();
+ Future<Option<UPID> > leader2 = detector2.detect();
AWAIT_READY(leader2);
EXPECT_SOME_EQ(master1, leader2.get());
@@ -261,12 +261,69 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
// Destroying detector1 (below) causes leadership change.
delete contender1;
- Future<Result<UPID> > leader3 = detector2.detect(master1);
+ Future<Option<UPID> > leader3 = detector2.detect(master1);
AWAIT_READY(leader3);
EXPECT_SOME_EQ(master2, leader3.get());
}
+// Verifies that contender and detector operations fail when facing
+// non-retryable errors returned by ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors)
+{
+ // group1 creates a base directory in ZooKeeper and sets the
+ // credential for the user.
+ zookeeper::Group group1(
+ server->connectString(),
+ MASTER_CONTENDER_ZK_SESSION_TIMEOUT,
+ "/mesos",
+ zookeeper::Authentication("digest", "member:member"));
+ AWAIT_READY(group1.join("data"));
+
+ PID<Master> master;
+ master.ip = 10000000;
+ master.port = 10000;
+
+ // group2's password is wrong and operations on it will fail.
+ Owned<zookeeper::Group> group2(new Group(
+ server->connectString(),
+ MASTER_CONTENDER_ZK_SESSION_TIMEOUT,
+ "/mesos",
+ zookeeper::Authentication("digest", "member:wrongpass")));
+ ZooKeeperMasterContender contender(group2);
+ contender.initialize(master);
+
+ // Fails due to authentication error.
+ AWAIT_FAILED(contender.contend());
+
+ // Now test non-retryable failures in detection.
+ ZooKeeperTest::TestWatcher watcher;
+ ZooKeeper authenticatedZk(server->connectString(), NO_TIMEOUT, &watcher);
+ watcher.awaitSessionEvent(ZOO_CONNECTED_STATE);
+ authenticatedZk.authenticate("digest", "creator:creator");
+
+ // The creator of the base path restricts all access to
+ // itself.
+ ACL onlyCreatorCanAccess[] = {{ ZOO_PERM_ALL, ZOO_AUTH_IDS }};
+ authenticatedZk.create("/test",
+ "42",
+ (ACL_vector) {1, onlyCreatorCanAccess},
+ 0,
+ NULL);
+ ASSERT_ZK_GET("42", &authenticatedZk, "/test");
+
+ // group3 cannot read the base path thus detector should fail.
+ Owned<Group> group3(new Group(
+ server->connectString(),
+ MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+ "/test",
+ None()));
+
+ ZooKeeperMasterDetector detector(group3);
+ AWAIT_FAILED(detector.detect());
+}
+
+
// Master contention and detection fail when the network is down, it
// recovers when the network is back up.
TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
@@ -292,7 +349,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
ZooKeeperMasterDetector detector(url.get());
- Future<Result<UPID> > leader = detector.detect();
+ Future<Option<UPID> > leader = detector.detect();
AWAIT_READY(leader);
EXPECT_SOME_EQ(master, leader.get());
@@ -365,7 +422,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
ZooKeeperMasterDetector leaderDetector(leaderGroup);
- Future<Result<UPID> > detected = leaderDetector.detect();
+ Future<Option<UPID> > detected = leaderDetector.detect();
AWAIT_READY(detected);
EXPECT_SOME_EQ(leader, detected.get());
@@ -418,9 +475,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
AWAIT_READY(nonContenderReconnecting);
// Now the detectors re-detect.
- Future<Result<UPID> > leaderDetected = leaderDetector.detect(leader);
- Future<Result<UPID> > followerDetected = followerDetector.detect(leader);
- Future<Result<UPID> > nonContenderDetected =
+ Future<Option<UPID> > leaderDetected = leaderDetector.detect(leader);
+ Future<Option<UPID> > followerDetected = followerDetector.detect(leader);
+ Future<Option<UPID> > nonContenderDetected =
nonContenderDetector.detect(leader);
Clock::pause();
@@ -474,12 +531,12 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
ZooKeeperMasterDetector leaderDetector(url.get());
- Future<Result<UPID> > detected = leaderDetector.detect();
+ Future<Option<UPID> > detected = leaderDetector.detect();
AWAIT_READY(detected);
EXPECT_SOME_EQ(leader, detected.get());
// Keep detecting.
- Future<Result<UPID> > newLeaderDetected =
+ Future<Option<UPID> > newLeaderDetected =
leaderDetector.detect(detected.get());
// Simulate a following master.
@@ -545,7 +602,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
ZooKeeperMasterDetector slaveDetector(group);
- Future<Result<UPID> > detected = slaveDetector.detect();
+ Future<Option<UPID> > detected = slaveDetector.detect();
AWAIT_READY(detected);
EXPECT_SOME_EQ(master, detected.get());
@@ -598,7 +655,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
Future<Future<Nothing> > contended = leaderContender.contend();
AWAIT_READY(contended);
- Future<Result<UPID> > detected = leaderDetector.detect(None());
+ Future<Option<UPID> > detected = leaderDetector.detect(None());
AWAIT_READY(detected);
EXPECT_SOME_EQ(leader, detected.get());
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index 62ab1bc..94d324a 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -156,7 +156,7 @@ TEST_F(ZooKeeperTest, LeaderDetector)
LeaderDetector detector(&group);
// Detect the leader.
- Future<Result<Group::Membership> > leader =
+ Future<Option<Group::Membership> > leader =
detector.detect(None());
AWAIT_READY(leader);
ASSERT_SOME_EQ(membership1.get(), leader.get());
@@ -205,7 +205,7 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
AWAIT_READY(membership1);
Future<bool> cancelled = membership1.get().cancelled();
- Future<Result<Group::Membership> > leader = detector.detect();
+ Future<Option<Group::Membership> > leader = detector.detect();
AWAIT_READY(leader);
EXPECT_SOME(leader.get());
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/zookeeper/detector.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.cpp b/src/zookeeper/detector.cpp
index 1de3663..6723d03 100644
--- a/src/zookeeper/detector.cpp
+++ b/src/zookeeper/detector.cpp
@@ -28,8 +28,8 @@ public:
virtual void initialize();
// LeaderDetector implementation.
- Future<Result<Group::Membership> > detect(
- const Result<Group::Membership>& previous);
+ Future<Option<Group::Membership> > detect(
+ const Option<Group::Membership>& previous);
private:
// Helper that sets up the watch on the group.
@@ -39,8 +39,8 @@ private:
void watched(Future<set<Group::Membership> > memberships);
Group* group;
- Result<Group::Membership> leader;
- set<Promise<Result<Group::Membership> >*> promises;
+ Option<Group::Membership> leader;
+ set<Promise<Option<Group::Membership> >*> promises;
};
@@ -50,8 +50,8 @@ LeaderDetectorProcess::LeaderDetectorProcess(Group* _group)
LeaderDetectorProcess::~LeaderDetectorProcess()
{
- foreach (Promise<Result<Group::Membership> >* promise, promises) {
- promise->fail("No longer detecting leader");
+ foreach (Promise<Option<Group::Membership> >* promise, promises) {
+ promise->future().discard();
delete promise;
}
promises.clear();
@@ -64,23 +64,18 @@ void LeaderDetectorProcess::initialize()
}
-Future<Result<Group::Membership> > LeaderDetectorProcess::detect(
- const Result<Group::Membership>& previous)
+Future<Option<Group::Membership> > LeaderDetectorProcess::detect(
+ const Option<Group::Membership>& previous)
{
// Return immediately if the incumbent leader is different from the
// expected.
- if (leader.isError() != previous.isError() ||
- leader.isNone() != previous.isNone() ||
- leader.isSome() != previous.isSome()) {
- return leader; // State change.
- } else if (leader.isSome() && previous.isSome() &&
- leader.get() != previous.get()) {
- return leader; // Leadership change.
+ if (leader != previous) {
+ return leader;
}
// Otherwise wait for the next election result.
- Promise<Result<Group::Membership> >* promise =
- new Promise<Result<Group::Membership> >();
+ Promise<Option<Group::Membership> >* promise =
+ new Promise<Option<Group::Membership> >();
promises.insert(promise);
return promise->future();
}
@@ -95,22 +90,19 @@ void LeaderDetectorProcess::watch(const set<Group::Membership>& expected)
void LeaderDetectorProcess::watched(Future<set<Group::Membership> > memberships)
{
+ CHECK(!memberships.isDiscarded());
+
if (memberships.isFailed()) {
LOG(ERROR) << "Failed to watch memberships: " << memberships.failure();
- leader = Error(memberships.failure());
- foreach (Promise<Result<Group::Membership> >* promise, promises) {
- promise->set(leader);
+ leader = None();
+ foreach (Promise<Option<Group::Membership> >* promise, promises) {
+ promise->fail(memberships.failure());
delete promise;
}
promises.clear();
-
- // Start over.
- watch(set<Group::Membership>());
return;
}
- CHECK(memberships.isReady()) << "Not expecting Group to discard futures";
-
// Update leader status based on memberships.
if (leader.isSome() && memberships.get().count(leader.get()) == 0) {
VLOG(1) << "The current leader (id=" << leader.get().id() << ") is lost";
@@ -121,24 +113,17 @@ void LeaderDetectorProcess::watched(Future<set<Group::Membership> > memberships)
// incumbent wins the election.
Option<Group::Membership> current;
foreach (const Group::Membership& membership, memberships.get()) {
- if (current.isNone() || membership.id() < current.get().id()) {
- current = membership;
- }
+ current = min(current, membership);
}
- if (current.isSome() && (!leader.isSome() || current.get() != leader.get())) {
- LOG(INFO) << "Detected a new leader (id='" << current.get().id()
- << "')";
- foreach (Promise<Result<Group::Membership> >* promise, promises) {
- promise->set(current);
- delete promise;
- }
- promises.clear();
- } else if (current.isNone() && !leader.isNone()) {
- LOG(INFO) << "No new leader is elected after election";
+ if (current != leader) {
+ LOG(INFO) << "Detected a new leader: "
+ << (current.isSome()
+ ? "'(id='" + stringify(current.get().id()) + "')"
+ : "None");
- foreach (Promise<Result<Group::Membership> >* promise, promises) {
- promise->set(Result<Group::Membership>::none());
+ foreach (Promise<Option<Group::Membership> >* promise, promises) {
+ promise->set(current);
delete promise;
}
promises.clear();
@@ -164,8 +149,8 @@ LeaderDetector::~LeaderDetector()
}
-Future<Result<Group::Membership> > LeaderDetector::detect(
- const Result<Group::Membership>& membership)
+Future<Option<Group::Membership> > LeaderDetector::detect(
+ const Option<Group::Membership>& membership)
{
return dispatch(process, &LeaderDetectorProcess::detect, membership);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/zookeeper/detector.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.hpp b/src/zookeeper/detector.hpp
index de4acab..00fcf5e 100644
--- a/src/zookeeper/detector.hpp
+++ b/src/zookeeper/detector.hpp
@@ -26,17 +26,20 @@ public:
// Returns some membership after an election has occurred and a
// leader (membership) is elected, or none if an election occurs and
// no leader is elected (e.g., all memberships are lost).
- // The result is an error if the detector is not able to detect the
- // leader, possibly due to network disconnection.
+ // A failed future is returned if the detector is unable to detect
+ // the leader due to a non-retryable error.
+ // Note that the detector transparently tries to recover from
+ // retryable errors.
+ // The future is never discarded unless it is still pending when the
+ // detector is destroyed.
//
// The 'previous' result (if any) should be passed back if this
// method is called repeatedly so the detector only returns when it
- // gets a different result, either because an error is recovered or
- // the elected membership is different from the 'previous'.
+ // gets a different result.
//
// TODO(xujyan): Use a Stream abstraction instead.
- process::Future<Result<Group::Membership> > detect(
- const Result<Group::Membership>& previous = None());
+ process::Future<Option<Group::Membership> > detect(
+ const Option<Group::Membership>& previous = None());
private:
LeaderDetectorProcess* process;