You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/12/20 01:32:03 UTC

[6/7] git commit: Changed master detector to return Future

Changed master detector to return Future<Option> instead of
Future<Result>.

Now that local session timeouts are transparent to the clients of the
master detector, they are not supposed to retry anymore. Thus passing
"Result<UPID> previous" back into MasterDetector::detect() is no
longer correct.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/16291


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1767e38
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1767e38
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1767e38

Branch: refs/heads/master
Commit: e1767e384db6ab187e69f5b2ed0f56995103337b
Parents: aa75551
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Dec 19 16:09:41 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Dec 19 16:09:41 2013 -0800

----------------------------------------------------------------------
 src/cli/resolve.cpp                           |   8 +-
 src/master/detector.cpp                       | 116 +++++++++------------
 src/master/detector.hpp                       |  43 +++-----
 src/master/master.cpp                         |  12 +--
 src/master/master.hpp                         |   4 +-
 src/sched/sched.cpp                           |  19 ++--
 src/slave/slave.cpp                           |  23 ++--
 src/slave/slave.hpp                           |   4 +-
 src/tests/master_contender_detector_tests.cpp |  85 ++++++++++++---
 src/tests/zookeeper_tests.cpp                 |   4 +-
 src/zookeeper/detector.cpp                    |  69 +++++-------
 src/zookeeper/detector.hpp                    |  15 +--
 12 files changed, 208 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/cli/resolve.cpp
----------------------------------------------------------------------
diff --git a/src/cli/resolve.cpp b/src/cli/resolve.cpp
index dddadfc..0ebb0c6 100644
--- a/src/cli/resolve.cpp
+++ b/src/cli/resolve.cpp
@@ -124,16 +124,16 @@ int main(int argc, char** argv)
     return -1;
   }
 
-  Future<Result<UPID> > pid = detector.get()->detect();
+  Future<Option<UPID> > pid = detector.get()->detect();
 
   if (!pid.await(timeout)) {
     cerr << "Failed to detect master from '" << master.get()
          << "' within " << timeout << endl;
     return -1;
   } else {
-    // Not expecting detect() to fail or discard the future.
-    CHECK(pid.isReady());
-    if (pid.get().isError()) {
+    CHECK(!pid.isDiscarded());
+
+    if (pid.isFailed()) {
       cerr << "Failed to detect master from '" << master.get()
            << "': " << pid.failure() << endl;
       return -1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index 2f73f66..7b437ac 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -54,20 +54,17 @@ class StandaloneMasterDetectorProcess
   : public Process<StandaloneMasterDetectorProcess>
 {
 public:
-  StandaloneMasterDetectorProcess() : leader(None()) {}
+  StandaloneMasterDetectorProcess() {}
   StandaloneMasterDetectorProcess(const UPID& _leader)
     : leader(_leader) {}
   ~StandaloneMasterDetectorProcess();
 
-  void appoint(const Result<UPID>& leader);
-  Future<Result<UPID> > detect(const Result<UPID>& previous = None());
+  void appoint(const Option<UPID>& leader);
+  Future<Option<UPID> > detect(const Option<UPID>& previous = None());
 
 private:
-  // The leading master that's directly 'appoint()'ed.
-  Result<UPID> leader;
-
-  // Promises for the detection result.
-  set<Promise<Result<UPID> >*> promises;
+  Option<UPID> leader; // The appointed master.
+  set<Promise<Option<UPID> >*> promises;
 };
 
 
@@ -80,13 +77,11 @@ public:
   ~ZooKeeperMasterDetectorProcess();
 
   virtual void initialize();
-
-  // ZooKeeperMasterDetector implementation.
-  Future<Result<UPID> > detect(const Result<UPID>& previous);
+  Future<Option<UPID> > detect(const Option<UPID>& previous);
 
 private:
   // Invoked when the group leadership has changed.
-  void detected(Future<Result<Group::Membership> > leader);
+  void detected(Future<Option<Group::Membership> > leader);
 
   // Invoked when we have fetched the data associated with the leader.
   void fetched(const Future<string>& data);
@@ -95,8 +90,8 @@ private:
   LeaderDetector detector;
 
   // The leading Master.
-  Result<UPID> leader;
-  set<Promise<Result<UPID> >*> promises;
+  Option<UPID> leader;
+  set<Promise<Option<UPID> >*> promises;
 };
 
 
@@ -143,8 +138,8 @@ MasterDetector::~MasterDetector() {}
 
 StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
 {
-  foreach (Promise<Result<UPID> >* promise, promises) {
-    promise->set(Result<UPID>::error("MasterDetector is being destructed"));
+  foreach (Promise<Option<UPID> >* promise, promises) {
+    promise->future().discard();
     delete promise;
   }
   promises.clear();
@@ -152,11 +147,11 @@ StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
 
 
 void StandaloneMasterDetectorProcess::appoint(
-    const Result<process::UPID>& _leader)
+    const Option<process::UPID>& _leader)
 {
   leader = _leader;
 
-  foreach (Promise<Result<UPID> >* promise, promises) {
+  foreach (Promise<Option<UPID> >* promise, promises) {
     promise->set(leader);
     delete promise;
   }
@@ -164,21 +159,14 @@ void StandaloneMasterDetectorProcess::appoint(
 }
 
 
-Future<Result<UPID> > StandaloneMasterDetectorProcess::detect(
-    const Result<UPID>& previous)
+Future<Option<UPID> > StandaloneMasterDetectorProcess::detect(
+    const Option<UPID>& previous)
 {
-  // Directly return the current leader is not the
-  // same as the previous one.
-  if (leader.isError() != previous.isError() ||
-      leader.isNone() != previous.isNone() ||
-      leader.isSome() != previous.isSome()) {
-    return leader; // State change.
-  } else if (leader.isSome() && previous.isSome() &&
-             leader.get() != previous.get()) {
-    return leader; // Leadership change.
+  if (leader != previous) {
+    return leader;
   }
 
-  Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+  Promise<Option<UPID> >* promise = new Promise<Option<UPID> >();
   promises.insert(promise);
   return promise->future();
 }
@@ -206,14 +194,14 @@ StandaloneMasterDetector::~StandaloneMasterDetector()
 }
 
 
-void StandaloneMasterDetector::appoint(const Result<process::UPID>& leader)
+void StandaloneMasterDetector::appoint(const Option<process::UPID>& leader)
 {
   return dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
 }
 
 
-Future<Result<UPID> > StandaloneMasterDetector::detect(
-    const Result<UPID>& previous)
+Future<Option<UPID> > StandaloneMasterDetector::detect(
+    const Option<UPID>& previous)
 {
   return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
 }
@@ -240,8 +228,8 @@ ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
 
 ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
 {
-  foreach (Promise<Result<UPID> >* promise, promises) {
-    promise->set(Result<UPID>::error("No longer detecting a master"));
+  foreach (Promise<Option<UPID> >* promise, promises) {
+    promise->future().discard();
     delete promise;
   }
   promises.clear();
@@ -250,43 +238,43 @@ ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
 
 void ZooKeeperMasterDetectorProcess::initialize()
 {
-  detector.detect(None())
+  detector.detect()
     .onAny(defer(self(), &Self::detected, lambda::_1));
 }
 
 
-Future<Result<UPID> > ZooKeeperMasterDetectorProcess::detect(
-    const Result<UPID>& previous)
+Future<Option<UPID> > ZooKeeperMasterDetectorProcess::detect(
+    const Option<UPID>& previous)
 {
-  // Directly return when the current leader and previous are not the
-  // same.
-  if (leader.isError() != previous.isError() ||
-      leader.isNone() != previous.isNone() ||
-      leader.isSome() != previous.isSome()) {
-    return leader; // State change.
-  } else if (leader.isSome() && previous.isSome() &&
-             leader.get() != previous.get()) {
-    return leader; // Leadership change.
+  if (leader != previous) {
+    return leader;
   }
 
-  Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+  Promise<Option<UPID> >* promise = new Promise<Option<UPID> >();
   promises.insert(promise);
   return promise->future();
 }
 
 
 void ZooKeeperMasterDetectorProcess::detected(
-    Future<Result<Group::Membership> > _leader)
+    Future<Option<Group::Membership> > _leader)
 {
-  CHECK(_leader.isReady())
-    << "Not expecting LeaderDetector to fail or discard futures";
+  CHECK(!_leader.isDiscarded());
 
-  if (!_leader.get().isSome()) {
-    leader = _leader.get().isError()
-        ? Result<UPID>::error(_leader.get().error())
-        : Result<UPID>::none();
+  if (_leader.isFailed()) {
+    leader = None();
+    foreach (Promise<Option<UPID> >* promise, promises) {
+      promise->fail(_leader.failure());
+      delete promise;
+    }
+    promises.clear();
+    return;
+  }
+
+  if (_leader.get().isNone()) {
+    leader = None();
 
-    foreach (Promise<Result<UPID> >* promise, promises) {
+    foreach (Promise<Option<UPID> >* promise, promises) {
       promise->set(leader);
       delete promise;
     }
@@ -305,23 +293,23 @@ void ZooKeeperMasterDetectorProcess::detected(
 
 void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
 {
+  CHECK(!data.isDiscarded());
+
   if (data.isFailed()) {
-    leader = Error(data.failure());
-    foreach (Promise<Result<UPID> >* promise, promises) {
-      promise->set(leader);
+    leader = None();
+    foreach (Promise<Option<UPID> >* promise, promises) {
+      promise->fail(data.failure());
       delete promise;
     }
     promises.clear();
     return;
   }
 
-  CHECK(data.isReady()); // Not expecting Group to discard futures.
-
   // Cache the master for subsequent requests.
   leader = UPID(data.get());
   LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
 
-  foreach (Promise<Result<UPID> >* promise, promises) {
+  foreach (Promise<Option<UPID> >* promise, promises) {
     promise->set(leader);
     delete promise;
   }
@@ -351,8 +339,8 @@ ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
 }
 
 
-Future<Result<UPID> > ZooKeeperMasterDetector::detect(
-    const Result<UPID>& previous)
+Future<Option<UPID> > ZooKeeperMasterDetector::detect(
+    const Option<UPID>& previous)
 {
   return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector.hpp b/src/master/detector.hpp
index 6e7a4c4..8df2d16 100644
--- a/src/master/detector.hpp
+++ b/src/master/detector.hpp
@@ -23,7 +23,7 @@
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 
-#include <stout/result.hpp>
+#include <stout/option.hpp>
 #include <stout/try.hpp>
 
 #include "zookeeper/detector.hpp"
@@ -58,18 +58,18 @@ public:
   // Returns some PID after an election has occurred and the elected
   // PID is different than that specified (if any), or NONE if an
   // election occurs and no PID is elected (e.g., all PIDs are lost).
-  // The result is an error if the detector is not able to detect the
-  // leading master, possibly due to network disconnection.
-  //
-  // The future fails when the detector is destructed, it is never
-  // discarded.
+  // A failed future is returned if the detector is unable to detect
+  // the leading master due to a non-retryable error.
+  // Note that the detector transparently tries to recover from
+  // retryable errors.
+  // The future is never discarded unless it stays pending when the
+  // detector destructs.
   //
   // The 'previous' result (if any) should be passed back if this
   // method is called repeatedly so the detector only returns when it
-  // gets a different result, either because an error is recovered or
-  // the elected membership is different from the 'previous'.
-  virtual process::Future<Result<process::UPID> > detect(
-      const Result<process::UPID>& previous = None()) = 0;
+  // gets a different result.
+  virtual process::Future<Option<process::UPID> > detect(
+      const Option<process::UPID>& previous = None()) = 0;
 };
 
 
@@ -85,20 +85,11 @@ public:
   StandaloneMasterDetector(const process::UPID& leader);
   virtual ~StandaloneMasterDetector();
 
-  // Appoint the leading master so it can be *detected* by default.
-  // The leader can be NONE or ERROR.
-  // This method is used only by this basic implementation and not
-  // needed by child classes with other detection mechanisms such as
-  // Zookeeper.
-  //
-  // When used by Master, this method is called during its
-  // initialization process; when used by Slave and SchedulerDriver,
-  // the MasterDetector needs to have the leader installed prior to
-  // injection.
-  void appoint(const Result<process::UPID>& leader);
+  // Appoint the leading master so it can be *detected*.
+  void appoint(const Option<process::UPID>& leader);
 
-  virtual process::Future<Result<process::UPID> > detect(
-      const Result<process::UPID>& previous = None());
+  virtual process::Future<Option<process::UPID> > detect(
+      const Option<process::UPID>& previous = None());
 
 private:
   StandaloneMasterDetectorProcess* process;
@@ -111,13 +102,13 @@ public:
   // Creates a detector which uses ZooKeeper to determine (i.e.,
   // elect) a leading master.
   ZooKeeperMasterDetector(const zookeeper::URL& url);
-  // A constructor overload for testing purposes.
+  // Used for testing purposes.
   ZooKeeperMasterDetector(process::Owned<zookeeper::Group> group);
   virtual ~ZooKeeperMasterDetector();
 
   // MasterDetector implementation.
-  virtual process::Future<Result<process::UPID> > detect(
-      const Result<process::UPID>& previous = None());
+  virtual process::Future<Option<process::UPID> > detect(
+      const Option<process::UPID>& previous = None());
 
 private:
   ZooKeeperMasterDetectorProcess* process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 78dff2f..38c5532 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -195,7 +195,6 @@ Master::Master(
   : ProcessBase("master"),
     http(*this),
     flags(_flags),
-    leader(None()),
     allocator(_allocator),
     registrar(_registrar),
     files(_files),
@@ -717,8 +716,10 @@ void Master::lostCandidacy(const Future<Nothing>& lost)
 }
 
 
-void Master::detected(const Future<Result<UPID> >& _leader)
+void Master::detected(const Future<Option<UPID> >& _leader)
 {
+  CHECK(!_leader.isDiscarded());
+
   if (_leader.isFailed()) {
     EXIT(1) << "Failed to detect the leading master: " << _leader.failure()
             << "; committing suicide!";
@@ -727,13 +728,8 @@ void Master::detected(const Future<Result<UPID> >& _leader)
   bool wasElected = elected();
   leader = _leader.get();
 
-  if (leader.isError()) {
-    EXIT(1) << "Failed to detect the leading master: " << leader.error()
-            << "; committing suicide!";
-  }
-
   LOG(INFO) << "The newly elected leader is "
-            << (leader.isSome() ? leader.get() : "NONE");
+            << (leader.isSome() ? leader.get() : "None");
 
   if (wasElected && !elected()) {
     EXIT(1) << "Lost leadership... committing suicide!";

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2e921c1..95b9cec 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -202,7 +202,7 @@ protected:
   void lostCandidacy(const Future<Nothing>& lost);
 
   // Invoked when there is a newly elected leading master.
-  void detected(const Future<Result<UPID> >& pid);
+  void detected(const Future<Option<UPID> >& pid);
 
   // Process a launch tasks request (for a non-cancelled offer) by
   // launching the desired tasks (if the offer contains a valid set of
@@ -315,7 +315,7 @@ private:
 
   const Flags flags;
 
-  Result<UPID> leader; // Current leading master.
+  Option<UPID> leader; // Current leading master.
 
   // Whether we are the current leading master.
   bool elected() const { return leader.isSome() && leader.get() == self(); }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 54610f5..c01e564 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -108,7 +108,6 @@ public:
       mutex(_mutex),
       cond(_cond),
       failover(_framework.has_id() && !framework.id().value().empty()),
-      master(None()),
       connected(false),
       aborted(false),
       detector(_detector),
@@ -173,7 +172,7 @@ protected:
       .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
   }
 
-  void detected(const Future<Result<UPID> >& pid)
+  void detected(const Future<Option<UPID> >& pid)
   {
     if (aborted) {
       VLOG(1) << "Ignoring the master change because the driver is aborted!";
@@ -221,12 +220,10 @@ protected:
 
         doReliableRegistration();
       }
-    } else if (master.isNone()) {
+    } else {
       // In this case, we don't actually invoke Scheduler::error
       // since we might get reconnected to a master imminently.
       VLOG(1) << "No master detected";
-    } else {
-      EXIT(1) << "Failed to detect master: " << master.error();
     }
 
     // Keep detecting masters.
@@ -245,7 +242,7 @@ protected:
 
     authenticated = false;
 
-    if (!master.isSome()) {
+    if (master.isNone()) {
       return;
     }
 
@@ -303,7 +300,7 @@ protected:
     CHECK_SOME(authenticating);
     const Future<bool>& future = authenticating.get();
 
-    if (!master.isSome()) {
+    if (master.isNone()) {
       LOG(INFO) << "Ignoring _authenticate because the master is lost";
       authenticating = None();
       // Set it to false because we do not want further retries until
@@ -377,7 +374,7 @@ protected:
       return;
     }
 
-    if (!master.isSome() || from != master.get()) {
+    if (master != from) {
       VLOG(1) << "Ignoring framework registered message because it was sent "
               << "from '" << from << "' instead of the leading master '"
               << (master.isSome() ? master.get() : UPID()) << "'";
@@ -418,7 +415,7 @@ protected:
       return;
     }
 
-    if (!master.isSome() || from != master.get()) {
+    if (master != from) {
       VLOG(1) << "Ignoring framework re-registered message because it was sent "
               << "from '" << from << "' instead of the leading master '"
               << (master.isSome() ? master.get() : UPID()) << "'";
@@ -444,7 +441,7 @@ protected:
 
   void doReliableRegistration()
   {
-    if (connected || !master.isSome()) {
+    if (connected || master.isNone()) {
       return;
     }
 
@@ -983,7 +980,7 @@ private:
   pthread_mutex_t* mutex;
   pthread_cond_t* cond;
   bool failover;
-  Result<UPID> master;
+  Option<UPID> master;
 
   bool connected; // Flag to indicate if framework is registered.
   volatile bool aborted; // Flag to indicate if the driver is aborted.

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8f77731..396293b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -81,7 +81,6 @@ Slave::Slave(const slave::Flags& _flags,
     http(*this),
     flags(_flags),
     local(_local),
-    master(None()),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
     detector(_detector),
     isolator(_isolator),
@@ -429,7 +428,7 @@ void Slave::shutdown(const UPID& from)
   // Allow shutdown message only if
   // 1) Its a message received from the registered master or
   // 2) If its called locally (e.g tests)
-  if (from && (!master.isSome() || from != master.get())) {
+  if (from && master != from) {
     LOG(WARNING) << "Ignoring shutdown message from " << from
                  << " because it is not from the registered master: "
                  << (master.isSome() ? master.get() : "None");
@@ -479,7 +478,7 @@ Nothing Slave::detachFile(const string& path)
 }
 
 
-void Slave::detected(const Future<Result<UPID> >& pid)
+void Slave::detected(const Future<Option<UPID> >& pid)
 {
   CHECK(state == DISCONNECTED ||
         state == RUNNING ||
@@ -518,10 +517,8 @@ void Slave::detected(const Future<Result<UPID> >& pid)
     }
 
     doReliableRegistration();
-  } else if (master.isNone()) {
-    LOG(INFO) << "Lost leading master";
   } else {
-    EXIT(1) << "Failed to detect a master: " << master.error();
+    LOG(INFO) << "Lost leading master";
   }
 
   // Keep detecting masters.
@@ -533,7 +530,7 @@ void Slave::detected(const Future<Result<UPID> >& pid)
 
 void Slave::registered(const UPID& from, const SlaveID& slaveId)
 {
-  if (!master.isSome() || from != master.get()) {
+  if (master != from) {
     LOG(WARNING) << "Ignoring registration message from " << from
                  << " because it is not the expected master: "
                  << (master.isSome() ? master.get() : "None");
@@ -592,7 +589,7 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
 
 void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
 {
-  if (!master.isSome() || from != master.get()) {
+  if (master != from) {
     LOG(WARNING) << "Ignoring re-registration message from " << from
                  << " because it is not the expected master: "
                  << (master.isSome() ? master.get() : "None");
@@ -638,7 +635,7 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
 
 void Slave::doReliableRegistration()
 {
-  if (!master.isSome()) {
+  if (master.isNone()) {
     LOG(INFO) << "Skipping registration because no master present";
     return;
   }
@@ -730,7 +727,7 @@ void Slave::runTask(
     const string& pid,
     const TaskInfo& task)
 {
-  if (!master.isSome() || from != master.get()) {
+  if (master != from) {
     LOG(WARNING) << "Ignoring run task message from " << from
                  << " because it is not the expected master: "
                  << (master.isSome() ? master.get() : "None");
@@ -1007,7 +1004,7 @@ void Slave::killTask(
     const FrameworkID& frameworkId,
     const TaskID& taskId)
 {
-  if (!master.isSome() || from != master.get()) {
+  if (master != from) {
     LOG(WARNING) << "Ignoring kill task message from " << from
                  << " because it is not the expected master: "
                  << (master.isSome() ? master.get() : "None");
@@ -1135,7 +1132,7 @@ void Slave::shutdownFramework(
   // Allow shutdownFramework() only if
   // its called directly (e.g. Slave::finalize()) or
   // its a message from the currently registered master.
-  if (from && (!master.isSome() || from != master.get())) {
+  if (from && master != from) {
     LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
                  << " from " << from
                  << " because it is not from the registered master ("
@@ -1968,7 +1965,7 @@ void Slave::exited(const UPID& pid)
 {
   LOG(INFO) << pid << " exited";
 
-  if (!master.isSome() || master.get() == pid) {
+  if (master.isNone() || master.get() == pid) {
     LOG(WARNING) << "Master disconnected!"
                  << " Waiting for a new master to be elected";
     // TODO(benh): After so long waiting for a master, commit suicide.

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 71fa4f0..b00f970 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -223,7 +223,7 @@ protected:
   Nothing detachFile(const std::string& path);
 
   // Invoked whenever the detector detects a change in masters.
-  void detected(const Future<Result<UPID> >& pid);
+  void detected(const Future<Option<UPID> >& pid);
 
   // Helper routine to lookup a framework.
   Framework* getFramework(const FrameworkID& frameworkId);
@@ -318,7 +318,7 @@ private:
 
   SlaveInfo info;
 
-  Result<UPID> master;
+  Option<UPID> master;
 
   Resources resources;
   Attributes attributes;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
index d496641..d532c17 100644
--- a/src/tests/master_contender_detector_tests.cpp
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -158,7 +158,7 @@ TEST(BasicMasterContenderDetectorTest, Detector)
 
   StandaloneMasterDetector detector;
 
-  Future<Result<UPID> > detected = detector.detect();
+  Future<Option<UPID> > detected = detector.detect();
 
   // No one has appointed the leader so we are pending.
   EXPECT_TRUE(detected.isPending());
@@ -196,7 +196,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
 
   ZooKeeperMasterDetector detector(url.get());
 
-  Future<Result<UPID> > leader = detector.detect();
+  Future<Option<UPID> > leader = detector.detect();
   EXPECT_SOME_EQ(master, leader.get());
   Future<Nothing> lostCandidacy = contended.get();
   leader = detector.detect(leader.get());
@@ -236,7 +236,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
 
   ZooKeeperMasterDetector detector1(url.get());
 
-  Future<Result<UPID> > leader1 = detector1.detect();
+  Future<Option<UPID> > leader1 = detector1.detect();
   AWAIT_READY(leader1);
   EXPECT_SOME_EQ(master1, leader1.get());
 
@@ -252,7 +252,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
   AWAIT_READY(contended2);
 
   ZooKeeperMasterDetector detector2(url.get());
-  Future<Result<UPID> > leader2 = detector2.detect();
+  Future<Option<UPID> > leader2 = detector2.detect();
   AWAIT_READY(leader2);
   EXPECT_SOME_EQ(master1, leader2.get());
 
@@ -261,12 +261,69 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
   // Destroying detector1 (below) causes leadership change.
   delete contender1;
 
-  Future<Result<UPID> > leader3 = detector2.detect(master1);
+  Future<Option<UPID> > leader3 = detector2.detect(master1);
   AWAIT_READY(leader3);
   EXPECT_SOME_EQ(master2, leader3.get());
 }
 
 
+// Verifies that contender and detector operations fail when facing
+// non-retryable errors returned by ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest, NonRetryableFrrors)
+{
+  // group1 creates a base directory in ZooKeeper and sets the
+  // credential for the user.
+  zookeeper::Group group1(
+      server->connectString(),
+      MASTER_CONTENDER_ZK_SESSION_TIMEOUT,
+      "/mesos",
+      zookeeper::Authentication("digest", "member:member"));
+  AWAIT_READY(group1.join("data"));
+
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  // group2's password is wrong and operations on it will fail.
+  Owned<zookeeper::Group> group2(new Group(
+      server->connectString(),
+      MASTER_CONTENDER_ZK_SESSION_TIMEOUT,
+      "/mesos",
+      zookeeper::Authentication("digest", "member:wrongpass")));
+  ZooKeeperMasterContender contender(group2);
+  contender.initialize(master);
+
+  // Fails due to authentication error.
+  AWAIT_FAILED(contender.contend());
+
+  // Now test non-retryable failures in detection.
+  ZooKeeperTest::TestWatcher watcher;
+  ZooKeeper authenticatedZk(server->connectString(), NO_TIMEOUT, &watcher);
+  watcher.awaitSessionEvent(ZOO_CONNECTED_STATE);
+  authenticatedZk.authenticate("digest", "creator:creator");
+
+  // Creator of the base path restricts the all accesses to be
+  // itself.
+  ACL onlyCreatorCanAccess[] = {{ ZOO_PERM_ALL, ZOO_AUTH_IDS }};
+  authenticatedZk.create("/test",
+                         "42",
+                         (ACL_vector) {1, onlyCreatorCanAccess},
+                         0,
+                         NULL);
+  ASSERT_ZK_GET("42", &authenticatedZk, "/test");
+
+  // group3 cannot read the base path thus detector should fail.
+  Owned<Group> group3(new Group(
+      server->connectString(),
+      MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+      "/test",
+      None()));
+
+  ZooKeeperMasterDetector detector(group3);
+  AWAIT_FAILED(detector.detect());
+}
+
+
 // Master contention and detection fail when the network is down, it
 // recovers when the network is back up.
 TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
@@ -292,7 +349,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
 
   ZooKeeperMasterDetector detector(url.get());
 
-  Future<Result<UPID> > leader = detector.detect();
+  Future<Option<UPID> > leader = detector.detect();
   AWAIT_READY(leader);
   EXPECT_SOME_EQ(master, leader.get());
 
@@ -365,7 +422,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
 
   ZooKeeperMasterDetector leaderDetector(leaderGroup);
 
-  Future<Result<UPID> > detected = leaderDetector.detect();
+  Future<Option<UPID> > detected = leaderDetector.detect();
   AWAIT_READY(detected);
   EXPECT_SOME_EQ(leader, detected.get());
 
@@ -418,9 +475,9 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
   AWAIT_READY(nonContenderReconnecting);
 
   // Now the detectors re-detect.
-  Future<Result<UPID> > leaderDetected = leaderDetector.detect(leader);
-  Future<Result<UPID> > followerDetected = followerDetector.detect(leader);
-  Future<Result<UPID> > nonContenderDetected =
+  Future<Option<UPID> > leaderDetected = leaderDetector.detect(leader);
+  Future<Option<UPID> > followerDetected = followerDetector.detect(leader);
+  Future<Option<UPID> > nonContenderDetected =
     nonContenderDetector.detect(leader);
 
   Clock::pause();
@@ -474,12 +531,12 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
 
   ZooKeeperMasterDetector leaderDetector(url.get());
 
-  Future<Result<UPID> > detected = leaderDetector.detect();
+  Future<Option<UPID> > detected = leaderDetector.detect();
   AWAIT_READY(detected);
   EXPECT_SOME_EQ(leader, detected.get());
 
   // Keep detecting.
-  Future<Result<UPID> > newLeaderDetected =
+  Future<Option<UPID> > newLeaderDetected =
     leaderDetector.detect(detected.get());
 
   // Simulate a following master.
@@ -545,7 +602,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
 
   ZooKeeperMasterDetector slaveDetector(group);
 
-  Future<Result<UPID> > detected = slaveDetector.detect();
+  Future<Option<UPID> > detected = slaveDetector.detect();
   AWAIT_READY(detected);
   EXPECT_SOME_EQ(master, detected.get());
 
@@ -598,7 +655,7 @@ TEST_F(ZooKeeperMasterContenderDetectorTest,
   Future<Future<Nothing> > contended = leaderContender.contend();
   AWAIT_READY(contended);
 
-  Future<Result<UPID> > detected = leaderDetector.detect(None());
+  Future<Option<UPID> > detected = leaderDetector.detect(None());
   AWAIT_READY(detected);
   EXPECT_SOME_EQ(leader, detected.get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index 62ab1bc..94d324a 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -156,7 +156,7 @@ TEST_F(ZooKeeperTest, LeaderDetector)
   LeaderDetector detector(&group);
 
   // Detect the leader.
-  Future<Result<Group::Membership> > leader =
+  Future<Option<Group::Membership> > leader =
     detector.detect(None());
   AWAIT_READY(leader);
   ASSERT_SOME_EQ(membership1.get(), leader.get());
@@ -205,7 +205,7 @@ TEST_F(ZooKeeperTest, LeaderDetectorTimeoutHandling)
   AWAIT_READY(membership1);
   Future<bool> cancelled = membership1.get().cancelled();
 
-  Future<Result<Group::Membership> > leader = detector.detect();
+  Future<Option<Group::Membership> > leader = detector.detect();
 
   AWAIT_READY(leader);
   EXPECT_SOME(leader.get());

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/zookeeper/detector.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.cpp b/src/zookeeper/detector.cpp
index 1de3663..6723d03 100644
--- a/src/zookeeper/detector.cpp
+++ b/src/zookeeper/detector.cpp
@@ -28,8 +28,8 @@ public:
   virtual void initialize();
 
   // LeaderDetector implementation.
-  Future<Result<Group::Membership> > detect(
-      const Result<Group::Membership>& previous);
+  Future<Option<Group::Membership> > detect(
+      const Option<Group::Membership>& previous);
 
 private:
   // Helper that sets up the watch on the group.
@@ -39,8 +39,8 @@ private:
   void watched(Future<set<Group::Membership> > memberships);
 
   Group* group;
-  Result<Group::Membership> leader;
-  set<Promise<Result<Group::Membership> >*> promises;
+  Option<Group::Membership> leader;
+  set<Promise<Option<Group::Membership> >*> promises;
 };
 
 
@@ -50,8 +50,8 @@ LeaderDetectorProcess::LeaderDetectorProcess(Group* _group)
 
 LeaderDetectorProcess::~LeaderDetectorProcess()
 {
-  foreach (Promise<Result<Group::Membership> >* promise, promises) {
-    promise->fail("No longer detecting leader");
+  foreach (Promise<Option<Group::Membership> >* promise, promises) {
+    promise->future().discard();
     delete promise;
   }
   promises.clear();
@@ -64,23 +64,18 @@ void LeaderDetectorProcess::initialize()
 }
 
 
-Future<Result<Group::Membership> > LeaderDetectorProcess::detect(
-    const Result<Group::Membership>& previous)
+Future<Option<Group::Membership> > LeaderDetectorProcess::detect(
+    const Option<Group::Membership>& previous)
 {
   // Return immediately if the incumbent leader is different from the
   // expected.
-  if (leader.isError() != previous.isError() ||
-      leader.isNone() != previous.isNone() ||
-      leader.isSome() != previous.isSome()) {
-    return leader; // State change.
-  } else if (leader.isSome() && previous.isSome() &&
-             leader.get() != previous.get()) {
-    return leader; // Leadership change.
+  if (leader != previous) {
+    return leader;
   }
 
   // Otherwise wait for the next election result.
-  Promise<Result<Group::Membership> >* promise =
-    new Promise<Result<Group::Membership> >();
+  Promise<Option<Group::Membership> >* promise =
+    new Promise<Option<Group::Membership> >();
   promises.insert(promise);
   return promise->future();
 }
@@ -95,22 +90,19 @@ void LeaderDetectorProcess::watch(const set<Group::Membership>& expected)
 
 void LeaderDetectorProcess::watched(Future<set<Group::Membership> > memberships)
 {
+  CHECK(!memberships.isDiscarded());
+
   if (memberships.isFailed()) {
     LOG(ERROR) << "Failed to watch memberships: " << memberships.failure();
-    leader = Error(memberships.failure());
-    foreach (Promise<Result<Group::Membership> >* promise, promises) {
-      promise->set(leader);
+    leader = None();
+    foreach (Promise<Option<Group::Membership> >* promise, promises) {
+      promise->fail(memberships.failure());
       delete promise;
     }
     promises.clear();
-
-    // Start over.
-    watch(set<Group::Membership>());
     return;
   }
 
-  CHECK(memberships.isReady()) << "Not expecting Group to discard futures";
-
   // Update leader status based on memberships.
   if (leader.isSome() && memberships.get().count(leader.get()) == 0) {
     VLOG(1) << "The current leader (id=" << leader.get().id() << ") is lost";
@@ -121,24 +113,17 @@ void LeaderDetectorProcess::watched(Future<set<Group::Membership> > memberships)
   // incumbent wins the election.
   Option<Group::Membership> current;
   foreach (const Group::Membership& membership, memberships.get()) {
-    if (current.isNone() || membership.id() < current.get().id()) {
-      current = membership;
-    }
+    current = min(current, membership);
   }
 
-  if (current.isSome() && (!leader.isSome() || current.get() != leader.get())) {
-    LOG(INFO) << "Detected a new leader (id='" << current.get().id()
-              << "')";
-    foreach (Promise<Result<Group::Membership> >* promise, promises) {
-      promise->set(current);
-      delete promise;
-    }
-    promises.clear();
-  } else if (current.isNone() && !leader.isNone()) {
-    LOG(INFO) << "No new leader is elected after election";
+  if (current != leader) {
+    LOG(INFO) << "Detected a new leader: "
+              << (current.isSome()
+                  ? "'(id='" + stringify(current.get().id()) + "')"
+                  : "None");
 
-    foreach (Promise<Result<Group::Membership> >* promise, promises) {
-      promise->set(Result<Group::Membership>::none());
+    foreach (Promise<Option<Group::Membership> >* promise, promises) {
+      promise->set(current);
       delete promise;
     }
     promises.clear();
@@ -164,8 +149,8 @@ LeaderDetector::~LeaderDetector()
 }
 
 
-Future<Result<Group::Membership> > LeaderDetector::detect(
-    const Result<Group::Membership>& membership)
+Future<Option<Group::Membership> > LeaderDetector::detect(
+    const Option<Group::Membership>& membership)
 {
   return dispatch(process, &LeaderDetectorProcess::detect, membership);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1767e38/src/zookeeper/detector.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.hpp b/src/zookeeper/detector.hpp
index de4acab..00fcf5e 100644
--- a/src/zookeeper/detector.hpp
+++ b/src/zookeeper/detector.hpp
@@ -26,17 +26,20 @@ public:
   // Returns some membership after an election has occurred and a
   // leader (membership) is elected, or none if an election occurs and
   // no leader is elected (e.g., all memberships are lost).
-  // The result is an error if the detector is not able to detect the
-  // leader, possibly due to network disconnection.
+  // A failed future is returned if the detector is unable to detect
+  // the leading master due to a non-retryable error.
+  // Note that the detector transparently tries to recover from
+  // retryable errors.
+  // The future is never discarded unless it stays pending when the
+  // detector destructs.
   //
   // The 'previous' result (if any) should be passed back if this
   // method is called repeatedly so the detector only returns when it
-  // gets a different result, either because an error is recovered or
-  // the elected membership is different from the 'previous'.
+  // gets a different result.
   //
   // TODO(xujyan): Use a Stream abstraction instead.
-  process::Future<Result<Group::Membership> > detect(
-      const Result<Group::Membership>& previous = None());
+  process::Future<Option<Group::Membership> > detect(
+      const Option<Group::Membership>& previous = None());
 
 private:
   LeaderDetectorProcess* process;