Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/19 19:57:42 UTC

[1/8] git commit: Increased the minimum number of threads that libprocess statically allocates.

Updated Branches:
  refs/heads/master d46a240ce -> f9d1dd819


Increased the minimum number of threads that libprocess statically
allocates.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15658


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9fa1d473
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9fa1d473
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9fa1d473

Branch: refs/heads/master
Commit: 9fa1d473bd2d26b991d83385aeb3186e337245ff
Parents: ea5e10d
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:38:57 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:18 2013 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9fa1d473/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 0cc0ace..0c816cc 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1364,7 +1364,16 @@ void initialize(const string& delegate)
   socket_manager = new SocketManager();
 
   // Setup processing threads.
-  long cpus = std::max(4L, sysconf(_SC_NPROCESSORS_ONLN));
+  // We create no fewer than 8 threads because some tests require
+  // more worker threads than 'sysconf(_SC_NPROCESSORS_ONLN)' returns
+  // on machines with fewer cores.
+  // e.g. https://issues.apache.org/jira/browse/MESOS-818
+  //
+  // TODO(xujyan): Use a smarter algorithm to allocate threads.
+  // Allocating a static number of threads can cause starvation if
+  // there are more waiting Processes than the number of worker
+  // threads.
+  long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLN));
 
   for (int i = 0; i < cpus; i++) {
     pthread_t thread; // For now, not saving handles on our threads.

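For context, here is the new sizing rule in isolation: a minimal
standalone sketch (the main() harness is illustrative, not part of the
commit) of how the worker thread count is now derived.

  #include <unistd.h>   // sysconf()

  #include <algorithm>  // std::max()
  #include <iostream>

  int main()
  {
    // Same computation as the patched initialize(): at least 8 worker
    // threads, or one per online core if the machine has more than 8.
    long cpus = std::max(8L, sysconf(_SC_NPROCESSORS_ONLN));
    std::cout << "libprocess would start " << cpus
              << " worker threads" << std::endl;
    return 0;
  }

On a 4-core machine this now yields 8 (previously 4); machines with 8
or more online cores are unaffected.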

[6/8] git commit: Added Master contender and detector abstractions.

Posted by vi...@apache.org.
Added Master contender and detector abstractions.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/13087


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bcd1dc4e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bcd1dc4e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bcd1dc4e

Branch: refs/heads/master
Commit: bcd1dc4e10a1cff4fdc4e92daff108b6fa0475d3
Parents: 9fa1d47
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:39:15 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:19 2013 -0800

----------------------------------------------------------------------
 src/Makefile.am                               |  71 +--
 src/master/contender.cpp                      | 206 +++++++
 src/master/contender.hpp                      | 127 ++++
 src/master/detector.cpp                       | 361 ++++++++++++
 src/master/detector.hpp                       | 129 ++++
 src/tests/master_contender_detector_tests.cpp | 647 +++++++++++++++++++++
 src/tests/master_detector_tests.cpp           | 628 --------------------
 7 files changed, 1508 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3c48aee..969aead 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -161,7 +161,9 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	sasl/auxprop.cpp						\
 	sched/sched.cpp							\
 	local/local.cpp							\
+	master/contender.cpp						\
 	master/constants.cpp						\
+	master/detector.cpp						\
 	master/drf_sorter.cpp						\
 	master/http.cpp							\
 	master/master.cpp						\
@@ -224,7 +226,10 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
 	linux/fs.hpp local/flags.hpp local/local.hpp			\
 	logging/flags.hpp logging/logging.hpp				\
 	master/allocator.hpp						\
-	master/constants.hpp master/drf_sorter.hpp master/flags.hpp	\
+	master/contender.hpp						\
+	master/constants.hpp						\
+	master/detector.hpp						\
+	master/drf_sorter.hpp master/flags.hpp				\
 	master/hierarchical_allocator_process.hpp			\
 	master/registrar.hpp						\
 	master/master.hpp master/sorter.hpp				\
@@ -779,38 +784,38 @@ balloon_executor_LDADD = libmesos.la
 
 check_PROGRAMS += mesos-tests
 
-mesos_tests_SOURCES =			\
-  tests/allocator_tests.cpp		\
-  tests/attributes_tests.cpp		\
-  tests/authentication_tests.cpp	\
-  tests/environment.cpp			\
-  tests/examples_tests.cpp		\
-  tests/exception_tests.cpp		\
-  tests/fault_tolerance_tests.cpp	\
-  tests/files_tests.cpp			\
-  tests/flags.cpp			\
-  tests/gc_tests.cpp			\
-  tests/isolator_tests.cpp		\
-  tests/log_tests.cpp			\
-  tests/logging_tests.cpp		\
-  tests/main.cpp			\
-  tests/master_detector_tests.cpp	\
-  tests/master_tests.cpp		\
-  tests/mesos.cpp			\
-  tests/monitor_tests.cpp		\
-  tests/paths_tests.cpp			\
-  tests/protobuf_io_tests.cpp		\
-  tests/reaper_tests.cpp		\
-  tests/registrar_tests.cpp		\
-  tests/resource_offers_tests.cpp	\
-  tests/resources_tests.cpp		\
-  tests/sasl_tests.cpp			\
-  tests/script.cpp			\
-  tests/slave_recovery_tests.cpp	\
-  tests/sorter_tests.cpp		\
-  tests/state_tests.cpp			\
-  tests/status_update_manager_tests.cpp	\
-  tests/utils.cpp			\
+mesos_tests_SOURCES =				\
+  tests/allocator_tests.cpp			\
+  tests/attributes_tests.cpp			\
+  tests/authentication_tests.cpp		\
+  tests/environment.cpp				\
+  tests/examples_tests.cpp			\
+  tests/exception_tests.cpp			\
+  tests/fault_tolerance_tests.cpp		\
+  tests/files_tests.cpp				\
+  tests/flags.cpp				\
+  tests/gc_tests.cpp				\
+  tests/isolator_tests.cpp			\
+  tests/log_tests.cpp				\
+  tests/logging_tests.cpp			\
+  tests/main.cpp				\
+  tests/master_contender_detector_tests.cpp	\
+  tests/master_tests.cpp			\
+  tests/mesos.cpp				\
+  tests/monitor_tests.cpp			\
+  tests/paths_tests.cpp				\
+  tests/protobuf_io_tests.cpp			\
+  tests/reaper_tests.cpp			\
+  tests/registrar_tests.cpp			\
+  tests/resource_offers_tests.cpp		\
+  tests/resources_tests.cpp			\
+  tests/sasl_tests.cpp				\
+  tests/script.cpp				\
+  tests/slave_recovery_tests.cpp		\
+  tests/sorter_tests.cpp			\
+  tests/state_tests.cpp				\
+  tests/status_update_manager_tests.cpp		\
+  tests/utils.cpp				\
   tests/zookeeper_url_tests.cpp
 
 mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender.cpp b/src/master/contender.cpp
new file mode 100644
index 0000000..84b0552
--- /dev/null
+++ b/src/master/contender.cpp
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/defer.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/lambda.hpp>
+
+#include "master/contender.hpp"
+#include "master/master.hpp"
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using std::string;
+
+using namespace process;
+using namespace zookeeper;
+
+namespace mesos {
+namespace internal {
+
+using namespace master;
+
+const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+class ZooKeeperMasterContenderProcess
+  : public Process<ZooKeeperMasterContenderProcess>
+{
+public:
+  ZooKeeperMasterContenderProcess(const zookeeper::URL& url);
+  ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group);
+  ~ZooKeeperMasterContenderProcess();
+
+  void initialize(const PID<Master>& master);
+
+  // MasterContender implementation.
+  virtual Future<Future<Nothing> > contend();
+
+private:
+  Owned<zookeeper::Group> group;
+  LeaderContender* contender;
+  PID<Master> master;
+};
+
+
+Try<MasterContender*> MasterContender::create(const string& zk)
+{
+  if (zk == "") {
+    return new StandaloneMasterContender();
+  } else if (strings::startsWith(zk, "zk://")) {
+    Try<zookeeper::URL> url = URL::parse(zk);
+    if (url.isError()) {
+      return Try<MasterContender*>::error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Try<MasterContender*>::error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterContender(url.get());
+  } else if (strings::startsWith(zk, "file://")) {
+    const string& path = zk.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      return Error("Failed to read from file at '" + path + "'");
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  return Try<MasterContender*>::error("Failed to parse '" + zk + "'");
+}
+
+
+MasterContender::~MasterContender() {}
+
+
+StandaloneMasterContender::~StandaloneMasterContender()
+{
+  if (promise != NULL) {
+    promise->set(Nothing()); // Leadership lost.
+    delete promise;
+  }
+}
+
+
+void StandaloneMasterContender::initialize(
+    const PID<master::Master>& master)
+{
+  // We don't really need to store the master in this basic
+  // implementation, so we just set an 'initialized' flag to make
+  // sure this method has been called.
+  initialized = true;
+}
+
+Future<Future<Nothing> > StandaloneMasterContender::contend()
+{
+  CHECK(initialized) << "Initialize the contender first";
+
+  if (promise != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    promise->set(Nothing());
+    delete promise;
+  }
+
+  // Directly return a future that is always pending because it
+  // represents a membership/leadership that is not going to be lost
+  // until we 'withdraw'.
+  promise = new Promise<Nothing>();
+  return promise->future();
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(const URL& url)
+{
+  process = new ZooKeeperMasterContenderProcess(url);
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group)
+{
+  process = new ZooKeeperMasterContenderProcess(group);
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::~ZooKeeperMasterContender()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void ZooKeeperMasterContender::initialize(
+    const PID<master::Master>& master)
+{
+  process->initialize(master);
+}
+
+
+Future<Future<Nothing> > ZooKeeperMasterContender::contend()
+{
+  return dispatch(process, &ZooKeeperMasterContenderProcess::contend);
+}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    const URL& url)
+  : group(new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT)),
+    contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    Owned<Group> _group)
+  : group(_group),
+    contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
+{
+  delete contender;
+}
+
+void ZooKeeperMasterContenderProcess::initialize(
+    const PID<Master>& _master)
+{
+  master = _master;
+}
+
+
+Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
+{
+  CHECK(master) << "Initialize the contender first";
+
+  if (contender != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    delete contender;
+  }
+
+  contender = new LeaderContender(group.get(), master);
+  return contender->contend();
+}
+
+} // namespace internal {
+} // namespace mesos {

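The nested future returned by contend() is easiest to see with the
standalone implementation above. A condensed sketch, mirroring the
BasicMasterContenderDetectorTest.Contender test added later in this
commit ('master' is an assumed PID<Master>):

  MasterContender* contender = new StandaloneMasterContender();
  contender->initialize(master);

  // The outer future is ready once we have entered the contest
  // (immediately, in the standalone case).
  Future<Future<Nothing> > contended = contender->contend();

  // The inner future stays pending while we hold the candidacy...
  Future<Nothing> lostCandidacy = contended.get();

  // ...and is fulfilled once the candidacy is withdrawn, which the
  // destructor does implicitly.
  delete contender;
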
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender.hpp b/src/master/contender.hpp
new file mode 100644
index 0000000..50fd4f3
--- /dev/null
+++ b/src/master/contender.hpp
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_CONTENDER_HPP__
+#define __MASTER_CONTENDER_HPP__
+
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/owned.hpp>
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
+
+
+// Forward declarations.
+namespace master {
+class Master;
+}
+
+
+class ZooKeeperMasterContenderProcess;
+
+
+// An abstraction for contending to be a leading master.
+class MasterContender
+{
+public:
+  // Attempts to create a master contender using the specified
+  // ZooKeeper.
+  // The ZooKeeper address should be one of:
+  //   - zk://host1:port1,host2:port2,.../path
+  //   - zk://username:password@host1:port1,host2:port2,.../path
+  //   - file:///path/to/file (where file contains one of the above)
+  // Note that the returned contender still needs to be 'initialize()'d.
+  static Try<MasterContender*> create(const std::string& zk);
+
+  // Note that the contender's membership, if obtained, is scheduled
+  // to be cancelled during destruction.
+  virtual ~MasterContender() = 0;
+
+  // Initializes the contender with the PID of the master it contends
+  // on behalf of.
+  virtual void initialize(const process::PID<master::Master>& master) = 0;
+
+  // Returns a Future<Nothing> once the contender has entered the
+  // contest (by obtaining a membership), or fails otherwise.
+  // The inner Future completes with Nothing when the contender is
+  // out of the contest (i.e. its membership is lost).
+  //
+  // This method can be used to contend again. Each call to this
+  // method causes the previous candidacy to be withdrawn before
+  // re-contending.
+  virtual process::Future<process::Future<Nothing> > contend() = 0;
+};
+
+
+// A basic implementation which assumes only one master is
+// contending.
+class StandaloneMasterContender : public MasterContender
+{
+public:
+  StandaloneMasterContender()
+    : initialized(false),
+      promise(NULL) {}
+  virtual ~StandaloneMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const process::PID<master::Master>& master);
+
+  // In this basic implementation the outer Future directly returns
+  // and inner Future stays pending because there is only one
+  // contender in the contest.
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  bool initialized;
+  process::Promise<Nothing>* promise;
+};
+
+
+class ZooKeeperMasterContender : public MasterContender
+{
+public:
+  // Creates a contender that uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  ZooKeeperMasterContender(const zookeeper::URL& url);
+  ZooKeeperMasterContender(Owned<zookeeper::Group> group);
+
+  virtual ~ZooKeeperMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const process::PID<master::Master>& master);
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  ZooKeeperMasterContenderProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_CONTENDER_HPP__

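A hedged usage sketch of the factory interface declared above; the
ZooKeeper address, the fatal logging, and 'master' (a
PID<master::Master>) are illustrative assumptions:

  Try<MasterContender*> contender =
    MasterContender::create("zk://host1:2181,host2:2181/mesos");

  if (contender.isError()) {
    // E.g., an unparsable address, or a bare '/' chroot path.
    LOG(FATAL) << "Failed to create a contender: " << contender.error();
  }

  // Per the note above, the contender must be initialized before use.
  contender.get()->initialize(master);

  process::Future<process::Future<Nothing> > contended =
    contender.get()->contend();

With a "file://" address, create() reads the file and recurses on its
trimmed contents, so the file may hold any of the other forms.
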
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
new file mode 100644
index 0000000..2f73f66
--- /dev/null
+++ b/src/master/detector.cpp
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <set>
+#include <string>
+
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/logging.hpp>
+#include <process/process.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+
+#include "master/detector.hpp"
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using namespace process;
+using namespace zookeeper;
+
+using std::set;
+using std::string;
+
+namespace mesos {
+namespace internal {
+
+const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+class StandaloneMasterDetectorProcess
+  : public Process<StandaloneMasterDetectorProcess>
+{
+public:
+  StandaloneMasterDetectorProcess() : leader(None()) {}
+  StandaloneMasterDetectorProcess(const UPID& _leader)
+    : leader(_leader) {}
+  ~StandaloneMasterDetectorProcess();
+
+  void appoint(const Result<UPID>& leader);
+  Future<Result<UPID> > detect(const Result<UPID>& previous = None());
+
+private:
+  // The leading master that's directly 'appoint()'ed.
+  Result<UPID> leader;
+
+  // Promises for the detection result.
+  set<Promise<Result<UPID> >*> promises;
+};
+
+
+class ZooKeeperMasterDetectorProcess
+  : public Process<ZooKeeperMasterDetectorProcess>
+{
+public:
+  ZooKeeperMasterDetectorProcess(const URL& url);
+  ZooKeeperMasterDetectorProcess(Owned<Group> group);
+  ~ZooKeeperMasterDetectorProcess();
+
+  virtual void initialize();
+
+  // ZooKeeperMasterDetector implementation.
+  Future<Result<UPID> > detect(const Result<UPID>& previous);
+
+private:
+  // Invoked when the group leadership has changed.
+  void detected(Future<Result<Group::Membership> > leader);
+
+  // Invoked when we have fetched the data associated with the leader.
+  void fetched(const Future<string>& data);
+
+  Owned<Group> group;
+  LeaderDetector detector;
+
+  // The leading Master.
+  Result<UPID> leader;
+  set<Promise<Result<UPID> >*> promises;
+};
+
+
+Try<MasterDetector*> MasterDetector::create(const string& master)
+{
+  if (master == "") {
+    return new StandaloneMasterDetector();
+  } else if (master.find("zk://") == 0) {
+    Try<URL> url = URL::parse(master);
+    if (url.isError()) {
+      return Try<MasterDetector*>::error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Try<MasterDetector*>::error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterDetector(url.get());
+  } else if (master.find("file://") == 0) {
+    const string& path = master.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      return Error("Failed to read from file at '" + path + "'");
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  // Okay, try and parse what we got as a PID.
+  UPID pid = master.find("master@") == 0
+    ? UPID(master)
+    : UPID("master@" + master);
+
+  if (!pid) {
+    return Try<MasterDetector*>::error(
+        "Failed to parse '" + master + "'");
+  }
+
+  return new StandaloneMasterDetector(pid);
+}
+
+
+MasterDetector::~MasterDetector() {}
+
+
+StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
+{
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(Result<UPID>::error("MasterDetector is being destructed"));
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+void StandaloneMasterDetectorProcess::appoint(
+    const Result<process::UPID>& _leader)
+{
+  leader = _leader;
+
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(leader);
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+Future<Result<UPID> > StandaloneMasterDetectorProcess::detect(
+    const Result<UPID>& previous)
+{
+  // Directly return when the current leader is not the
+  // same as the previous one.
+  if (leader.isError() != previous.isError() ||
+      leader.isNone() != previous.isNone() ||
+      leader.isSome() != previous.isSome()) {
+    return leader; // State change.
+  } else if (leader.isSome() && previous.isSome() &&
+             leader.get() != previous.get()) {
+    return leader; // Leadership change.
+  }
+
+  Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+  promises.insert(promise);
+  return promise->future();
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector()
+{
+  process = new StandaloneMasterDetectorProcess();
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
+{
+  process = new StandaloneMasterDetectorProcess(leader);
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::~StandaloneMasterDetector()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void StandaloneMasterDetector::appoint(const Result<process::UPID>& leader)
+{
+  return dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
+}
+
+
+Future<Result<UPID> > StandaloneMasterDetector::detect(
+    const Result<UPID>& previous)
+{
+  return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
+}
+
+
+// TODO(benh): Get ZooKeeper timeout from configuration.
+// TODO(xujyan): Use peer constructor after switching to C++ 11.
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    const URL& url)
+  : group(new Group(url.servers,
+                    MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+                    url.path,
+                    url.authentication)),
+    detector(group.get()),
+    leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    Owned<Group> _group)
+  : group(_group),
+    detector(group.get()),
+    leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
+{
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(Result<UPID>::error("No longer detecting a master"));
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+void ZooKeeperMasterDetectorProcess::initialize()
+{
+  detector.detect(None())
+    .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+Future<Result<UPID> > ZooKeeperMasterDetectorProcess::detect(
+    const Result<UPID>& previous)
+{
+  // Directly return when the current leader and previous are not the
+  // same.
+  if (leader.isError() != previous.isError() ||
+      leader.isNone() != previous.isNone() ||
+      leader.isSome() != previous.isSome()) {
+    return leader; // State change.
+  } else if (leader.isSome() && previous.isSome() &&
+             leader.get() != previous.get()) {
+    return leader; // Leadership change.
+  }
+
+  Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+  promises.insert(promise);
+  return promise->future();
+}
+
+
+void ZooKeeperMasterDetectorProcess::detected(
+    Future<Result<Group::Membership> > _leader)
+{
+  CHECK(_leader.isReady())
+    << "Not expecting LeaderDetector to fail or discard futures";
+
+  if (!_leader.get().isSome()) {
+    leader = _leader.get().isError()
+        ? Result<UPID>::error(_leader.get().error())
+        : Result<UPID>::none();
+
+    foreach (Promise<Result<UPID> >* promise, promises) {
+      promise->set(leader);
+      delete promise;
+    }
+    promises.clear();
+  } else {
+    // Fetch the data associated with the leader.
+    group->data(_leader.get().get())
+      .onAny(defer(self(), &Self::fetched, lambda::_1));
+  }
+
+  // Keep trying to detect leadership changes.
+  detector.detect(_leader.get())
+    .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
+{
+  if (data.isFailed()) {
+    leader = Error(data.failure());
+    foreach (Promise<Result<UPID> >* promise, promises) {
+      promise->set(leader);
+      delete promise;
+    }
+    promises.clear();
+    return;
+  }
+
+  CHECK(data.isReady()); // Not expecting Group to discard futures.
+
+  // Cache the master for subsequent requests.
+  leader = UPID(data.get());
+  LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
+
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(leader);
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const URL& url)
+{
+  process = new ZooKeeperMasterDetectorProcess(url);
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
+{
+  process = new ZooKeeperMasterDetectorProcess(group);
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Result<UPID> > ZooKeeperMasterDetector::detect(
+    const Result<UPID>& previous)
+{
+  return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
+}
+
+} // namespace internal {
+} // namespace mesos {

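Both detect() implementations above share the same change test.
Distilled into a standalone predicate (a refactoring sketch, not code
from this commit):

  // Returns true on a state change (the some/none/error status
  // differs) or a leadership change (both are some, but with different
  // PIDs). detect() returns immediately in either case; otherwise it
  // parks the caller on a promise.
  static bool changed(
      const Result<UPID>& leader,
      const Result<UPID>& previous)
  {
    if (leader.isError() != previous.isError() ||
        leader.isNone() != previous.isNone() ||
        leader.isSome() != previous.isSome()) {
      return true; // State change.
    }

    return leader.isSome() && previous.isSome() &&
           leader.get() != previous.get(); // Leadership change.
  }
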
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector.hpp b/src/master/detector.hpp
new file mode 100644
index 0000000..ceb3a3f
--- /dev/null
+++ b/src/master/detector.hpp
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_DETECTOR_HPP__
+#define __MASTER_DETECTOR_HPP__
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/owned.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT;
+
+
+// Forward declarations.
+class StandaloneMasterDetectorProcess;
+class ZooKeeperMasterDetectorProcess;
+
+
+// An abstraction of a Master detector which can be used to
+// detect the leading master from a group.
+class MasterDetector
+{
+public:
+  // Attempts to create a master detector for the specified master.
+  // The master should be one of:
+  //   - host:port
+  //   - zk://host1:port1,host2:port2,.../path
+  //   - zk://username:password@host1:port1,host2:port2,.../path
+  //   - file:///path/to/file (where file contains one of the above)
+  static Try<MasterDetector*> create(const std::string& master);
+  virtual ~MasterDetector() = 0;
+
+  // Returns some PID after an election has occurred and the elected
+  // PID is different from that specified (if any), or NONE if an
+  // election occurs and no PID is elected (e.g., all PIDs are lost).
+  // The result is an error if the detector is not able to detect the
+  // leading master, possibly due to network disconnection.
+  //
+  // The future fails when the detector is destructed; it is never
+  // discarded.
+  //
+  // The 'previous' result (if any) should be passed back if this
+  // method is called repeatedly, so the detector only returns when
+  // it gets a different result, either because an error has been
+  // recovered from or the elected membership differs from 'previous'.
+  virtual process::Future<Result<process::UPID> > detect(
+      const Result<process::UPID>& previous = None()) = 0;
+};
+
+
+// A standalone implementation of the MasterDetector with no external
+// discovery mechanism so the user has to manually appoint a leader
+// to the detector for it to be detected.
+class StandaloneMasterDetector : public MasterDetector
+{
+public:
+  StandaloneMasterDetector();
+  // Use this constructor if the leader is known beforehand so it is
+  // unnecessary to call 'appoint()' separately.
+  StandaloneMasterDetector(const process::UPID& leader);
+  virtual ~StandaloneMasterDetector();
+
+  // Appoint the leading master so it can be *detected* by default.
+  // The leader can be NONE or ERROR.
+  // This method is used only by this basic implementation and not
+  // needed by child classes with other detection mechanisms such as
+  // ZooKeeper.
+  //
+  // When used by Master, this method is called during its
+  // initialization process; when used by Slave and SchedulerDriver,
+  // the MasterDetector needs to have the leader installed prior to
+  // injection.
+  void appoint(const Result<process::UPID>& leader);
+
+  virtual process::Future<Result<process::UPID> > detect(
+      const Result<process::UPID>& previous = None());
+
+private:
+  StandaloneMasterDetectorProcess* process;
+};
+
+
+class ZooKeeperMasterDetector : public MasterDetector
+{
+public:
+  // Creates a detector which uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  ZooKeeperMasterDetector(const zookeeper::URL& url);
+  // A constructor overload for testing purposes.
+  ZooKeeperMasterDetector(Owned<zookeeper::Group> group);
+  virtual ~ZooKeeperMasterDetector();
+
+  // MasterDetector implementation.
+  virtual process::Future<Result<process::UPID> > detect(
+      const Result<process::UPID>& previous = None());
+
+private:
+  ZooKeeperMasterDetectorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_DETECTOR_HPP__

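The 'previous' parameter documented above supports a long-polling style
loop. A minimal blocking sketch (the reaction comments are assumptions;
handling of a failed future, e.g. on detector destruction, is omitted):

  Result<process::UPID> previous = None();
  while (true) {
    // detect() only completes once the result differs from 'previous'.
    process::Future<Result<process::UPID> > future =
      detector->detect(previous);

    previous = future.get(); // get() blocks until the future is ready.

    // React to 'previous' here: isSome() means a new leader was
    // elected, isNone() means no leader is elected, and isError()
    // means detection failed (e.g., due to network disconnection).
  }
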
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
new file mode 100644
index 0000000..5e42374
--- /dev/null
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -0,0 +1,647 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <zookeeper.h>
+
+#include <gmock/gmock.h>
+
+#include <fstream>
+#include <map>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/gtest.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/try.hpp>
+
+#include "master/contender.hpp"
+#include "master/detector.hpp"
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/isolator.hpp"
+#include "tests/mesos.hpp"
+#ifdef MESOS_HAS_JAVA
+#include "tests/zookeeper.hpp"
+#endif
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using namespace zookeeper;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+using process::UPID;
+
+using std::map;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+
+
+class MasterContenderDetectorTest : public MesosTest {};
+
+
+TEST_F(MasterContenderDetectorTest, File)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Write "master" to a file and use the "file://" mechanism to
+  // create a master detector for the slave. Still requires a master
+  // detector for the master first.
+  slave::Flags flags = CreateSlaveFlags();
+
+  const string& path = path::join(flags.work_dir, "master");
+  ASSERT_SOME(os::write(path, stringify(master.get())));
+
+  Try<MasterDetector*> detector =
+    MasterDetector::create("file://" + path);
+
+  ASSERT_SOME(detector);
+
+  StartSlave(detector.get(), flags);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST(BasicMasterContenderDetectorTest, Contender)
+{
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  MasterContender* contender = new StandaloneMasterContender();
+
+  contender->initialize(master);
+
+  Future<Future<Nothing> > contended = contender->contend();
+  AWAIT_READY(contended);
+
+  Future<Nothing> lostCandidacy = contended.get();
+
+  // The candidacy is never lost.
+  EXPECT_TRUE(lostCandidacy.isPending());
+
+  delete contender;
+
+  // Deleting the contender also withdraws the previous candidacy.
+  AWAIT_READY(lostCandidacy);
+}
+
+
+TEST(BasicMasterContenderDetectorTest, Detector)
+{
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  StandaloneMasterDetector detector;
+
+  Future<Result<UPID> > detected = detector.detect();
+
+  // No one has appointed the leader so we are pending.
+  EXPECT_TRUE(detected.isPending());
+
+  detector.appoint(master);
+
+  AWAIT_READY(detected);
+}
+
+
+#ifdef MESOS_HAS_JAVA
+class ZooKeeperMasterContenderDetectorTest : public ZooKeeperTest {};
+
+
+// A single contender gets elected automatically.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
+{
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  Owned<zookeeper::Group> group(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+  ZooKeeperMasterContender* contender = new ZooKeeperMasterContender(group);
+
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  contender->initialize(master);
+  Future<Future<Nothing> > contended = contender->contend();
+  AWAIT_READY(contended);
+
+  ZooKeeperMasterDetector detector(url.get());
+
+  Future<Result<UPID> > leader = detector.detect();
+  EXPECT_SOME_EQ(master, leader.get());
+  Future<Nothing> lostCandidacy = contended.get();
+  leader = detector.detect(leader.get());
+
+  Future<Option<int64_t> > sessionId = group.get()->session();
+  AWAIT_READY(sessionId);
+  server->expireSession(sessionId.get().get());
+
+  // Session expiration causes candidacy to be lost and the
+  // Future<Nothing> to be fulfilled.
+  AWAIT_READY(lostCandidacy);
+  AWAIT_READY(leader);
+  EXPECT_NONE(leader.get());
+}
+
+
+// Two contenders, the first wins. Kill the first, then the second
+// is elected.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
+{
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  ZooKeeperMasterContender* contender1 =
+    new ZooKeeperMasterContender(url.get());
+
+  PID<Master> master1;
+  master1.ip = 10000000;
+  master1.port = 10000;
+
+  contender1->initialize(master1);
+
+  Future<Future<Nothing> > contended1 = contender1->contend();
+  AWAIT_READY(contended1);
+
+  ZooKeeperMasterDetector detector1(url.get());
+
+  Future<Result<UPID> > leader1 = detector1.detect();
+  AWAIT_READY(leader1);
+  EXPECT_SOME_EQ(master1, leader1.get());
+
+  ZooKeeperMasterContender contender2(url.get());
+
+  PID<Master> master2;
+  master2.ip = 10000001;
+  master2.port = 10001;
+
+  contender2.initialize(master2);
+
+  Future<Future<Nothing> > contended2 = contender2.contend();
+  AWAIT_READY(contended2);
+
+  ZooKeeperMasterDetector detector2(url.get());
+  Future<Result<UPID> > leader2 = detector2.detect();
+  AWAIT_READY(leader2);
+  EXPECT_SOME_EQ(master1, leader2.get());
+
+  LOG(INFO) << "Killing the leading master";
+
+  // Destroying contender1 (below) causes a leadership change.
+  delete contender1;
+
+  Future<Result<UPID> > leader3 = detector2.detect(master1);
+  AWAIT_READY(leader3);
+  EXPECT_SOME_EQ(master2, leader3.get());
+}
+
+
+// Master contention and detection fail when the network is down and
+// recover when the network is back up.
+TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
+{
+  Clock::pause();
+
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  ZooKeeperMasterContender contender(url.get());
+
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  contender.initialize(master);
+
+  Future<Future<Nothing> > contended = contender.contend();
+  AWAIT_READY(contended);
+  Future<Nothing> lostCandidacy = contended.get();
+
+  ZooKeeperMasterDetector detector(url.get());
+
+  Future<Result<UPID> > leader = detector.detect();
+  EXPECT_SOME_EQ(master, leader.get());
+
+  leader = detector.detect(leader.get());
+
+  // Shut down ZooKeeper and expect things to fail after the timeout.
+  server->shutdownNetwork();
+
+  Clock::advance(std::max(
+      MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+      MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+  Clock::settle();
+
+  AWAIT_EXPECT_FAILED(lostCandidacy);
+  AWAIT_READY(leader);
+  EXPECT_ERROR(leader.get());
+
+  // Retry.
+  contended = contender.contend();
+  leader = detector.detect(leader.get());
+
+  // Things will not change until the contender reconnects.
+  Clock::advance(Minutes(1));
+  Clock::settle();
+  EXPECT_TRUE(contended.isPending());
+  EXPECT_TRUE(leader.isPending());
+
+  server->startNetwork();
+  AWAIT_READY(contended);
+  AWAIT_READY(leader);
+}
+
+
+// Tests that detectors and contenders fail when we reach our
+// ZooKeeper session timeout. This is to enforce that we manually
+// expire the session when we do not get reconnected within the
+// timeout.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
+{
+  // Use an arbitrary timeout value.
+  Duration sessionTimeout(Seconds(5));
+
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+        "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  Owned<zookeeper::Group> leaderGroup(new Group(url.get(), sessionTimeout));
+
+  // First we bring up three master contender/detector:
+  //   1. A leading contender.
+  //   2. A non-leading contender.
+  //   3. A non-contender (detector).
+
+  // 1. Simulate a leading contender.
+  ZooKeeperMasterContender leaderContender(leaderGroup);
+
+  PID<Master> leader;
+  leader.ip = 10000000;
+  leader.port = 10000;
+
+  leaderContender.initialize(leader);
+
+  Future<Future<Nothing> > contended = leaderContender.contend();
+  AWAIT_READY(contended);
+
+  ZooKeeperMasterDetector leaderDetector(leaderGroup);
+
+  Future<Result<UPID> > detected = leaderDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 2. Simulate a non-leading contender.
+  Owned<zookeeper::Group> followerGroup(new Group(url.get(), sessionTimeout));
+  ZooKeeperMasterContender followerContender(followerGroup);
+
+  PID<Master> follower;
+  follower.ip = 10000001;
+  follower.port = 10001;
+
+  followerContender.initialize(follower);
+
+  contended = followerContender.contend();
+  AWAIT_READY(contended);
+
+  ZooKeeperMasterDetector followerDetector(followerGroup);
+
+  detected = followerDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 3. Simulate a non-contender.
+  Owned<zookeeper::Group> nonContenderGroup(
+      new Group(url.get(), sessionTimeout));
+  ZooKeeperMasterDetector nonContenderDetector(nonContenderGroup);
+
+  detected = nonContenderDetector.detect();
+
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // Expecting the reconnecting event after we shut down the ZK.
+  Future<Nothing> leaderReconnecting = FUTURE_DISPATCH(
+      leaderGroup->process->self(),
+      &GroupProcess::reconnecting);
+
+  Future<Nothing> followerReconnecting = FUTURE_DISPATCH(
+      followerGroup->process->self(),
+      &GroupProcess::reconnecting);
+
+  Future<Nothing> nonContenderReconnecting = FUTURE_DISPATCH(
+      nonContenderGroup->process->self(),
+      &GroupProcess::reconnecting);
+
+  server->shutdownNetwork();
+
+  AWAIT_READY(leaderReconnecting);
+  AWAIT_READY(followerReconnecting);
+  AWAIT_READY(nonContenderReconnecting);
+
+  // Now the detectors re-detect.
+  Future<Result<UPID> > leaderNoMasterDetected = leaderDetector.detect(leader);
+  Future<Result<UPID> > followerNoMasterDetected =
+    followerDetector.detect(leader);
+  Future<Result<UPID> > nonContenderNoMasterDetected =
+    nonContenderDetector.detect(leader);
+
+  Clock::pause();
+
+  // We may need to advance multiple times because we could have
+  // advanced the clock before the timer in Group starts.
+  while (leaderNoMasterDetected.isPending() ||
+         followerNoMasterDetected.isPending() ||
+         nonContenderNoMasterDetected.isPending()) {
+    Clock::advance(sessionTimeout);
+    Clock::settle();
+  }
+
+  AWAIT_READY(leaderNoMasterDetected);
+  EXPECT_ERROR(leaderNoMasterDetected.get());
+  AWAIT_READY(followerNoMasterDetected);
+  EXPECT_ERROR(followerNoMasterDetected.get());
+  AWAIT_READY(nonContenderNoMasterDetected);
+  EXPECT_ERROR(nonContenderNoMasterDetected.get());
+
+  Clock::resume();
+}
+
+
+// Tests whether a leading master correctly detects a new master when
+// its ZooKeeper session is expired (the follower becomes the new
+// leader).
+TEST_F(ZooKeeperMasterContenderDetectorTest,
+       MasterDetectorExpireMasterZKSession)
+{
+  // Simulate a leading master.
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  PID<Master> leader;
+  leader.ip = 10000000;
+  leader.port = 10000;
+
+  // Create the group instance so we can expire its session.
+  Owned<zookeeper::Group> group(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+  ZooKeeperMasterContender leaderContender(group);
+
+  leaderContender.initialize(leader);
+
+  Future<Future<Nothing> > leaderContended = leaderContender.contend();
+  AWAIT_READY(leaderContended);
+
+  Future<Nothing> leaderLostLeadership = leaderContended.get();
+
+  ZooKeeperMasterDetector leaderDetector(url.get());
+
+  Future<Result<UPID> > detected = leaderDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // Keep detecting.
+  Future<Result<UPID> > newLeaderDetected =
+    leaderDetector.detect(detected.get());
+
+  // Simulate a following master.
+  PID<Master> follower;
+  follower.ip = 10000001;
+  follower.port = 10001;
+
+  ZooKeeperMasterDetector followerDetector(url.get());
+  ZooKeeperMasterContender followerContender(url.get());
+  followerContender.initialize(follower);
+
+  Future<Future<Nothing> > followerContended = followerContender.contend();
+  AWAIT_READY(followerContended);
+
+  LOG(INFO) << "The follower now is detecting the leader";
+  detected = followerDetector.detect(None());
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // Now expire the leader's zk session.
+  Future<Option<int64_t> > session = group->session();
+  AWAIT_READY(session);
+  EXPECT_SOME(session.get());
+
+  LOG(INFO) << "Now expire the ZK session: " << std::hex << session.get().get();
+
+  server->expireSession(session.get().get());
+
+  AWAIT_READY(leaderLostLeadership);
+
+  // Wait for session expiration and ensure the former leader detects
+  // a new leader.
+  AWAIT_READY(newLeaderDetected);
+  EXPECT_SOME(newLeaderDetected.get());
+  EXPECT_EQ(follower, newLeaderDetected.get().get());
+}
+
+
+// Tests that a slave correctly does NOT disconnect from the master
+// when its ZooKeeper session expires, and that the master remains
+// the leader after the slave reconnects to ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
+{
+  // Simulate a leading master.
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  ZooKeeperMasterContender masterContender(url.get());
+  masterContender.initialize(master);
+
+  Future<Future<Nothing> > leaderContended = masterContender.contend();
+  AWAIT_READY(leaderContended);
+
+  // Simulate a slave.
+  Owned<zookeeper::Group> group(
+        new Group(url.get(), MASTER_DETECTOR_ZK_SESSION_TIMEOUT));
+
+  ZooKeeperMasterDetector slaveDetector(group);
+
+  Future<Result<UPID> > detected = slaveDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(master, detected.get());
+
+  detected = slaveDetector.detect(master);
+
+  // Now expire the slave's zk session.
+  Future<Option<int64_t> > session = group->session();
+  AWAIT_READY(session);
+
+  Future<Nothing> connected = FUTURE_DISPATCH(
+      group->process->self(),
+      &GroupProcess::connected);
+
+  server->expireSession(session.get().get());
+
+  // When connected is called, the leader has already expired and
+  // reconnected.
+  AWAIT_READY(connected);
+
+  // Still pending because there is no leader change.
+  EXPECT_TRUE(detected.isPending());
+}
+
+
+// Tests whether a slave correctly detects the new master when its
+// ZooKeeper session is expired and a new master is elected before the
+// slave reconnects with ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest,
+       MasterDetectorExpireSlaveZKSessionNewMaster)
+{
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+        "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  // Simulate a leading master.
+  Owned<zookeeper::Group> leaderGroup(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+  // 1. Simulate a leading contender.
+  ZooKeeperMasterContender leaderContender(leaderGroup);
+  ZooKeeperMasterDetector leaderDetector(leaderGroup);
+
+  PID<Master> leader;
+  leader.ip = 10000000;
+  leader.port = 10000;
+
+  leaderContender.initialize(leader);
+
+  Future<Future<Nothing> > contended = leaderContender.contend();
+  AWAIT_READY(contended);
+
+  Future<Result<UPID> > detected = leaderDetector.detect(None());
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 2. Simulate a non-leading contender.
+  Owned<zookeeper::Group> followerGroup(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+  ZooKeeperMasterContender followerContender(followerGroup);
+  ZooKeeperMasterDetector followerDetector(followerGroup);
+
+  PID<Master> follower;
+  follower.ip = 10000001;
+  follower.port = 10001;
+
+  followerContender.initialize(follower);
+
+  contended = followerContender.contend();
+  AWAIT_READY(contended);
+
+  detected = followerDetector.detect(None());
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 3. Simulate a non-contender.
+  Owned<zookeeper::Group> nonContenderGroup(
+      new Group(url.get(), MASTER_DETECTOR_ZK_SESSION_TIMEOUT));
+  ZooKeeperMasterDetector nonContenderDetector(nonContenderGroup);
+
+  detected = nonContenderDetector.detect();
+
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  detected = nonContenderDetector.detect(leader);
+
+  // Now expire the slave's and leading master's zk sessions.
+  // NOTE: Here we assume that the slave stays disconnected from the
+  // ZK when the leading master loses its session.
+  Future<Option<int64_t> > slaveSession = nonContenderGroup->session();
+  AWAIT_READY(slaveSession);
+
+  Future<Option<int64_t> > masterSession = leaderGroup->session();
+  AWAIT_READY(masterSession);
+
+  server->expireSession(slaveSession.get().get());
+  server->expireSession(masterSession.get().get());
+
+  // Wait for session expiration and ensure a new master is detected.
+  AWAIT_READY(detected);
+
+  EXPECT_SOME_EQ(follower, detected.get());
+}
+
+#endif // MESOS_HAS_JAVA

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/tests/master_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_detector_tests.cpp b/src/tests/master_detector_tests.cpp
deleted file mode 100644
index 06c586d..0000000
--- a/src/tests/master_detector_tests.cpp
+++ /dev/null
@@ -1,628 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <zookeeper.h>
-
-#include <gmock/gmock.h>
-
-#include <fstream>
-#include <map>
-#include <string>
-#include <vector>
-
-#include <mesos/executor.hpp>
-#include <mesos/scheduler.hpp>
-
-#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/protobuf.hpp>
-
-#include <stout/duration.hpp>
-#include <stout/gtest.hpp>
-#include <stout/nothing.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/try.hpp>
-
-#include "detector/detector.hpp"
-
-#include "master/master.hpp"
-
-#include "messages/messages.hpp"
-
-#include "slave/slave.hpp"
-
-#include "tests/isolator.hpp"
-#include "tests/mesos.hpp"
-#ifdef MESOS_HAS_JAVA
-#include "tests/zookeeper.hpp"
-#endif
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::tests;
-
-using mesos::internal::master::Master;
-
-using mesos::internal::slave::Slave;
-
-using process::Clock;
-using process::Future;
-using process::PID;
-using process::UPID;
-
-using std::map;
-using std::string;
-using std::vector;
-
-using testing::_;
-using testing::AtMost;
-using testing::Return;
-
-
-class MasterDetectorTest : public MesosTest {};
-
-
-TEST_F(MasterDetectorTest, File)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  Files files;
-  TestingIsolator isolator;
-
-  slave::Flags flags = CreateSlaveFlags();
-
-  Slave s(flags, true, &isolator, &files);
-  PID<Slave> slave = process::spawn(&s);
-
-  // Write "master" to a file and use the "file://" mechanism to
-  // create a master detector for the slave. Still requires a master
-  // detector for the master first.
-  BasicMasterDetector detector1(master.get(), vector<UPID>(), true);
-
-  const string& path = path::join(flags.work_dir, "master");
-  ASSERT_SOME(os::write(path, stringify(master.get())));
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create("file://" + path, slave, false, true);
-
-  EXPECT_SOME(os::rm(path));
-
-  ASSERT_SOME(detector);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .Times(1);
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers));
-
-  driver.start();
-
-  AWAIT_READY(offers);
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-
-  process::terminate(slave);
-  process::wait(slave);
-}
-
-
-class MockMasterDetectorListenerProcess
-  : public ProtobufProcess<MockMasterDetectorListenerProcess>
-{
-public:
-  MockMasterDetectorListenerProcess() {}
-  virtual ~MockMasterDetectorListenerProcess() {}
-
-  MOCK_METHOD1(newMasterDetected, void(const process::UPID&));
-  MOCK_METHOD0(noMasterDetected, void(void));
-
-protected:
-  virtual void initialize()
-  {
-    install<NewMasterDetectedMessage>(
-        &MockMasterDetectorListenerProcess::newMasterDetected,
-        &NewMasterDetectedMessage::pid);
-
-    install<NoMasterDetectedMessage>(
-        &MockMasterDetectorListenerProcess::noMasterDetected);
-  }
-};
-
-
-#ifdef MESOS_HAS_JAVA
-class ZooKeeperMasterDetectorTest : public ZooKeeperTest {};
-
-
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetector)
-{
-  MockMasterDetectorListenerProcess mock;
-  process::spawn(mock);
-
-  Future<Nothing> newMasterDetected;
-  EXPECT_CALL(mock, newMasterDetected(mock.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  std::string master = "zk://" + server->connectString() + "/mesos";
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master, mock.self(), true, true);
-
-  ASSERT_SOME(detector);
-
-  AWAIT_READY(newMasterDetected);
-
-  MasterDetector::destroy(detector.get());
-
-  process::terminate(mock);
-  process::wait(mock);
-}
-
-
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectors)
-{
-  MockMasterDetectorListenerProcess mock1;
-  process::spawn(mock1);
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(mock1, newMasterDetected(mock1.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected1));
-
-  std::string master = "zk://" + server->connectString() + "/mesos";
-
-  Try<MasterDetector*> detector1 =
-    MasterDetector::create(master, mock1.self(), true, true);
-
-  ASSERT_SOME(detector1);
-
-  AWAIT_READY(newMasterDetected1);
-
-  MockMasterDetectorListenerProcess mock2;
-  process::spawn(mock2);
-
-  Future<Nothing> newMasterDetected2;
-  EXPECT_CALL(mock2, newMasterDetected(mock1.self())) // N.B. mock1
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  Try<MasterDetector*> detector2 =
-    MasterDetector::create(master, mock2.self(), true, true);
-
-  ASSERT_SOME(detector2);
-
-  AWAIT_READY(newMasterDetected2);
-
-  // Destroying detector1 (below) might cause another election so we
-  // need to set up expectations appropriately.
-  EXPECT_CALL(mock2, newMasterDetected(_))
-    .WillRepeatedly(Return());
-
-  MasterDetector::destroy(detector1.get());
-
-  process::terminate(mock1);
-  process::wait(mock1);
-
-  MasterDetector::destroy(detector2.get());
-
-  process::terminate(mock2);
-  process::wait(mock2);
-}
-
-
-// Disabled due to MESOS-455.
-TEST_F(ZooKeeperMasterDetectorTest, DISABLED_MasterDetectorShutdownNetwork)
-{
-  Clock::pause();
-
-  MockMasterDetectorListenerProcess mock;
-  process::spawn(mock);
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(mock, newMasterDetected(mock.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected1));
-
-  std::string master = "zk://" + server->connectString() + "/mesos";
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master, mock.self(), true, true);
-
-  ASSERT_SOME(detector);
-
-  AWAIT_READY(newMasterDetected1);
-
-  Future<Nothing> noMasterDetected;
-  EXPECT_CALL(mock, noMasterDetected())
-    .WillOnce(FutureSatisfy(&noMasterDetected));
-
-  server->shutdownNetwork();
-
-  Clock::advance(Seconds(10)); // TODO(benh): Get session timeout from detector.
-
-  AWAIT_READY(noMasterDetected);
-
-  Future<Nothing> newMasterDetected2;
-  EXPECT_CALL(mock, newMasterDetected(mock.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  server->startNetwork();
-
-  AWAIT_READY(newMasterDetected2);
-
-  MasterDetector::destroy(detector.get());
-
-  process::terminate(mock);
-  process::wait(mock);
-
-  Clock::resume();
-}
-
-
-// Tests that a detector sends a NoMasterDetectedMessage when we
-// reach our ZooKeeper session timeout. This verifies that we
-// manually expire the session when we don't get reconnected within
-// the ZOOKEEPER_SESSION_TIMEOUT.
-TEST_F(ZooKeeperTest, MasterDetectorTimedoutSession)
-{
-  Try<zookeeper::URL> url =
-    zookeeper::URL::parse("zk://" + server->connectString() + "/mesos");
-  ASSERT_SOME(url);
-
-  // First we bring up three master detector listeners:
-  //   1. A leading contender.
-  //   2. A non-leading contender.
-  //   3. A non-contender.
-
-  // 1. Simulate a leading contender.
-  MockMasterDetectorListenerProcess leader;
-
-  Future<Nothing> newMasterDetected;
-  EXPECT_CALL(leader, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  process::spawn(leader);
-
-  ZooKeeperMasterDetector leaderDetector(
-      url.get(), leader.self(), true, true);
-
-  AWAIT_READY(newMasterDetected);
-
-  // 2. Simulate a non-leading contender.
-  MockMasterDetectorListenerProcess follower;
-
-  EXPECT_CALL(follower, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  process::spawn(follower);
-
-  ZooKeeperMasterDetector followerDetector(
-      url.get(), follower.self(), true, true);
-
-  AWAIT_READY(newMasterDetected);
-
-  // 3. Simulate a non-contender.
-  MockMasterDetectorListenerProcess nonContender;
-
-  EXPECT_CALL(nonContender, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  process::spawn(nonContender);
-
-  ZooKeeperMasterDetector nonContenderDetector(
-      url.get(), nonContender.self(), false, true);
-
-  AWAIT_READY(newMasterDetected);
-
-  // Now we want to induce lost connections on each of the
-  // master detectors.
-  // Induce a reconnection on the leader.
-  Future<Nothing> leaderReconnecting = FUTURE_DISPATCH(
-      leaderDetector.process->self(),
-      &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  dispatch(leaderDetector.process,
-           &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  AWAIT_READY(leaderReconnecting);
-
-  // Induce a reconnection on the follower.
-  Future<Nothing> followerReconnecting = FUTURE_DISPATCH(
-      followerDetector.process->self(),
-      &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  dispatch(followerDetector.process,
-           &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  AWAIT_READY(followerReconnecting);
-
-  // Induce a reconnection on the non-contender.
-  Future<Nothing> nonContenderReconnecting = FUTURE_DISPATCH(
-      nonContenderDetector.process->self(),
-      &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  dispatch(nonContenderDetector.process,
-           &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  AWAIT_READY(nonContenderReconnecting);
-
-  // Now induce the reconnection timeout.
-  Future<Nothing> leaderNoMasterDetected;
-  EXPECT_CALL(leader, noMasterDetected())
-    .WillOnce(FutureSatisfy(&leaderNoMasterDetected));
-
-  Future<Nothing> followerNoMasterDetected;
-  EXPECT_CALL(follower, noMasterDetected())
-    .WillOnce(FutureSatisfy(&followerNoMasterDetected));
-
-  Future<Nothing> nonContenderNoMasterDetected;
-  EXPECT_CALL(nonContender, noMasterDetected())
-    .WillOnce(FutureSatisfy(&nonContenderNoMasterDetected));
-
-  Clock::pause();
-  Clock::advance(ZOOKEEPER_SESSION_TIMEOUT);
-  Clock::settle();
-
-  AWAIT_READY(leaderNoMasterDetected);
-  AWAIT_READY(followerNoMasterDetected);
-  AWAIT_READY(nonContenderNoMasterDetected);
-
-  process::terminate(leader);
-  process::wait(leader);
-
-  process::terminate(follower);
-  process::wait(follower);
-
-  process::terminate(nonContender);
-  process::wait(nonContender);
-}
-
-
-// Tests whether a leading master correctly detects a new master when
-// its ZooKeeper session is expired.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireMasterZKSession)
-{
-  // Simulate a leading master.
-  MockMasterDetectorListenerProcess leader;
-
-  Future<Nothing> newMasterDetected1, newMasterDetected2;
-  EXPECT_CALL(leader, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected1))
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  EXPECT_CALL(leader, noMasterDetected())
-    .Times(0);
-
-  process::spawn(leader);
-
-  std::string znode = "zk://" + server->connectString() + "/mesos";
-
-  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
-  ASSERT_SOME(url);
-
-  // Leader's detector.
-  ZooKeeperMasterDetector leaderDetector(
-      url.get(), leader.self(), true, true);
-
-  AWAIT_READY(newMasterDetected1);
-
-  // Simulate a following master.
-  MockMasterDetectorListenerProcess follower;
-
-  Future<Nothing> newMasterDetected3;
-  EXPECT_CALL(follower, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected3))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(follower, noMasterDetected())
-    .Times(0);
-
-  process::spawn(follower);
-
-  // Follower's detector.
-  ZooKeeperMasterDetector followerDetector(
-      url.get(),
-      follower.self(),
-      true,
-      true);
-
-  AWAIT_READY(newMasterDetected3);
-
-  // Now expire the leader's zk session.
-  Future<int64_t> session = leaderDetector.session();
-  AWAIT_READY(session);
-
-  server->expireSession(session.get());
-
-  // Wait for session expiration and ensure we receive a
-  // NewMasterDetected message.
-  AWAIT_READY(newMasterDetected2);
-
-  process::terminate(follower);
-  process::wait(follower);
-
-  process::terminate(leader);
-  process::wait(leader);
-}
-
-
-// Tests that a slave correctly DOES NOT disconnect from the master
-// when its ZooKeeper session expires, as long as the master stays
-// the leader when the slave reconnects with ZooKeeper.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireSlaveZKSession)
-{
-  // Simulate a leading master.
-  MockMasterDetectorListenerProcess master;
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(master, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected1));
-
-  EXPECT_CALL(master, noMasterDetected())
-    .Times(0);
-
-  process::spawn(master);
-
-  std::string znode = "zk://" + server->connectString() + "/mesos";
-
-  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
-  ASSERT_SOME(url);
-
-  // Leading master's detector.
-  ZooKeeperMasterDetector masterDetector(
-      url.get(), master.self(), true, true);
-
-  AWAIT_READY(newMasterDetected1);
-
-  // Simulate a slave.
-  MockMasterDetectorListenerProcess slave;
-
-  Future<Nothing> newMasterDetected2, newMasterDetected3;
-  EXPECT_CALL(slave, newMasterDetected(_))
-    .Times(1)
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  EXPECT_CALL(slave, noMasterDetected())
-    .Times(0);
-
-  process::spawn(slave);
-
-  // Slave's master detector.
-  ZooKeeperMasterDetector slaveDetector(
-      url.get(), slave.self(), false, true);
-
-  AWAIT_READY(newMasterDetected2);
-
-  // Now expire the slave's zk session.
-  Future<int64_t> session = slaveDetector.session();
-  AWAIT_READY(session);
-
-  server->expireSession(session.get());
-
-  // Wait for enough time to ensure no NewMasterDetected message is sent.
-  os::sleep(Seconds(4)); // ZooKeeper needs extra time for session expiration.
-
-  process::terminate(slave);
-  process::wait(slave);
-
-  process::terminate(master);
-  process::wait(master);
-}
-
-
-// Tests whether a slave correctly detects the new master when its
-// ZooKeeper session is expired and a new master is elected before the
-// slave reconnects with ZooKeeper.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireSlaveZKSessionNewMaster)
-{
-  // Simulate a leading master.
-  MockMasterDetectorListenerProcess master1;
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(master1, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected1))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(master1, noMasterDetected())
-    .Times(0);
-
-  process::spawn(master1);
-
-  std::string znode = "zk://" + server->connectString() + "/mesos";
-
-  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
-  ASSERT_SOME(url);
-
-  // Leading master's detector.
-  ZooKeeperMasterDetector masterDetector1(
-      url.get(), master1.self(), true, true);
-
-  AWAIT_READY(newMasterDetected1);
-
-  // Simulate a non-leading master.
-  MockMasterDetectorListenerProcess master2;
-
-  Future<Nothing> newMasterDetected2;
-  EXPECT_CALL(master2, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected2))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(master2, noMasterDetected())
-    .Times(0);
-
-  process::spawn(master2);
-
-  // Non-leading master's detector.
-  ZooKeeperMasterDetector masterDetector2(
-      url.get(), master2.self(), true, true);
-
-  AWAIT_READY(newMasterDetected2);
-
-  // Simulate a slave.
-  MockMasterDetectorListenerProcess slave;
-
-  Future<Nothing> newMasterDetected3, newMasterDetected4;
-  EXPECT_CALL(slave, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected3))
-    .WillOnce(FutureSatisfy(&newMasterDetected4));
-
-  EXPECT_CALL(slave, noMasterDetected())
-    .Times(AtMost(1));
-
-  process::spawn(slave);
-
-  // Slave's master detector.
-  ZooKeeperMasterDetector slaveDetector(
-      url.get(), slave.self(), false, true);
-
-  AWAIT_READY(newMasterDetected3);
-
-  // Now expire the slave's and leading master's zk sessions.
-  // NOTE: Here we assume that slave stays disconnected from the ZK when the
-  // leading master loses its session.
-  Future<int64_t> slaveSession = slaveDetector.session();
-  AWAIT_READY(slaveSession);
-
-  server->expireSession(slaveSession.get());
-
-  Future<int64_t> masterSession = masterDetector1.session();
-  AWAIT_READY(masterSession);
-
-  server->expireSession(masterSession.get());
-
-  // Wait for session expiration and ensure we receive a
-  // NewMasterDetected message.
-  AWAIT_READY_FOR(newMasterDetected4, Seconds(10));
-
-  process::terminate(slave);
-  process::wait(slave);
-
-  process::terminate(master2);
-  process::wait(master2);
-
-  process::terminate(master1);
-  process::wait(master1);
-}
-#endif // MESOS_HAS_JAVA
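
A note on the pattern these removed tests relied on: connection loss is
induced by dispatching the detector process's reconnecting() handler
directly and intercepting that dispatch, then the session timeout is
driven with the test clock. Distilled into a sketch (using the fixtures
above; not new test code):

  // Intercept the dispatch so the test can synchronize on it.
  Future<Nothing> reconnecting = FUTURE_DISPATCH(
      detector.process->self(),
      &ZooKeeperMasterDetectorProcess::reconnecting);

  // Simulate a lost ZooKeeper connection.
  dispatch(detector.process, &ZooKeeperMasterDetectorProcess::reconnecting);

  AWAIT_READY(reconnecting);

  // Advancing the clock past the session timeout triggers timedout(),
  // which posts a NoMasterDetectedMessage to the listener.
  Clock::pause();
  Clock::advance(ZOOKEEPER_SESSION_TIMEOUT);
  Clock::settle();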


[8/8] git commit: Replaced usage of old detector with new Master contender and detector abstractions.

Posted by vi...@apache.org.
Replaced usage of old detector with new Master contender and detector
abstractions.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15510


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f9d1dd81
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f9d1dd81
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f9d1dd81

Branch: refs/heads/master
Commit: f9d1dd819b6cc3843e4d1287ac10276d62cbfed4
Parents: bcd1dc4
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:39:27 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:19 2013 -0800

----------------------------------------------------------------------
 include/mesos/scheduler.hpp         |  12 +-
 src/Makefile.am                     |   3 +-
 src/cli/resolve.cpp                 |  27 +-
 src/detector/detector.cpp           | 580 -------------------------------
 src/detector/detector.hpp           | 216 ------------
 src/local/local.cpp                 |  22 +-
 src/local/main.cpp                  |   2 -
 src/master/http.cpp                 |   9 +-
 src/master/main.cpp                 |  35 +-
 src/master/master.cpp               | 128 ++++---
 src/master/master.hpp               |  31 +-
 src/messages/messages.proto         |  11 -
 src/sched/sched.cpp                 | 236 +++++++------
 src/slave/http.cpp                  |   4 +-
 src/slave/main.cpp                  |  15 +-
 src/slave/slave.cpp                 | 158 ++++-----
 src/slave/slave.hpp                 |  22 +-
 src/tests/allocator_tests.cpp       |   3 +-
 src/tests/authentication_tests.cpp  |  53 +--
 src/tests/cluster.hpp               | 113 ++++--
 src/tests/fault_tolerance_tests.cpp | 123 ++++---
 src/tests/gc_tests.cpp              |   3 +-
 src/tests/isolator_tests.cpp        |   3 +-
 src/tests/master_tests.cpp          |  27 +-
 src/tests/mesos.cpp                 |  41 +++
 src/tests/mesos.hpp                 |  59 ++++
 src/tests/slave_recovery_tests.cpp  |  16 +-
 27 files changed, 686 insertions(+), 1266 deletions(-)
----------------------------------------------------------------------
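
At a high level, this patch splits the old message-based interface in
two: a MasterContender registers (and re-registers) a candidacy, while
a MasterDetector reports leadership changes through futures instead of
NewMasterDetectedMessage/NoMasterDetectedMessage. A rough usage sketch
based on the diffs below ('zk' is the master location string; error
handling elided):

  Try<MasterContender*> contender = MasterContender::create(zk);
  Try<MasterDetector*> detector = MasterDetector::create(zk);
  CHECK_SOME(contender);
  CHECK_SOME(detector);

  // Contending yields a future that is satisfied once the candidacy
  // is registered; the inner future signals a lost candidacy.
  contender.get()->initialize(pid); // 'pid' is the contending master.
  Future<Future<Nothing> > candidacy = contender.get()->contend();

  // Detecting yields the current leader: Some(pid), None (no leader),
  // or Error. Pass the previous result back in to wait for a change.
  Future<Result<UPID> > leader = detector.get()->detect();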


http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 380e087..161cc65 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -40,6 +40,7 @@ namespace mesos {
 class SchedulerDriver;
 
 namespace internal {
+class MasterDetector;
 class SchedulerProcess;
 }
 
@@ -386,9 +387,12 @@ private:
   FrameworkInfo framework;
   std::string master;
 
-  // Libprocess process for communicating with master.
+  // Used for communicating with the master.
   internal::SchedulerProcess* process;
 
+  // URL for the master (e.g., zk://, file://, etc).
+  std::string url;
+
+  // Mutex to enforce all non-callbacks are executed serially.
   pthread_mutex_t mutex;
 
@@ -397,6 +401,12 @@ private:
 
   // Current status of the driver.
   Status status;
+
+  const Credential* credential;
+
+protected:
+  // Used to detect (i.e., choose) the master.
+  internal::MasterDetector* detector;
 };
 
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 969aead..feda34b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -183,7 +183,6 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	launcher/launcher.cpp						\
 	exec/exec.cpp							\
 	common/lock.cpp							\
-	detector/detector.cpp						\
 	common/date_utils.cpp						\
 	common/resources.cpp						\
 	common/attributes.cpp						\
@@ -220,7 +219,7 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
 	common/protobuf_utils.hpp					\
 	common/lock.hpp							\
 	common/type_utils.hpp common/thread.hpp common/units.hpp	\
-	detector/detector.hpp examples/utils.hpp files/files.hpp	\
+	examples/utils.hpp files/files.hpp				\
 	hdfs/hdfs.hpp							\
 	launcher/launcher.hpp linux/cgroups.hpp				\
 	linux/fs.hpp local/flags.hpp local/local.hpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/cli/resolve.cpp
----------------------------------------------------------------------
diff --git a/src/cli/resolve.cpp b/src/cli/resolve.cpp
index b05f510..dddadfc 100644
--- a/src/cli/resolve.cpp
+++ b/src/cli/resolve.cpp
@@ -29,7 +29,7 @@
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
 
 using namespace mesos;
 using namespace mesos::internal;
@@ -117,19 +117,32 @@ int main(int argc, char** argv)
     return -1;
   }
 
-  Future<UPID> pid = mesos::internal::detect(master.get(), !verbose);
+  Try<MasterDetector*> detector = MasterDetector::create(master.get());
+
+  if (detector.isError()) {
+    cerr << "Failed to create a master detector: " << detector.error() << endl;
+    return -1;
+  }
+
+  Future<Result<UPID> > pid = detector.get()->detect();
 
   if (!pid.await(timeout)) {
     cerr << "Failed to detect master from '" << master.get()
          << "' within " << timeout << endl;
     return -1;
-  } else if (pid.isFailed()) {
-    cerr << "Failed to detect master from '" << master.get()
-         << "': " << pid.failure() << endl;
-    return -1;
+  } else {
+    // Not expecting detect() to fail or discard the future.
+    CHECK(pid.isReady());
+    if (pid.get().isError()) {
+      cerr << "Failed to detect master from '" << master.get()
+           << "': " << pid.get().error() << endl;
+      return -1;
+    }
   }
 
-  cout << string(pid.get()).substr(7) << endl;
+  // The future is not satisfied unless the result is Some.
+  CHECK_SOME(pid.get());
+  cout << strings::remove(pid.get().get(), "master@") << endl;
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/detector/detector.cpp
----------------------------------------------------------------------
diff --git a/src/detector/detector.cpp b/src/detector/detector.cpp
deleted file mode 100644
index 8d9f118..0000000
--- a/src/detector/detector.cpp
+++ /dev/null
@@ -1,580 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <glog/logging.h>
-
-#include <fstream>
-#include <ios>
-#include <vector>
-
-#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
-
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/future.hpp>
-#include <process/process.hpp>
-#include <process/protobuf.hpp>
-#include <process/timer.hpp>
-
-#include <stout/error.hpp>
-#include <stout/foreach.hpp>
-#include <stout/lambda.hpp>
-#include <stout/none.hpp>
-#include <stout/numify.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-
-#include "detector/detector.hpp"
-
-#include "logging/logging.hpp"
-
-#include "messages/messages.hpp"
-
-#include "zookeeper/authentication.hpp"
-#include "zookeeper/url.hpp"
-#include "zookeeper/watcher.hpp"
-#include "zookeeper/zookeeper.hpp"
-
-using process::Future;
-using process::Process;
-using process::Promise;
-using process::Timer;
-using process::UPID;
-using process::wait; // Necessary on some OS's to disambiguate.
-
-using std::pair;
-using std::string;
-using std::vector;
-
-namespace mesos {
-namespace internal {
-
-
-const Duration ZOOKEEPER_SESSION_TIMEOUT = Seconds(10);
-
-
-MasterDetector::~MasterDetector() {}
-
-
-Try<MasterDetector*> MasterDetector::create(
-    const string& master,
-    const UPID& pid,
-    bool contend,
-    bool quiet)
-{
-  if (master == "") {
-    if (contend) {
-      return new BasicMasterDetector(pid);
-    } else {
-      return Error("Cannot detect master");
-    }
-  } else if (master.find("zk://") == 0) {
-    Try<zookeeper::URL> url = zookeeper::URL::parse(master);
-    if (url.isError()) {
-      return Error(url.error());
-    }
-    if (url.get().path == "/") {
-      return Error(
-          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
-    }
-    return new ZooKeeperMasterDetector(url.get(), pid, contend, quiet);
-  } else if (master.find("file://") == 0) {
-    const std::string& path = master.substr(7);
-    std::ifstream file(path.c_str());
-    if (!file.is_open()) {
-      return Error("Failed to open file at '" + path + "'");
-    }
-
-    std::string line;
-    getline(file, line);
-
-    if (!file) {
-      file.close();
-      return Error("Failed to read from file at '" + path + "'");
-    }
-
-    file.close();
-
-    return create(line, pid, contend, quiet);
-  }
-
-  // Okay, try and parse what we got as a PID.
-  process::UPID masterPid = master.find("master@") == 0
-    ? process::UPID(master)
-    : process::UPID("master@" + master);
-
-  if (!masterPid) {
-    return Error("Cannot parse '" + std::string(masterPid) + "'");
-  }
-
-  return new BasicMasterDetector(masterPid, pid);
-}
-
-
-void MasterDetector::destroy(MasterDetector *detector)
-{
-  if (detector != NULL)
-    delete detector;
-}
-
-
-BasicMasterDetector::BasicMasterDetector(const UPID& _master)
-  : master(_master)
-{
-  // Elect the master.
-  NewMasterDetectedMessage message;
-  message.set_pid(master);
-  process::post(master, message);
-}
-
-
-BasicMasterDetector::BasicMasterDetector(
-    const UPID& _master,
-    const UPID& pid,
-    bool elect)
-  : master(_master)
-{
-  if (elect) {
-    // Elect the master.
-    NewMasterDetectedMessage message;
-    message.set_pid(master);
-    process::post(master, message);
-  }
-
-  // Tell the pid about the master.
-  NewMasterDetectedMessage message;
-  message.set_pid(master);
-  process::post(pid, message);
-}
-
-
-BasicMasterDetector::BasicMasterDetector(
-    const UPID& _master,
-    const vector<UPID>& pids,
-    bool elect)
-  : master(_master)
-{
-  if (elect) {
-    // Elect the master.
-    NewMasterDetectedMessage message;
-    message.set_pid(master);
-    process::post(master, message);
-  }
-
-  // Tell each pid about the master.
-  foreach (const UPID& pid, pids) {
-    NewMasterDetectedMessage message;
-    message.set_pid(master);
-    process::post(pid, message);
-  }
-}
-
-
-BasicMasterDetector::~BasicMasterDetector() {}
-
-
-ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
-    const zookeeper::URL& _url,
-    const UPID& _pid,
-    bool _contend,
-    bool quiet)
-  : url(_url),
-    acl(url.authentication.isSome()
-        ? zookeeper::EVERYONE_READ_CREATOR_ALL
-        : ZOO_OPEN_ACL_UNSAFE),
-    pid(_pid),
-    contend(_contend),
-    watcher(NULL),
-    zk(NULL),
-    expire(false),
-    timer(),
-    currentMasterSeq(),
-    currentMasterPID()
-{
-  // Set verbosity level for underlying ZooKeeper library logging.
-  // TODO(benh): Put this in the C++ API.
-  zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
-}
-
-
-ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
-{
-  delete zk;
-  delete watcher;
-}
-
-
-void ZooKeeperMasterDetectorProcess::initialize()
-{
-  // Doing initialization here allows us to avoid the race between
-  // instantiating the ZooKeeper instance and being spawned ourselves.
-  watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
-  zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-}
-
-
-int64_t ZooKeeperMasterDetectorProcess::session()
-{
-  CHECK_NOTNULL(zk);
-  return zk->getSessionId();
-}
-
-
-void ZooKeeperMasterDetectorProcess::connected(bool reconnect)
-{
-  if (!reconnect) {
-    LOG(INFO) << "Master detector (" << pid << ") connected to ZooKeeper ...";
-
-    if (url.authentication.isSome()) {
-      const std::string& scheme = url.authentication.get().scheme;
-      const std::string& credentials = url.authentication.get().credentials;
-      LOG(INFO) << "Authenticating to ZooKeeper using scheme '" << scheme << "'";
-      int code = zk->authenticate(scheme, credentials);
-      if (code != ZOK) {
-        LOG(FATAL) << "Failed to authenticate with ZooKeeper: "
-                   << zk->message(code);
-      }
-    }
-
-    // Assume the path (chroot) being used does not end with a "/".
-    CHECK(url.path.at(url.path.length() - 1) != '/');
-
-    // Create znode path (including intermediate znodes) as necessary.
-    LOG(INFO) << "Trying to create path '" << url.path << "' in ZooKeeper";
-
-    int code = zk->create(url.path, "", acl, 0, NULL, true);
-
-    // We fail all non-OK return codes except ZNODEEXISTS (since that
-    // means the path we were trying to create exists) and ZNOAUTH
-    // (since it's possible that the ACLs on 'dirname(url.path)' don't
-    // allow us to create a child znode but we are allowed to create
-    // children of 'url.path' itself, which will be determined below
-    // if we are contending). Note that it's also possible we got back
-    // a ZNONODE because we could not create one of the intermediate
-    // znodes (in which case we'll abort in the 'else' below since
-    // ZNONODE is non-retryable). TODO(benh): Need to check that we
-    // also can put a watch on the children of 'url.path'.
-    if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
-      LOG(FATAL) << "Failed to create '" << url.path
-                 << "' in ZooKeeper: " << zk->message(code);
-    }
-
-    if (contend) {
-      // We contend with the pid given in constructor.
-      string result;
-      int code = zk->create(url.path + "/", pid, acl,
-                            ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
-
-      if (code != ZOK) {
-        LOG(FATAL) << "Unable to create ephemeral child of '" << url.path
-                   << "' in ZooKeeper: " << zk->message(code);
-      }
-
-      LOG(INFO) << "Created ephemeral/sequence znode at '" << result << "'";
-    }
-
-    // Now determine who the master is (it may be us).
-    detectMaster();
-  } else {
-    LOG(INFO) << "Master detector (" << pid << ") reconnected ...";
-
-    // Cancel and cleanup the reconnect timer (if necessary).
-    if (timer.isSome()) {
-      Timer::cancel(timer.get());
-      timer = None();
-    }
-
-    // If we decided to expire the session, make sure we delete the
-    // ZooKeeper instance so the session actually expires. We also
-    // create a new ZooKeeper instance for clients that want to
-    // continue detecting and/or contending (which is likely given
-    // that this code is getting executed).
-    if (expire) {
-      LOG(WARNING) << "Cleaning up after expired ZooKeeper session";
-
-      delete CHECK_NOTNULL(zk);
-      delete CHECK_NOTNULL(watcher);
-
-      watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
-      zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-
-      expire = false;
-      return;
-    }
-
-    // We've reconnected and we didn't prematurely expire the session,
-    // but the master might have changed, so we should run an
-    // election. TODO(benh): Determine if this is really necessary or
-    // if the watch set via 'ZooKeeper::getChildren' in 'detectMaster'
-    // is sufficient (it should be).
-    detectMaster();
-  }
-}
-
-
-void ZooKeeperMasterDetectorProcess::reconnecting()
-{
-  LOG(INFO) << "Master detector (" << pid << ") lost connection to ZooKeeper, "
-            << "attempting to reconnect ...";
-
-  // ZooKeeper won't tell us of a session expiration until we
-  // reconnect, which could occur much much later than the session was
-  // actually expired. This can lead to a prolonged split-brain
-  // scenario when network partitions occur. Rather than wait for a
-  // reconnection to occur (i.e., a network partition to be repaired)
-  // we create a local timer and "expire" our session prematurely if
-  // we haven't reconnected within the session expiration timeout.
-  // Later, when we eventually do reconnect we can force the session
-  // to be expired if we decided locally to expire.
-  timer = process::delay(
-      ZOOKEEPER_SESSION_TIMEOUT, self(), &Self::timedout, zk->getSessionId());
-}
-
-
-void ZooKeeperMasterDetectorProcess::expired()
-{
-  LOG(WARNING) << "Master detector (" << pid << ") ZooKeeper session expired!";
-
-  // Cancel and cleanup the reconnect timer (if necessary).
-  if (timer.isSome()) {
-    Timer::cancel(timer.get());
-    timer = None();
-  }
-
-  delete CHECK_NOTNULL(zk);
-  delete CHECK_NOTNULL(watcher);
-
-  watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
-  zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-
-  expire = false;
-}
-
-
-void ZooKeeperMasterDetectorProcess::updated(const string& path)
-{
-  // A new master might have showed up and created a sequence
-  // identifier or a master may have died, determine who the master is now!
-  detectMaster();
-}
-
-
-void ZooKeeperMasterDetectorProcess::created(const string& path)
-{
-  LOG(FATAL) << "Unexpected ZooKeeper event (created) for '" << path << "'";
-}
-
-
-void ZooKeeperMasterDetectorProcess::deleted(const string& path)
-{
-  LOG(FATAL) << "Unexpected ZooKeeper event (deleted) for '" << path << "'";
-}
-
-
-void ZooKeeperMasterDetectorProcess::timedout(const int64_t& sessionId)
-{
-  CHECK_NOTNULL(zk);
-  if (timer.isSome() && zk->getSessionId() == sessionId) {
-    LOG(WARNING) << "Timed out waiting to reconnect to ZooKeeper "
-                 << "(sessionId=" << std::hex << sessionId << ")";
-    timer = None();
-    expire = true;
-
-    // TODO(bmahler): We always want to clear the sequence number
-    // prior to sending NoMasterDetectedMessage. It might be prudent
-    // to use a helper function to enforce this.
-    currentMasterSeq = "";  // Clear the master sequence number.
-    process::post(pid, NoMasterDetectedMessage());
-  }
-}
-
-
-void ZooKeeperMasterDetectorProcess::detectMaster()
-{
-  vector<string> results;
-
-  int code = zk->getChildren(url.path, true, &results);
-
-  if (code != ZOK) {
-    if (zk->retryable(code)) {
-      // NOTE: We don't expect a ZNONODE here because 'url.path' is always
-      // created in the connected() call. Despite that, we don't do a
-      // CHECK (code != ZNONODE) just to be safe in case the zk client library
-      // does return the code unexpectedly.
-      LOG(ERROR) << "Master detector (" << pid << ") failed to get masters: "
-                 << zk->message(code);
-      return; // Try again when we reconnect.
-    } else {
-      LOG(FATAL) << "Non-retryable ZooKeeper error while getting masters: "
-                 << zk->message(code);
-    }
-  } else {
-    LOG(INFO) << "Master detector (" << pid << ") found " << results.size()
-              << " registered masters";
-  }
-
-  string masterSeq;
-  long min = LONG_MAX;
-  foreach (const string& result, results) {
-    Try<int> i = numify<int>(result);
-    if (i.isError()) {
-      LOG(WARNING) << "Unexpected znode at '" << url.path
-                   << "': " << i.error();
-      continue;
-    }
-    if (i.get() < min) {
-      min = i.get();
-      masterSeq = result;
-    }
-  }
-
-  // No master present (lost or possibly hasn't come up yet).
-  if (masterSeq.empty()) {
-    LOG(INFO) << "Master detector (" << pid << ") couldn't find any masters";
-    currentMasterSeq = "";  // Clear the master sequence number.
-    process::post(pid, NoMasterDetectedMessage());
-  } else if (masterSeq != currentMasterSeq) {
-    // Okay, let's fetch the master pid from ZooKeeper.
-    string result;
-    code = zk->get(url.path + "/" + masterSeq, false, &result, NULL);
-
-    if (code != ZOK) {
-      // This is possible because the master might have failed since
-      // the invocation of ZooKeeper::getChildren above.
-      // It is fine to not send a NoMasterDetectedMessage here because,
-      // 1) If this is due to a connection loss or session expiration,
-      //    connected() or expired() will be called and the leader detection
-      //    code (detectMaster()) will be re-tried.
-      // 2) If this is due to no masters present (i.e., code == ZNONODE),
-      //    updated() will be called and the detectMaster() will be re-tried.
-      if (zk->retryable(code) || code == ZNONODE) {
-        LOG(ERROR) << "Master detector failed to fetch new master pid: "
-                   << zk->message(code);
-      } else {
-        LOG(FATAL) << "Non-retryable ZooKeeper error while fetching "
-                   << "new master pid: " << zk->message(code);
-      }
-    } else {
-      // Now let's parse what we fetched from ZooKeeper.
-      LOG(INFO) << "Master detector (" << pid << ") got new master pid: "
-                << result;
-
-      UPID masterPid = result;
-
-      if (masterPid == UPID()) {
-        // TODO(benh): Maybe we should try again then!?!? Parsing
-        // might have failed because of DNS, and whoever is using the
-        // detector might sit "unconnected" indefinitely!
-        LOG(ERROR) << "Failed to parse new master pid!";
-        currentMasterSeq = "";  // Clear the master sequence number.
-        process::post(pid, NoMasterDetectedMessage());
-      } else {
-        currentMasterSeq = masterSeq;
-        currentMasterPID = masterPid;
-
-        NewMasterDetectedMessage message;
-        message.set_pid(currentMasterPID);
-        process::post(pid, message);
-      }
-    }
-  }
-}
-
-
-ZooKeeperMasterDetector::ZooKeeperMasterDetector(
-    const zookeeper::URL& url,
-    const UPID& pid,
-    bool contend,
-    bool quiet)
-{
-  process = new ZooKeeperMasterDetectorProcess(url, pid, contend, quiet);
-  spawn(process);
-}
-
-
-ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
-{
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-Future<int64_t> ZooKeeperMasterDetector::session()
-{
-  return dispatch(process, &ZooKeeperMasterDetectorProcess::session);
-}
-
-
-// A simple "listener" for doing one-time detections to support the
-// 'detect' function (see below). Note that this can't be
-// declared/defined inside of the 'detect' function because until
-// C++11 we can't have template arguments be local types (which
-// 'Listener' would be).
-class Listener : public ProtobufProcess<Listener>
-{
-public:
-  Future<UPID> future() { return promise.future(); }
-
-protected:
-  virtual void initialize()
-  {
-    // Stop listening if no one cares.
-    void(*terminate)(const UPID&, bool) = process::terminate;
-    promise.future().onDiscarded(lambda::bind(terminate, self(), true));
-
-    install<NewMasterDetectedMessage>(
-        &Listener::newMasterDetected,
-        &NewMasterDetectedMessage::pid);
-  }
-
-  void newMasterDetected(const UPID& pid)
-  {
-    promise.set(pid);
-    process::terminate(self());
-  }
-
-private:
-  Promise<UPID> promise;
-};
-
-
-Future<UPID> detect(const string& master, bool quiet)
-{
-  Listener* listener = new Listener();
-
-  // Save the future before we spawn.
-  Future<UPID> future = listener->future();
-
-  process::spawn(listener, true); // Let the GC clean up the Listener.
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master, listener->self(), false, quiet);
-
-  if (detector.isError()) {
-    process::terminate(listener);
-    return Future<UPID>::failed(
-        "Failed to create a master detector: " + detector.error());
-  }
-
-  return future;
-}
-
-
-} // namespace internal {
-} // namespace mesos {
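
The reconnecting()/timedout() pair above deserves emphasis: since
ZooKeeper only reports session expiration after a reconnect, the
detector arms its own timer, keyed on the current session ID, and
locally "expires" the session if the reconnect does not happen within
the session timeout. The watchdog pattern, distilled from the removed
code above (same names as in the file):

  void reconnecting()
  {
    // Arm a watchdog keyed on the current session so that a stale
    // timer from an earlier session is ignored when it fires.
    timer = process::delay(
        ZOOKEEPER_SESSION_TIMEOUT, self(), &Self::timedout, zk->getSessionId());
  }

  void timedout(const int64_t& sessionId)
  {
    if (timer.isSome() && zk->getSessionId() == sessionId) {
      timer = None();
      expire = true; // Force actual expiration on the next reconnect.
      process::post(pid, NoMasterDetectedMessage());
    }
  }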

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/detector/detector.hpp
----------------------------------------------------------------------
diff --git a/src/detector/detector.hpp b/src/detector/detector.hpp
deleted file mode 100644
index 3aaebfe..0000000
--- a/src/detector/detector.hpp
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MASTER_DETECTOR_HPP__
-#define __MASTER_DETECTOR_HPP__
-
-#include <string>
-#include <iostream>
-#include <unistd.h>
-#include <climits>
-#include <cstdlib>
-
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/process.hpp>
-#include <process/timer.hpp>
-
-#include <stout/try.hpp>
-
-#include "zookeeper/authentication.hpp"
-#include "zookeeper/url.hpp"
-#include "zookeeper/zookeeper.hpp"
-
-class Watcher;
-class ZooKeeper;
-
-namespace mesos {
-namespace internal {
-
-// Returns the current master PID (waiting until a master is elected).
-process::Future<process::UPID> detect(
-    const std::string& master,
-    bool quiet = true);
-
-
-// Forward declarations.
-class ZooKeeperMasterDetectorProcess;
-
-/**
- * Implements functionality for:
- *   a) detecting masters
- *   b) contending to be a master
- */
-class MasterDetector
-{
-public:
-  virtual ~MasterDetector() = 0;
-
-  /**
-   * Creates a master detector that, given the specified master (which
-   * may be a ZooKeeper URL), knows how to connect to the master, or
-   * contend to be a master. The master detector sends messages to the
-   * specified pid when a new master is elected, a master is lost,
-   * etc.
-   *
-   * @param master string possibly containing zk:// or file://
-   * @param pid libprocess pid to both receive our messages and be
-   *   used if we should contend
- * @param contend true if we should contend to be master
- * @param quiet true if we should limit log output
-   * @return instance of MasterDetector
-   */
-  static Try<MasterDetector*> create(const std::string& master,
-                                     const process::UPID& pid,
-                                     bool contend = false,
-                                     bool quiet = true);
-
-  /**
-   * Cleans up and deallocates the detector.
-   */
-  static void destroy(MasterDetector* detector);
-};
-
-
-class BasicMasterDetector : public MasterDetector
-{
-public:
-  /**
-   * Create a new master detector where the specified pid contends to
-   * be the master and gets elected by default.
-   *
-   * @param master libprocess pid to send messages/updates and be the
-   * master
-   */
-  BasicMasterDetector(const process::UPID& master);
-
-  /**
-   * Create a new master detector where the 'master' pid is 
-   * the master (no contending).
-   *
-   * @param master libprocess pid to send messages/updates and be the
-   * master
-   * @param pid/pids libprocess pids to send messages/updates to regarding
-   * the master
-   * @param elect if true then contend and elect the specified master
-   */
-  BasicMasterDetector(const process::UPID& master,
-		      const process::UPID& pid,
-		      bool elect = false);
-
-  BasicMasterDetector(const process::UPID& master,
-		      const std::vector<process::UPID>& pids,
-		      bool elect = false);
-
-  virtual ~BasicMasterDetector();
-
-private:
-  const process::UPID master;
-};
-
-
-class ZooKeeperMasterDetector : public MasterDetector
-{
-public:
-  /**
-   * Uses ZooKeeper for both detecting masters and contending to be a
-   * master.
-   *
-   * @param url znode path of the master
-   * @param pid libprocess pid to send messages/updates to (and to
-   * use for contending to be a master)
-   * @param contend true if we should contend to be master and false
-   * otherwise (not needed for slaves and frameworks)
-   * @param quiet true to limit logging from the underlying ZooKeeper library
-   */
-  ZooKeeperMasterDetector(const zookeeper::URL& url,
-                          const process::UPID& pid,
-                          bool contend,
-                          bool quiet);
-
-  virtual ~ZooKeeperMasterDetector();
-
-  /**
-   *  Returns the ZooKeeper session ID associated with this detector.
-   */
-  process::Future<int64_t> session();
-
-  // Visible for testing.
-  ZooKeeperMasterDetectorProcess* process;
-};
-
-
-// TODO(benh): Make this value configurable via flags and verify that
-// it is always LESS THAN the slave heartbeat timeout.
-extern const Duration ZOOKEEPER_SESSION_TIMEOUT;
-
-
-class ZooKeeperMasterDetectorProcess
-  : public process::Process<ZooKeeperMasterDetectorProcess>
-{
-public:
-  ZooKeeperMasterDetectorProcess(
-    const zookeeper::URL& url,
-    const process::UPID& pid,
-    bool contend,
-    bool quiet);
-
-  virtual ~ZooKeeperMasterDetectorProcess();
-
-  virtual void initialize();
-
-  // ZooKeeperMasterDetector implementation.
-  int64_t session();
-
-  // ZooKeeper events.
-  void connected(bool reconnect);
-  void reconnecting();
-  void expired();
-  void updated(const std::string& path);
-  void created(const std::string& path);
-  void deleted(const std::string& path);
-
-private:
-  // Handles reconnecting "timeouts" by prematurely expiring a session
-  // (only used for contending instances). TODO(benh): Remove 'const
-  // &' after fixing libprocess.
-  void timedout(const int64_t& sessionId);
-
-  // Attempts to detect a master.
-  void detectMaster();
-
-  const zookeeper::URL url;
-  const ACL_vector acl;
-
-  const process::UPID pid;
-  bool contend;
-
-  Watcher* watcher;
-  ZooKeeper* zk;
-
-  bool expire;
-  Option<process::Timer> timer;
-
-  std::string currentMasterSeq;
-  process::UPID currentMasterPID;
-};
-
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __MASTER_DETECTOR_HPP__
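
For contrast with the new abstractions, the removed interface above
bundled detection and contention behind one factory and pushed results
out as messages rather than futures. Typical old-style usage, per the
header above (the ZooKeeper URL is illustrative):

  Try<MasterDetector*> detector =
    MasterDetector::create("zk://host:2181/mesos", pid, true, true);

  // 'pid' now receives NewMasterDetectedMessage (carrying the leading
  // master's PID) and NoMasterDetectedMessage as elections come and go.

  MasterDetector::destroy(detector.get());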

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 180756a..83a7f91 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -27,12 +27,12 @@
 
 #include "local.hpp"
 
-#include "detector/detector.hpp"
-
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
 #include "master/allocator.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
 #include "master/drf_sorter.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
@@ -79,7 +79,8 @@ static state::protobuf::State* state = NULL;
 static Registrar* registrar = NULL;
 static Master* master = NULL;
 static map<Isolator*, Slave*> slaves;
-static MasterDetector* detector = NULL;
+static StandaloneMasterDetector* detector = NULL;
+static MasterContender* contender = NULL;
 static Files* files = NULL;
 
 
@@ -126,7 +127,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     state = new state::protobuf::State(storage);
     registrar = new Registrar(state);
 
-    master = new Master(_allocator, registrar, files, flags);
+    contender = new StandaloneMasterContender();
+    detector = new StandaloneMasterDetector();
+    master =
+      new Master(_allocator, registrar, files, contender, detector, flags);
+    detector->appoint(master->self());
   }
 
   PID<Master> pid = process::spawn(master);
@@ -147,13 +152,13 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     // Use a different work directory for each slave.
     flags.work_dir = path::join(flags.work_dir, stringify(i));
 
-    Slave* slave = new Slave(flags, true, isolator, files);
+    // NOTE: At this point detector is already initialized by the
+    // Master.
+    Slave* slave = new Slave(flags, true, detector, isolator, files);
     slaves[isolator] = slave;
     pids.push_back(process::spawn(slave));
   }
 
-  detector = new BasicMasterDetector(pid, pids, true);
-
   return pid;
 }
 
@@ -186,6 +191,9 @@ void shutdown()
     delete detector;
     detector = NULL;
 
+    delete contender;
+    contender = NULL;
+
     delete files;
     files = NULL;
 

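The standalone variants used here let local runs (and tests) bypass
ZooKeeper entirely: the detector simply relays whichever leader it is
told about. A minimal wiring sketch, following the code above
(surrounding setup elided):

  StandaloneMasterContender* contender = new StandaloneMasterContender();
  StandaloneMasterDetector* detector = new StandaloneMasterDetector();

  Master* master =
    new Master(allocator, registrar, files, contender, detector, flags);

  // No election happens in standalone mode, so appoint this master as
  // the leader directly; the detector announces it to all observers.
  detector->appoint(master->self());
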
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/local/main.cpp
----------------------------------------------------------------------
diff --git a/src/local/main.cpp b/src/local/main.cpp
index 5995c53..da431b7 100644
--- a/src/local/main.cpp
+++ b/src/local/main.cpp
@@ -22,8 +22,6 @@
 #include <stout/os.hpp>
 #include <stout/stringify.hpp>
 
-#include "detector/detector.hpp"
-
 #include "local/flags.hpp"
 #include "local/local.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index deb5c97..218906a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -298,7 +298,7 @@ Future<Response> Master::Http::redirect(const Request& request)
   LOG(INFO) << "HTTP request for '" << request.path << "'";
 
   // If there's no leader, redirect to this master's base url.
-  UPID pid = master.leader != UPID() ? master.leader : master.self();
+  UPID pid = master.leader.isSome() ? master.leader.get() : master.self();
 
   Try<string> hostname = net::getHostname(pid.ip);
   if (hostname.isError()) {
@@ -316,7 +316,7 @@ Future<Response> Master::Http::stats(const Request& request)
 
   JSON::Object object;
   object.values["uptime"] = (Clock::now() - master.startTime).secs();
-  object.values["elected"] = master.elected; // Note: using int not bool.
+  object.values["elected"] = master.elected(); // Note: using int not bool.
   object.values["total_schedulers"] = master.frameworks.size();
   object.values["active_schedulers"] = master.getActiveFrameworks().size();
   object.values["activated_slaves"] = master.slaves.size();
@@ -393,9 +393,8 @@ Future<Response> Master::Http::state(const Request& request)
     object.values["cluster"] = master.flags.cluster.get();
   }
 
-  // TODO(benh): Use an Option for the leader PID.
-  if (master.leader != UPID()) {
-    object.values["leader"] = string(master.leader);
+  if (master.leader.isSome()) {
+    object.values["leader"] = string(master.leader.get());
   }
 
   if (master.flags.log_dir.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 45caf9d..60c86b3 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -30,12 +30,12 @@
 
 #include "common/build.hpp"
 
-#include "detector/detector.hpp"
-
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
 #include "master/allocator.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
 #include "master/drf_sorter.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
@@ -46,8 +46,11 @@
 #include "state/storage.hpp"
 
 
+#include "zookeeper/detector.hpp"
+
 using namespace mesos::internal;
 using namespace mesos::internal::master;
+using namespace zookeeper;
 
 using mesos::MasterInfo;
 
@@ -146,13 +149,28 @@ int main(int argc, char** argv)
   Registrar* registrar = new Registrar(state);
 
   Files files;
-  Master* master = new Master(allocator, registrar, &files, flags);
-  process::spawn(master);
 
-  Try<MasterDetector*> detector =
-    MasterDetector::create(zk, master->self(), true, flags.quiet);
+  MasterContender* contender;
+  MasterDetector* detector;
+
+  Try<MasterContender*> contender_ = MasterContender::create(zk);
+  CHECK_SOME(contender_) << "Failed to create a master contender";
+  contender = contender_.get();
+
+  Try<MasterDetector*> detector_ = MasterDetector::create(zk);
+  CHECK_SOME(detector_) << "Failed to create a master detector";
+  detector = detector_.get();
 
-  CHECK_SOME(detector) << "Failed to create a master detector";
+  Master* master = new Master(
+      allocator, registrar, &files, contender, detector, flags);
+
+  if (zk == "") {
+    // An empty 'zk' means we are using the standalone detector, so
+    // we need to appoint this Master as the leader.
+    dynamic_cast<StandaloneMasterDetector*>(detector)->appoint(master->self());
+  }
+
+  process::spawn(master);
 
   process::wait(master->self());
   delete master;
@@ -163,7 +181,8 @@ int main(int argc, char** argv)
   delete state;
   delete storage;
 
-  MasterDetector::destroy(detector.get());
+  delete contender;
+  delete detector;
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index abab6ce..f65b344 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -189,27 +189,19 @@ private:
 Master::Master(
     Allocator* _allocator,
     Registrar* _registrar,
-    Files* _files)
-  : ProcessBase("master"),
-    http(*this),
-    flags(),
-    allocator(_allocator),
-    registrar(_registrar),
-    files(_files),
-    completedFrameworks(MAX_COMPLETED_FRAMEWORKS) {}
-
-
-Master::Master(
-    Allocator* _allocator,
-    Registrar* _registrar,
     Files* _files,
+    MasterContender* _contender,
+    MasterDetector* _detector,
     const Flags& _flags)
   : ProcessBase("master"),
     http(*this),
     flags(_flags),
+    leader(None()),
     allocator(_allocator),
     registrar(_registrar),
     files(_files),
+    contender(_contender),
+    detector(_detector),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS) {}
 
 
@@ -294,7 +286,6 @@ void Master::initialize()
 
   // The master ID is currently comprised of the current date, the IP
   // address and port from self() and the OS PID.
-
   Try<string> id =
     strings::format("%s-%u-%u-%d", DateUtils::currentDate(),
                     self().ip, self().port, getpid());
@@ -410,8 +401,6 @@ void Master::initialize()
   whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
   spawn(whitelistWatcher);
 
-  elected = false;
-
   nextFrameworkId = 0;
   nextSlaveId = 0;
   nextOfferId = 0;
@@ -436,13 +425,6 @@ void Master::initialize()
       &Master::submitScheduler,
       &SubmitSchedulerRequest::name);
 
-  install<NewMasterDetectedMessage>(
-      &Master::newMasterDetected,
-      &NewMasterDetectedMessage::pid);
-
-  install<NoMasterDetectedMessage>(
-      &Master::noMasterDetected);
-
   install<RegisterFrameworkMessage>(
       &Master::registerFramework,
       &RegisterFrameworkMessage::framework);
@@ -558,6 +540,12 @@ void Master::initialize()
         .onAny(defer(self(), &Self::fileAttached, lambda::_1, log.get()));
     }
   }
+
+  contender->initialize(self());
+
+  // Start contending to be a leading master.
+  contender->contend()
+    .onAny(defer(self(), &Master::contended, lambda::_1));
 }
 
 
@@ -692,34 +680,80 @@ void Master::submitScheduler(const string& name)
 }
 
 
-void Master::newMasterDetected(const UPID& pid)
+void Master::contended(const Future<Future<Nothing> >& _contended)
 {
-  // Check and see if we are (1) still waiting to be the elected
-  // master, (2) newly elected master, (3) no longer elected master,
-  // or (4) still elected master.
-
-  leader = pid;
-
-  if (leader != self() && !elected) {
-    LOG(INFO) << "Waiting to be master!";
-  } else if (leader == self() && !elected) {
-    LOG(INFO) << "Elected as master!";
-    elected = true;
-  } else if (leader != self() && elected) {
-    LOG(FATAL) << "No longer elected master ... committing suicide!";
-  } else if (leader == self() && elected) {
-    LOG(INFO) << "Still acting as master!";
+  if (_contended.isFailed()) {
+    CHECK(!elected()) << "Failed to contend so we should not be elected";
+    LOG(ERROR) << "Failed to contend when not elected: "
+               << _contended.failure() << "; contend again...";
+    contender->contend()
+      .onAny(defer(self(), &Master::contended, lambda::_1));
+    return;
+  }
+
+  CHECK(_contended.isReady()) <<
+    "Not expecting MasterContender to discard this future";
+
+  // Now that we know we have our candidacy registered, we start
+  // detecting who is the leader.
+  detector->detect()
+    .onAny(defer(self(), &Master::detected, lambda::_1));
+
+  // Watch for candidacy change.
+  _contended.get()
+    .onAny(defer(self(), &Master::lostCandidacy, lambda::_1));
+}
+
+
+void Master::lostCandidacy(const Future<Nothing>& lost)
+{
+  CHECK(!lost.isDiscarded())
+    << "Not expecting MasterContender to discard this future";
+
+  if (lost.isFailed()) {
+    LOG(ERROR) << "Failed to watch for candidacy: " << lost.failure();
+  }
+
+  if (elected()) {
+    EXIT(1) << "Lost leadership... committing suicide!";
+  } else {
+    LOG(INFO) << "Lost candidacy as a follower... Contend again";
+    contender->contend()
+      .onAny(defer(self(), &Master::contended, lambda::_1));
   }
 }
 
 
-void Master::noMasterDetected()
+void Master::detected(const Future<Result<UPID> >& _leader)
 {
-  if (elected) {
-    LOG(FATAL) << "No longer elected master ... committing suicide!";
+  CHECK(_leader.isReady())
+    << "Not expecting MasterDetector to fail or discard this future";
+
+  bool wasElected = elected();
+  leader = _leader.get();
+
+  if (leader.isError()) {
+    if (wasElected) {
+      EXIT(1) << "Failed to detect the leading master while elected: "
+              << leader.error() << "; committing suicide!";
+    } else {
+      LOG(ERROR) << "Failed to detect the leading master when not elected: "
+                 << leader.error();
+    }
   } else {
-    LOG(FATAL) << "No master detected (?) ... committing suicide!";
+    LOG(INFO) << "The newly elected leader is "
+              << (leader.isSome() ? stringify(leader.get()) : "NONE");
+
+    if (!wasElected && elected()) {
+      LOG(INFO) << "Elected as the leading master!";
+    } else if (wasElected && !elected()) {
+      EXIT(1) << "Lost leadership... committing suicide!";
+    }
   }
+
+  // Keep detecting.
+  detector->detect(leader)
+    .onAny(defer(self(), &Master::detected, lambda::_1));
 }
 
 
@@ -736,7 +770,7 @@ void Master::registerFramework(
     return;
   }
 
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring register framework message since not elected yet";
     return;
   }
@@ -815,7 +849,7 @@ void Master::reregisterFramework(
     return;
   }
 
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring re-register framework message since "
                  << "not elected yet";
     return;
@@ -1197,7 +1231,7 @@ void Master::schedulerMessage(const SlaveID& slaveId,
 
 void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
 {
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring register slave message from "
                  << slaveInfo.hostname() << " since not elected yet";
     return;
@@ -1243,7 +1277,7 @@ void Master::reregisterSlave(
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks)
 {
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring re-register slave message from "
                  << slaveInfo.hostname() << " since not elected yet";
     return;
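
Slaves and schedulers (see slave.cpp and sched.cpp in the diffstat)
consume the detector with the same idiom Master::detected() uses above:
feed the last observed result back into detect() so the future is only
satisfied on the next change. A minimal consumer sketch (the Listener
process is illustrative, not part of the patch; assumes the usual
process/stout usings from the surrounding files):

  class Listener : public process::Process<Listener>
  {
  public:
    explicit Listener(MasterDetector* _detector)
      : detector(_detector), leader(None()) {}

  protected:
    virtual void initialize()
    {
      detector->detect()
        .onAny(defer(self(), &Listener::detected, lambda::_1));
    }

    void detected(const Future<Result<UPID> >& _leader)
    {
      CHECK(_leader.isReady()); // Not expected to fail or be discarded.
      leader = _leader.get();

      if (leader.isSome()) {
        // ... (re)connect to leader.get() ...
      }

      // Wait for the next leadership change.
      detector->detect(leader)
        .onAny(defer(self(), &Listener::detected, lambda::_1));
    }

  private:
    MasterDetector* detector;
    Result<UPID> leader;
  };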

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e377af8..c86c1f1 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -46,6 +46,8 @@
 #include "files/files.hpp"
 
 #include "master/constants.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
 #include "master/flags.hpp"
 #include "master/registrar.hpp"
 
@@ -85,21 +87,15 @@ class Master : public ProtobufProcess<Master>
 public:
   Master(allocator::Allocator* allocator,
          Registrar* registrar,
-         Files* files);
-
-  Master(allocator::Allocator* allocator,
-         Registrar* registrar,
          Files* files,
-         const Flags& flags);
+         MasterContender* contender,
+         MasterDetector* detector,
+         const Flags& flags = Flags());
 
   virtual ~Master();
 
   void submitScheduler(
       const std::string& name);
-  void newMasterDetected(
-      const UPID& pid);
-  void noMasterDetected();
-  void masterDetectionFailure();
   void registerFramework(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo);
@@ -195,6 +191,15 @@ protected:
   // Return connected frameworks that are not in the process of being removed
   std::vector<Framework*> getActiveFrameworks() const;
 
+  // Invoked when the contender has entered the contest.
+  void contended(const Future<Future<Nothing> >& contended);
+
+  // Invoked when the contender has lost the candidacy.
+  void lostCandidacy(const Future<Nothing>& lost);
+
+  // Invoked when there is a newly elected leading master.
+  void detected(const Future<Result<UPID> >& pid);
+
   // Process a launch tasks request (for a non-cancelled offer) by
   // launching the desired tasks (if the offer contains a valid set of
   // tasks) and reporting any unused resources to the allocator.
@@ -301,15 +306,19 @@ private:
 
   const Flags flags;
 
-  UPID leader; // Current leading master.
+  Result<UPID> leader; // Current leading master.
 
-  bool elected;
+  // Whether we are the current leading master.
+  bool elected() const { return leader.isSome() && leader.get() == self(); }
 
   allocator::Allocator* allocator;
   WhitelistWatcher* whitelistWatcher;
   Registrar* registrar;
   Files* files;
 
+  MasterContender* contender;
+  MasterDetector* detector;
+
   MasterInfo info;
 
   hashmap<FrameworkID, Framework*> frameworks;

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 71f68a0..1f264d5 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -332,17 +332,6 @@ message FrameworkExpiredMessage {
 message ShutdownMessage {}
 
 
-// Master detector messages.
-
-
-message NoMasterDetectedMessage {}
-
-
-message NewMasterDetectedMessage {
-  required string pid = 2;
-}
-
-
 message AuthenticateMessage {
   required string pid = 1; // PID that needs to be authenticated.
 }
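
These two messages were the transport for the old push-style detector.
With the future-based detector there is nothing left to install; the
equivalent wiring, taken from the slave diffs elsewhere in this commit,
looks like this:

    // Old: detection events pushed at the process as protobuf messages.
    install<NewMasterDetectedMessage>(
        &Slave::newMasterDetected,
        &NewMasterDetectedMessage::pid);
    install<NoMasterDetectedMessage>(
        &Slave::noMasterDetected);

    // New: the process pulls detection events as a Future<Result<UPID> >,
    // where SOME/NONE/ERROR subsume both messages plus detection failures.
    detector->detect(master)
      .onAny(defer(self(), &Slave::detected, lambda::_1));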

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 3abe72f..51f95bb 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -34,11 +34,13 @@
 
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
+#include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
+#include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/flags.hpp>
@@ -55,10 +58,10 @@
 #include "common/lock.hpp"
 #include "common/type_utils.hpp"
 
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
 
 #include "local/local.hpp"
 
+#include "master/detector.hpp"
+
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
@@ -66,6 +71,7 @@
 
 using namespace mesos;
 using namespace mesos::internal;
+using namespace mesos::internal::master;
 
 using namespace process;
 
@@ -92,22 +98,20 @@ public:
                    Scheduler* _scheduler,
                    const FrameworkInfo& _framework,
                    const Option<Credential>& _credential,
-                   const string& _url,
+                   MasterDetector* _detector,
                    pthread_mutex_t* _mutex,
                    pthread_cond_t* _cond)
     : ProcessBase(ID::generate("scheduler")),
       driver(_driver),
       scheduler(_scheduler),
       framework(_framework),
-      url(_url),
       mutex(_mutex),
       cond(_cond),
       failover(_framework.has_id() && !framework.id().value().empty()),
-      master(UPID()),
+      master(None()),
       connected(false),
       aborted(false),
-      // TODO(benh): Add Try().
-      detector(Error("uninitialized")),
+      detector(_detector),
       credential(_credential),
       authenticatee(NULL),
       authenticating(None()),
@@ -123,24 +127,8 @@ public:
 protected:
   virtual void initialize()
   {
-    // The master detector needs to be created after this process is
-    // running so that the "master detected" message is not dropped.
     // TODO(benh): Get access to flags so that we can decide whether
     // or not to make ZooKeeper verbose.
-    detector = MasterDetector::create(url, self(), false);
-    if (detector.isError()) {
-      driver->abort();
-      scheduler->error(driver, detector.error());
-      return;
-    }
-
-    install<NewMasterDetectedMessage>(
-        &SchedulerProcess::newMasterDetected,
-        &NewMasterDetectedMessage::pid);
-
-    install<NoMasterDetectedMessage>(
-        &SchedulerProcess::noMasterDetected);
-
     install<FrameworkRegisteredMessage>(
         &SchedulerProcess::registered,
         &FrameworkRegisteredMessage::framework_id,
@@ -179,31 +167,30 @@ protected:
     install<FrameworkErrorMessage>(
         &SchedulerProcess::error,
         &FrameworkErrorMessage::message);
-  }
 
-  virtual void finalize()
-  {
-    if (detector.isSome()) {
-      MasterDetector::destroy(detector.get());
-    }
+    // Start detecting masters.
+    detector->detect(master)
+      .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
   }
 
-  void newMasterDetected(const UPID& pid)
+  void detected(const Future<Result<UPID> >& pid)
   {
     if (aborted) {
-      VLOG(1) << "Ignoring new master detected message because "
-              << "the driver is aborted!";
+      VLOG(1) << "Ignoring the master change because the driver is aborted!";
       return;
     }
 
-    VLOG(1) << "New master at " << pid;
-
-    master = pid;
-    link(master);
+    // Not expecting MasterDetector to discard or fail the future.
+    CHECK(pid.isReady());
+    master = pid.get();
 
-    // If master failed over, inform the scheduler about the
-    // disconnection.
     if (connected) {
+      // There are three cases here:
+      //   1. The master failed.
+      //   2. The master failed over to a new master.
+      //   3. The master failed over to the same master.
+      // In any case, we will reconnect (possibly immediately), so we
+      // must notify schedulers of the disconnection.
       Stopwatch stopwatch;
       if (FLAGS_v >= 1) {
         stopwatch.start();
@@ -216,47 +203,34 @@
 
     connected = false;
 
-    if (credential.isSome()) {
-      // Authenticate with the master.
-      authenticate();
-    } else {
-      // Proceed with registration without authentication.
-      LOG(INFO) << "No credentials provided."
-                << " Attempting to register without authentication";
-
-      doReliableRegistration();
-    }
-  }
+    if (master.isSome()) {
+      VLOG(1) << "New master detected at " << master.get();
+      link(master.get());
 
-  void noMasterDetected()
-  {
-    if (aborted) {
-      VLOG(1) << "Ignoring no master detected message because "
-              << "the driver is aborted!";
-      return;
-    }
-
-    VLOG(1) << "No master detected, waiting for another master";
+      if (credential.isSome()) {
+        // Authenticate with the master.
+        authenticate();
+      } else {
+        // Proceed with registration without authentication.
+        LOG(INFO) << "No credentials provided."
+                  << " Attempting to register without authentication";
 
-    // Inform the scheduler about the disconnection if the driver
-    // was previously registered with the master.
-    if (connected) {
-      Stopwatch stopwatch;
-      if (FLAGS_v >= 1) {
-        stopwatch.start();
+        doReliableRegistration();
       }
-
-      scheduler->disconnected(driver);
-
-      VLOG(1) << "Scheduler::disconnected took " << stopwatch.elapsed();
+    } else if (master.isNone()) {
+      // In this case, we don't actually invoke Scheduler::error
+      // since we might get reconnected to a master imminently.
+      VLOG(1) << "No master detected";
+    } else {
+      LOG(ERROR) << "Failed to detect master: " << master.error();
     }
 
-    // In this case, we don't actually invoke Scheduler::error
-    // since we might get reconnected to a master imminently.
-    connected = false;
-    master = UPID();
+    // Keep detecting masters.
+    LOG(INFO) << "Detecting new master";
+    detector->detect(master)
+      .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
   }
 
   void authenticate()
   {
     if (aborted) {
@@ -266,7 +241,7 @@ protected:
 
     authenticated = false;
 
-    if (!master) {
+    if (!master.isSome()) {
       return;
     }
 
@@ -282,7 +257,7 @@ protected:
       return;
     }
 
-    LOG(INFO) << "Authenticating with master " << master;
+    LOG(INFO) << "Authenticating with master " << master.get();
 
     CHECK_SOME(credential);
 
@@ -302,7 +277,7 @@ protected:
     //     'Authenticatee'.
     // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'.
     // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade.
-    authenticating = authenticatee->authenticate(master)
+    authenticating = authenticatee->authenticate(master.get())
       .onAny(defer(self(), &Self::_authenticate));
 
     delay(Seconds(5),
@@ -324,9 +299,21 @@ protected:
     CHECK_SOME(authenticating);
     const Future<bool>& future = authenticating.get();
 
+    if (!master.isSome()) {
+      LOG(INFO) << "Ignoring _authenticate because the master is lost";
+      authenticating = None();
+      // Set it to false because we do not want further retries until
+      // a new master is detected.
+      // Reauthentication is likewise unnecessary, even if
+      // 'reauthenticate' is currently true, because the master is
+      // lost.
+      reauthenticate = false;
+      return;
+    }
+
     if (reauthenticate || !future.isReady()) {
       LOG(WARNING)
-        << "Failed to authenticate with master " << master << ": "
+        << "Failed to authenticate with master " << master.get() << ": "
         << (reauthenticate ? "master changed" :
            (future.isFailed() ? future.failure() : "future discarded"));
 
@@ -339,12 +326,12 @@ protected:
     }
 
     if (!future.get()) {
-      LOG(ERROR) << "Master " << master << " refused authentication";
+      LOG(ERROR) << "Master " << master.get() << " refused authentication";
       error("Master refused authentication");
       return;
     }
 
-    LOG(INFO) << "Successfully authenticated with master " << master;
+    LOG(INFO) << "Successfully authenticated with master " << master.get();
 
     authenticated = true;
     authenticating = None();
@@ -433,7 +420,7 @@ protected:
 
   void doReliableRegistration()
   {
-    if (connected || !master) {
+    if (connected || !master.isSome()) {
       return;
     }
 
@@ -445,13 +432,13 @@ protected:
       // Touched for the very first time.
       RegisterFrameworkMessage message;
       message.mutable_framework()->MergeFrom(framework);
-      send(master, message);
+      send(master.get(), message);
     } else {
       // Not the first time, or failing over.
       ReregisterFrameworkMessage message;
       message.mutable_framework()->MergeFrom(framework);
       message.set_failover(failover);
-      send(master, message);
+      send(master.get(), message);
     }
 
     delay(Seconds(1), self(), &Self::doReliableRegistration);
@@ -582,10 +569,10 @@ protected:
       return;
     }
 
-    if (from != master) {
+    if (!master.isSome() || from != master.get()) {
       LOG(WARNING) << "Ignoring lost slave message from " << from
                    << " because it is not from the registered master ("
-                   << master << ")";
+                   << (master.isSome() ? master.get() : "NONE/ERROR") << ")";
       return;
     }
 
@@ -657,7 +644,8 @@ protected:
     if (connected && !failover) {
       UnregisterFrameworkMessage message;
       message.mutable_framework_id()->MergeFrom(framework.id());
-      send(master, message);
+      CHECK_SOME(master);
+      send(master.get(), message);
     }
 
     Lock lock(mutex);
@@ -683,7 +671,8 @@ protected:
 
     DeactivateFrameworkMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
 
     Lock lock(mutex);
     pthread_cond_signal(cond);
@@ -699,7 +688,8 @@ protected:
     KillTaskMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
     message.mutable_task_id()->MergeFrom(taskId);
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void requestResources(const vector<Request>& requests)
@@ -714,7 +704,8 @@ protected:
     foreach (const Request& request, requests) {
       message.add_requests()->MergeFrom(request);
     }
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void launchTasks(const OfferID& offerId,
@@ -822,7 +813,8 @@ protected:
     // Remove the offer since we saved all the PIDs we might use.
     savedOffers.erase(offerId);
 
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void reviveOffers()
@@ -834,7 +826,8 @@ protected:
 
     ReviveOffersMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void sendFrameworkMessage(const ExecutorID& executorId,
@@ -874,7 +867,8 @@ protected:
       message.mutable_framework_id()->MergeFrom(framework.id());
       message.mutable_executor_id()->MergeFrom(executorId);
       message.set_data(data);
-      send(master, message);
+      CHECK_SOME(master);
+      send(master.get(), message);
     }
   }
 
@@ -892,7 +886,8 @@ protected:
       message.add_statuses()->MergeFrom(status);
     }
 
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
 private:
@@ -901,16 +896,15 @@ private:
   MesosSchedulerDriver* driver;
   Scheduler* scheduler;
   FrameworkInfo framework;
-  string url; // URL for the master (e.g., zk://, file://, etc).
   pthread_mutex_t* mutex;
   pthread_cond_t* cond;
   bool failover;
-  UPID master;
+  Result<UPID> master;
 
   volatile bool connected; // Flag to indicate if framework is registered.
   volatile bool aborted; // Flag to indicate if the driver is aborted.
 
-  Try<MasterDetector*> detector;
+  MasterDetector* detector;
 
   hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
   hashmap<SlaveID, UPID> savedSlavePids;
@@ -955,7 +949,9 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     framework(_framework),
     master(_master),
     process(NULL),
-    status(DRIVER_NOT_STARTED)
+    status(DRIVER_NOT_STARTED),
+    credential(NULL),
+    detector(NULL)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
@@ -1007,13 +1003,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
 
   CHECK(process == NULL);
 
-  if (pid.isSome()) {
-    process = new SchedulerProcess(
-        this, scheduler, framework, None(), pid.get(), &mutex, &cond);
-  } else {
-    process = new SchedulerProcess(
-        this, scheduler, framework, None(), master, &mutex, &cond);
-  }
+  url = pid.isSome() ? static_cast<string>(pid.get()) : master;
 }
 
 
@@ -1023,12 +1013,14 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     Scheduler* _scheduler,
     const FrameworkInfo& _framework,
     const string& _master,
-    const Credential& credential)
+    const Credential& _credential)
   : scheduler(_scheduler),
     framework(_framework),
     master(_master),
     process(NULL),
-    status(DRIVER_NOT_STARTED)
+    status(DRIVER_NOT_STARTED),
+    credential(new Credential(_credential)),
+    detector(NULL)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
@@ -1080,13 +1072,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
 
   CHECK(process == NULL);
 
-  if (pid.isSome()) {
-    process = new SchedulerProcess(
-        this, scheduler, framework, credential, pid.get(), &mutex, &cond);
-  } else {
-    process = new SchedulerProcess(
-        this, scheduler, framework, credential, master, &mutex, &cond);
-  }
+  url = pid.isSome() ? static_cast<string>(pid.get()) : master;
 }
 
 
@@ -1116,6 +1102,10 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
   pthread_mutex_destroy(&mutex);
   pthread_cond_destroy(&cond);
 
+  if (detector != NULL) {
+    delete detector;
+  }
+
   // Check and see if we need to shutdown a local cluster.
   if (master == "local" || master == "localquiet") {
     local::shutdown();
@@ -1131,7 +1121,31 @@ Status MesosSchedulerDriver::start()
     return status;
   }
 
-  CHECK(process != NULL);
+  if (detector == NULL) {
+    Try<MasterDetector*> detector_ = MasterDetector::create(url);
+
+    if (detector_.isError()) {
+      status = DRIVER_ABORTED;
+      string message = "Failed to create a master detector for '" +
+      master + "': " + detector_.error();
+      scheduler->error(this, message);
+      return status;
+    }
+
+    // Save the detector so we can delete it later.
+    detector = detector_.get();
+  }
+
+  CHECK(process == NULL);
+
+  if (credential == NULL) {
+    process = new SchedulerProcess(
+        this, scheduler, framework, None(), detector, &mutex, &cond);
+  } else {
+    const Credential& cred = *credential;
+    process = new SchedulerProcess(
+        this, scheduler, framework, cred, detector, &mutex, &cond);
+  }
 
   spawn(process);
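
The driver changes above hinge on 'master' becoming a Result<UPID>. The
three cases the scheduler now distinguishes, condensed from
SchedulerProcess::detected() above ('master' is the Result<UPID> taken
from the detector's future):

    if (master.isSome()) {
      // A (possibly new) leading master: link to it, then authenticate
      // or register, depending on whether credentials were provided.
    } else if (master.isNone()) {
      // No leader right now: stay disconnected and keep watching; no
      // error is surfaced since a new master may appear imminently.
    } else {
      // master.isError(): detection itself failed; log and keep watching.
    }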
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 62fbb37..2f0bd8d 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -288,7 +288,7 @@ Future<Response> Slave::Http::stats(const Request& request)
   object.values["lost_tasks"] = slave.stats.tasks[TASK_LOST];
   object.values["valid_status_updates"] = slave.stats.validStatusUpdates;
   object.values["invalid_status_updates"] = slave.stats.invalidStatusUpdates;
-  object.values["registered"] = slave.master ? "1" : "0";
+  object.values["registered"] = slave.master.isSome() ? "1" : "0";
   object.values["recovery_errors"] = slave.recoveryErrors;
 
   return OK(object, request.query.get("jsonp"));
@@ -317,7 +317,7 @@ Future<Response> Slave::Http::state(const Request& request)
   object.values["failed_tasks"] = slave.stats.tasks[TASK_FAILED];
   object.values["lost_tasks"] = slave.stats.tasks[TASK_LOST];
 
-  Try<string> masterHostname = net::getHostname(slave.master.ip);
+  Try<string> masterHostname = net::getHostname(slave.master.get().ip);
   if (masterHostname.isSome()) {
     object.values["master_hostname"] = masterHostname.get();
   }
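
One thing to watch in the hunk above: stats() guards the master field
with isSome(), but state() calls slave.master.get() unguarded, which is
unsafe if the endpoint is hit while no master is detected. A guarded
version (hypothetical; not part of this commit) would read:

    // Hypothetical guard; not in this commit.
    if (slave.master.isSome()) {
      Try<string> masterHostname = net::getHostname(slave.master.get().ip);
      if (masterHostname.isSome()) {
        object.values["master_hostname"] = masterHostname.get();
      }
    }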

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 750a127..e83cd9e 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -27,7 +27,7 @@
 
 #include "common/build.hpp"
 
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
 
 #include "logging/logging.hpp"
 
@@ -127,19 +127,18 @@ int main(int argc, char** argv)
   LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
   LOG(INFO) << "Starting Mesos slave";
 
-  Files files;
-  Slave* slave = new Slave(flags, false, isolator, &files);
-  process::spawn(slave);
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master.get(), slave->self(), false, flags.quiet);
+  Try<MasterDetector*> detector = MasterDetector::create(master.get());
 
   CHECK_SOME(detector) << "Failed to create a master detector";
 
+  Files files;
+  Slave* slave = new Slave(flags, false, detector.get(), isolator, &files);
+  process::spawn(slave);
+
   process::wait(slave->self());
   delete slave;
 
-  MasterDetector::destroy(detector.get());
+  delete detector.get();
   Isolator::destroy(isolator);
 
   return 0;
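
The reordering in main() is deliberate: the detector must exist before
the Slave (it is now a constructor argument) and must outlive it (the
slave never owns it). The resulting lifecycle, in outline:

    // Lifecycle outline, per the diff above.
    Try<MasterDetector*> detector = MasterDetector::create(master.get());
    CHECK_SOME(detector) << "Failed to create a master detector";

    Files files;
    Slave* slave = new Slave(flags, false, detector.get(), isolator, &files);
    process::spawn(slave);
    process::wait(slave->self());

    delete slave;           // The detector's user goes away first...
    delete detector.get();  // ...then the detector itself.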

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index bb98fce..a9be378 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -73,6 +73,7 @@ using namespace state;
 
 Slave::Slave(const slave::Flags& _flags,
              bool _local,
+             MasterDetector* _detector,
              Isolator* _isolator,
              Files* _files)
   : ProcessBase(ID::generate("slave")),
@@ -80,7 +81,9 @@ Slave::Slave(const slave::Flags& _flags,
     http(*this),
     flags(_flags),
     local(_local),
+    master(None()),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
+    detector(_detector),
     isolator(_isolator),
     files(_files),
     monitor(_isolator),
@@ -281,13 +284,6 @@ void Slave::initialize()
   startTime = Clock::now();
 
   // Install protobuf handlers.
-  install<NewMasterDetectedMessage>(
-      &Slave::newMasterDetected,
-      &NewMasterDetectedMessage::pid);
-
-  install<NoMasterDetectedMessage>(
-      &Slave::noMasterDetected);
-
   install<SlaveRegisteredMessage>(
       &Slave::registered,
       &SlaveRegisteredMessage::slave_id);
@@ -433,10 +429,10 @@ void Slave::shutdown(const UPID& from)
   // Allow shutdown message only if
   // 1) Its a message received from the registered master or
   // 2) If its called locally (e.g tests)
-  if (from && from != master) {
+  if (from && (!master.isSome() || from != master.get())) {
     LOG(WARNING) << "Ignoring shutdown message from " << from
-                 << " because it is not from the registered master ("
-                 << master << ")";
+                 << " because it is not from the registered master: "
+                 << (master.isSome() ? master.get() : "NONE/ERROR");
     return;
   }
 
@@ -483,75 +479,67 @@ Nothing Slave::detachFile(const string& path)
 }
 
 
-void Slave::newMasterDetected(const UPID& pid)
+void Slave::detected(const Future<Result<UPID> >& pid)
 {
-  LOG(INFO) << "New master detected at " << pid;
-
-  master = pid;
-  link(master);
-
-  // Inform the status updates manager about the new master.
-  statusUpdateManager->newMasterDetected(master);
+  CHECK(state == DISCONNECTED ||
+        state == RUNNING ||
+        state == TERMINATING) << state;
 
-  if (flags.recover == "cleanup") {
-    LOG(INFO) << "Skipping registration because slave is in 'cleanup' mode";
-    return;
+  if (state != TERMINATING) {
+    state = DISCONNECTED;
   }
 
-  switch (state) {
-    case RECOVERING:
-      LOG(INFO) << "Postponing registration until recovery is complete";
-      break;
-    case DISCONNECTED:
-    case RUNNING:
-      state = DISCONNECTED;
-      doReliableRegistration();
-      break;
-    case TERMINATING:
-      LOG(INFO) << "Skipping registration because slave is terminating";
-      break;
-    default:
-      LOG(FATAL) << "Unexpected slave state " << state;
-      break;
-  }
-}
+  // Not expecting MasterDetector to discard or fail futures.
+  CHECK(pid.isReady());
+  master = pid.get();
 
+  if (master.isSome()) {
+    LOG(INFO) << "New master detected at " << master.get();
+    link(master.get());
 
-void Slave::noMasterDetected()
-{
-  LOG(INFO) << "Lost master(s) ... waiting";
-  master = UPID();
+    // Inform the status updates manager about the new master.
+    statusUpdateManager->newMasterDetected(master.get());
 
-  CHECK(state == RECOVERING || state == DISCONNECTED ||
-        state == RUNNING || state == TERMINATING)
-    << state;
+    if (state == TERMINATING) {
+      LOG(INFO) << "Skipping registration because slave is terminating";
+      return;
+    }
 
-  // We only change state if the slave is in RUNNING state because
-  // if the slave is in:
-  // RECOVERY: Slave needs to finish recovery before changing states.
-  // DISCONNECTED: Redundant.
-  // TERMINATING: Slave is shutting down.
-  // TODO(vinod): Subscribe to master detector after recovery.
-  // Similarly, unsubscribe from master detector during termination.
-  // Currently it is tricky because master detector is injected into
-  // the slave from outside.
-  if (state == RUNNING) {
-    state = DISCONNECTED;
+    // The slave does not (re-)register if it is in cleanup mode,
+    // because we do not want it to accept new tasks.
+    if (flags.recover == "cleanup") {
+      LOG(INFO)
+        << "Skipping registration because slave was started in cleanup mode";
+      return;
+    }
+
+    doReliableRegistration();
+  } else if (master.isNone()) {
+    LOG(INFO) << "Lost leading master";
+  } else {
+    LOG(ERROR) << "Failed to detect a master: " << master.error();
   }
+
+  // Keep detecting masters.
+  LOG(INFO) << "Detecting new master";
+  detector->detect(master)
+    .onAny(defer(self(), &Slave::detected, lambda::_1));
 }
 
 
 void Slave::registered(const UPID& from, const SlaveID& slaveId)
 {
-  if (from != master) {
+  if (!master.isSome() || from != master.get()) {
     LOG(WARNING) << "Ignoring registration message from " << from
-                 << " because it is not the expected master " << master;
+                 << " because it is not the expected master: "
+                 << (master.isSome() ? master.get() : "NONE/ERROR");
     return;
   }
 
   switch(state) {
     case DISCONNECTED: {
-      LOG(INFO) << "Registered with master " << master
+      CHECK_SOME(master);
+      LOG(INFO) << "Registered with master " << master.get()
                 << "; given slave ID " << slaveId;
 
       state = RUNNING;
@@ -575,7 +563,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
        EXIT(1) << "Registered but got wrong id: " << slaveId
                << "(expected: " << info.id() << "). Committing suicide";
       }
-      LOG(WARNING) << "Already registered with master " << master;
+      CHECK_SOME(master);
+      LOG(WARNING) << "Already registered with master " << master.get();
       break;
     case TERMINATING:
       LOG(WARNING) << "Ignoring registration because slave is terminating";
@@ -590,15 +579,17 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
 
 void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
 {
-  if (from != master) {
+  if (!master.isSome() || from != master.get()) {
     LOG(WARNING) << "Ignoring re-registration message from " << from
-                 << " because it is not the expected master " << master;
+                 << " because it is not the expected master: "
+                 << (master.isSome() ? master.get() : "NONE/ERROR");
     return;
   }
 
   switch(state) {
     case DISCONNECTED:
-      LOG(INFO) << "Re-registered with master " << master;
+      CHECK_SOME(master);
+      LOG(INFO) << "Re-registered with master " << master.get();
 
       state = RUNNING;
       if (!(info.id() == slaveId)) {
@@ -612,7 +603,8 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
         EXIT(1) << "Re-registered but got wrong id: " << slaveId
                 << "(expected: " << info.id() << "). Committing suicide";
       }
-      LOG(WARNING) << "Already re-registered with master " << master;
+      CHECK_SOME(master);
+      LOG(WARNING) << "Already re-registered with master " << master.get();
       break;
     case TERMINATING:
       LOG(WARNING) << "Ignoring re-registration because slave is terminating";
@@ -633,7 +625,7 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
 
 void Slave::doReliableRegistration()
 {
-  if (!master) {
+  if (!master.isSome()) {
     LOG(INFO) << "Skipping registration because no master present";
     return;
   }
@@ -649,7 +641,7 @@ void Slave::doReliableRegistration()
     // (Vinod): Is the above comment true?
     RegisterSlaveMessage message;
     message.mutable_slave()->CopyFrom(info);
-    send(master, message);
+    send(master.get(), message);
   } else {
     // Re-registering, so send tasks running.
     ReregisterSlaveMessage message;
@@ -698,7 +690,9 @@ void Slave::doReliableRegistration()
         }
       }
     }
-    send(master, message);
+
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   // Retry registration if necessary.
@@ -1117,10 +1111,11 @@ void Slave::shutdownFramework(
   // Allow shutdownFramework() only if
   // its called directly (e.g. Slave::finalize()) or
   // its a message from the currently registered master.
-  if (from && from != master) {
+  if (from && (!master.isSome() || from != master.get())) {
     LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
-                 << " from " << from << " because it is not from the registered"
-                 << " master (" << master << ")";
+                 << " from " << from
+                 << " because it is not from the registered master ("
+                 << (master.isSome() ? master.get() : "NONE/ERROR") << ")";
     return;
   }
 
@@ -1944,7 +1939,7 @@ void Slave::exited(const UPID& pid)
 {
   LOG(INFO) << pid << " exited";
 
-  if (master == pid) {
+  if (!master.isSome() || master.get() == pid) {
     LOG(WARNING) << "Master disconnected!"
                  << " Waiting for a new master to be elected";
     // TODO(benh): After so long waiting for a master, commit suicide.
@@ -2236,7 +2231,9 @@ void Slave::executorTerminated(
         message.mutable_executor_id()->MergeFrom(executorId);
         message.set_status(status);
 
-        send(master, message);
+        if (master.isSome()) {
+          send(master.get(), message);
+        }
       }
 
       // Remove the executor if either the framework is terminating or
@@ -2747,6 +2742,9 @@ void Slave::__recover(const Future<Nothing>& future)
 
   LOG(INFO) << "Finished recovery";
 
+  CHECK_EQ(RECOVERING, state);
+  state = DISCONNECTED;
+
   // Schedule all old slave directories for garbage collection.
   // TODO(vinod): Do this as part of recovery. This needs a fix
   // in the recovery code, to recover all slaves instead of only
@@ -2790,18 +2788,12 @@ void Slave::__recover(const Future<Nothing>& future)
   // in 'cleanup' mode.
   if (frameworks.empty() && flags.recover == "cleanup") {
     terminate(self());
-  } else if (flags.recover == "reconnect") {
-    // Re-register if reconnecting.
-    // NOTE: Since the slave in cleanup mode never re-registers, if
-    // the master fails over it will not forward the updates from
-    // the "unknown" slave to the scheduler. This could lead to the
-    // slave waiting indefinitely for acknowledgements. The master's
-    // registrar could help in handling this correctly.
-    state = DISCONNECTED;
-    if (master) {
-      doReliableRegistration();
-    }
+    return;
   }
+
+  // Start detecting masters.
+  detector->detect(master)
+    .onAny(defer(self(), &Slave::detected, lambda::_1));
 }
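
Two structural points in the slave diff are easy to miss: __recover()
now moves the slave to DISCONNECTED and only then starts detection, so
detected() never runs during recovery; and detected() always re-arms the
watch, whatever the result. The skeleton, compressed from
Slave::detected() above:

    if (state != TERMINATING) {
      state = DISCONNECTED;  // Any master change disconnects the slave.
    }

    if (master.isSome()) {
      link(master.get());
      statusUpdateManager->newMasterDetected(master.get());

      // Register unless terminating or started in cleanup mode.
      if (state != TERMINATING && flags.recover != "cleanup") {
        doReliableRegistration();
      }
    }  // NONE: wait for a leader; ERROR: log it and wait.

    detector->detect(master)  // Always keep watching.
      .onAny(defer(self(), &Slave::detected, lambda::_1));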
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b39eaf4..6d7c3e8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -30,6 +30,7 @@
 #include <mesos/resources.hpp>
 
+#include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
@@ -38,11 +39,14 @@
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/multihashmap.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/owned.hpp>
 #include <stout/path.hpp>
 #include <stout/uuid.hpp>
 
+#include "master/detector.hpp"
+
 #include "slave/constants.hpp"
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
@@ -59,9 +63,11 @@
 
 #include "messages/messages.hpp"
 
-
 namespace mesos {
 namespace internal {
+
+class MasterDetector; // Forward declaration.
+
 namespace slave {
 
 using namespace process;
@@ -71,22 +77,19 @@ class StatusUpdateManager;
 struct Executor;
 struct Framework;
 
-
 class Slave : public ProtobufProcess<Slave>
 {
 public:
   Slave(const Flags& flags,
         bool local,
-        Isolator *isolator,
+        MasterDetector* detector,
+        Isolator* isolator,
         Files* files);
 
   virtual ~Slave();
 
   void shutdown(const process::UPID& from);
 
-  void newMasterDetected(const UPID& pid);
-  void noMasterDetected();
-  void masterDetectionFailure();
   void registered(const process::UPID& from, const SlaveID& slaveId);
   void reregistered(const process::UPID& from, const SlaveID& slaveId);
   void doReliableRegistration();
@@ -215,6 +218,9 @@ protected:
 
   Nothing detachFile(const std::string& path);
 
+  // Invoked whenever the detector detects a change in masters.
+  void detected(const Future<Result<UPID> >& pid);
+
   // Helper routine to lookup a framework.
   Framework* getFramework(const FrameworkID& frameworkId);
 
@@ -308,7 +314,7 @@ private:
 
   SlaveInfo info;
 
-  UPID master;
+  Result<UPID> master;
 
   Resources resources;
   Attributes attributes;
@@ -317,6 +323,8 @@ private:
 
   boost::circular_buffer<Owned<Framework> > completedFrameworks;
 
+  MasterDetector* detector;
+
   Isolator* isolator;
   Files* files;
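
The type change of the master field (UPID to Result<UPID>) is the crux
of this header diff. For reference, the three states as they are used
throughout the .cpp changes:

    Result<UPID> master;

    // master.isSome()  -- a leading master is known; master.get() is its
    //                     pid and it is safe to send() to it.
    // master.isNone()  -- no leading master at the moment (e.g., an
    //                     election is still in progress).
    // master.isError() -- detection failed; master.error() has the reason.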
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp
index b0beb72..61ab235 100644
--- a/src/tests/allocator_tests.cpp
+++ b/src/tests/allocator_tests.cpp
@@ -30,9 +30,8 @@
 #include <process/gmock.hpp>
 #include <process/pid.hpp>
 
-#include "detector/detector.hpp"
-
 #include "master/allocator.hpp"
+#include "master/detector.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/authentication_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/authentication_tests.cpp b/src/tests/authentication_tests.cpp
index 48a9323..cc8b7a9 100644
--- a/src/tests/authentication_tests.cpp
+++ b/src/tests/authentication_tests.cpp
@@ -277,8 +277,9 @@ TEST_F(AuthenticationTest, MasterFailover)
   ASSERT_SOME(master);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  Owned<StandaloneMasterDetector> detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, detector.get());
 
   // Drop the authenticate message from the scheduler.
   Future<AuthenticateMessage> authenticateMessage =
@@ -287,7 +288,6 @@ TEST_F(AuthenticationTest, MasterFailover)
   driver.start();
 
   AWAIT_READY(authenticateMessage);
-  UPID frameworkPid = authenticateMessage.get().pid();
 
   // While the authentication is in progress simulate a failed over
   // master by restarting the master.
@@ -299,12 +299,8 @@ TEST_F(AuthenticationTest, MasterFailover)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .WillOnce(FutureSatisfy(&registered));
 
-  // Send a new master detected message to inform the scheduler
-  // about the new master.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkPid, newMasterDetectedMsg);
+  // Appoint a new master and inform the scheduler about it.
+  detector->appoint(master.get());
 
   // Scheduler should successfully register with the new master.
   AWAIT_READY(registered);
@@ -325,11 +321,9 @@ TEST_F(AuthenticationTest, LeaderElection)
   ASSERT_SOME(master);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  Future<AuthenticateMessage> authenticateMessage =
-    FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
+  Owned<StandaloneMasterDetector> detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, detector.get());
 
   // Drop the AuthenticationStepMessage from authenticator.
   Future<AuthenticationStepMessage> authenticationStepMessage =
@@ -337,10 +331,6 @@ TEST_F(AuthenticationTest, LeaderElection)
 
   driver.start();
 
-  // Grab the framework pid.
-  AWAIT_READY(authenticateMessage);
-  UPID frameworkPid = authenticateMessage.get().pid();
-
   // Drop the intermediate SASL message so that authentication fails.
   AWAIT_READY(authenticationStepMessage);
 
@@ -348,12 +338,8 @@ TEST_F(AuthenticationTest, LeaderElection)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .WillOnce(FutureSatisfy(&registered));
 
-  // Send a new master detected message to inform the scheduler
-  // about the new master after a leader election.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkPid, newMasterDetectedMsg);
+  // Appoint a new master and inform the scheduler about it.
+  detector->appoint(master.get());
 
   // Scheduler should successfully register with the new master.
   AWAIT_READY(registered);
@@ -375,11 +361,9 @@ TEST_F(AuthenticationTest, SchedulerFailover)
 
   // Launch the first (i.e., failing) scheduler.
   MockScheduler sched1;
-  MesosSchedulerDriver driver1(
-      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  Future<AuthenticateMessage> authenticateMessage =
-    FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
+  Owned<StandaloneMasterDetector> detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver1(&sched1, detector.get());
 
   Future<FrameworkID> frameworkId;
   EXPECT_CALL(sched1, registered(&driver1, _, _))
@@ -387,10 +371,6 @@ TEST_F(AuthenticationTest, SchedulerFailover)
 
   driver1.start();
 
-  // Grab the framework pid.
-  AWAIT_READY(authenticateMessage);
-  UPID frameworkPid = authenticateMessage.get().pid();
-
   AWAIT_READY(frameworkId);
 
   // Drop the AuthenticationStepMessage from authenticator
@@ -400,11 +380,8 @@ TEST_F(AuthenticationTest, SchedulerFailover)
 
   EXPECT_CALL(sched1, disconnected(&driver1));
 
-  // Send a NewMasterDetected message to elicit authentication.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkPid, newMasterDetectedMsg);
+  // Appoint a new master and inform the scheduler about it.
+  detector->appoint(master.get());
 
   AWAIT_READY(authenticationStepMessage);
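
This rewrite establishes the test pattern used throughout the rest of
the series: instead of posting NewMasterDetectedMessage or
NoMasterDetectedMessage at a pid, a test injects a
StandaloneMasterDetector and drives it with appoint(). Schematically
(using the classes exactly as they appear in the tests above):

    Owned<StandaloneMasterDetector> detector =
      new StandaloneMasterDetector(master.get());
    TestingMesosSchedulerDriver driver(&sched, detector.get());

    driver.start();

    detector->appoint(master.get());  // Simulate (re-)election of a leader.
    detector->appoint(None());        // Simulate losing the leader.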
 


[7/8] Replaced usage of old detector with new Master contender and detector abstractions.

Posted by vi...@apache.org.
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/cluster.hpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp
index bea395a..fa64ed1 100644
--- a/src/tests/cluster.hpp
+++ b/src/tests/cluster.hpp
@@ -33,11 +33,11 @@
 #include <stout/path.hpp>
 #include <stout/try.hpp>
 
-#include "detector/detector.hpp"
-
 #include "files/files.hpp"
 
 #include "master/allocator.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/flags.hpp"
 #include "master/master.hpp"
@@ -90,9 +90,7 @@ public:
     Try<Nothing> stop(const process::PID<master::Master>& pid);
 
     // Returns a new master detector for this instance of masters.
-    Owned<MasterDetector> detector(
-        const process::PID<slave::Slave>& pid,
-        const slave::Flags& flags);
+    Owned<MasterDetector> detector();
 
   private:
     // Not copyable, not assignable.
@@ -109,6 +107,7 @@ public:
         : master(NULL),
           allocator(NULL),
           allocatorProcess(NULL),
+          contender(NULL),
           detector(NULL) {}
 
       master::Master* master;
@@ -117,6 +116,7 @@ public:
       state::Storage* storage;
       state::protobuf::State* state;
       master::Registrar* registrar;
+      MasterContender* contender;
       MasterDetector* detector;
     };
 
@@ -145,6 +145,18 @@ public:
         slave::Isolator* isolator,
         const slave::Flags& flags = slave::Flags());
 
+    // Start and manage a new slave, injecting the specified
+    // MasterDetector. The detector must remain valid until the
+    // launched slave is stopped via Slaves::stop.
+    Try<process::PID<slave::Slave> > start(
+        Owned<MasterDetector> detector,
+        const slave::Flags& flags = slave::Flags());
+
+    Try<process::PID<slave::Slave> > start(
+        slave::Isolator* isolator,
+        Owned<MasterDetector> detector,
+        const slave::Flags& flags = slave::Flags());
+
     // Stops and cleans up a slave at the specified PID. If 'shutdown'
     // is true then the slave is sent a shutdown message instead of
     // being terminated.
@@ -168,6 +180,8 @@ public:
           slave(NULL),
           detector(NULL) {}
 
+      // Only register the isolator here if it is created within the
+      // Cluster.
       slave::Isolator* isolator;
       slave::Slave* slave;
       Owned<MasterDetector> detector;
@@ -249,20 +263,30 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
   master.state = new state::protobuf::State(master.storage);
   master.registrar = new master::Registrar(master.state);
 
+  if (url.isSome()) {
+    master.contender = new ZooKeeperMasterContender(url.get());
+    master.detector = new ZooKeeperMasterDetector(url.get());
+  } else {
+    master.contender = new StandaloneMasterContender();
+    master.detector = new StandaloneMasterDetector();
+  }
+
   master.master = new master::Master(
       master.allocator,
       master.registrar,
       &cluster->files,
+      master.contender,
+      master.detector,
       flags);
 
-  process::PID<master::Master> pid = process::spawn(master.master);
-
-  if (url.isSome()) {
-    master.detector = new ZooKeeperMasterDetector(url.get(), pid, true, true);
-  } else {
-    master.detector = new BasicMasterDetector(pid);
+  if (url.isNone()) {
+    // This means we are using the StandaloneMasterDetector.
+    dynamic_cast<StandaloneMasterDetector*>(master.detector)->appoint(
+        master.master->self());
   }
 
+  process::PID<master::Master> pid = process::spawn(master.master);
+
   masters[pid] = master;
 
   return pid;
@@ -299,20 +323,30 @@ inline Try<process::PID<master::Master> > Cluster::Masters::start(
   master.state = new state::protobuf::State(master.storage);
   master.registrar = new master::Registrar(master.state);
 
+  if (url.isSome()) {
+    master.contender = new ZooKeeperMasterContender(url.get());
+    master.detector = new ZooKeeperMasterDetector(url.get());
+  } else {
+    master.contender = new StandaloneMasterContender();
+    master.detector = new StandaloneMasterDetector();
+  }
+
   master.master = new master::Master(
       master.allocator,
       master.registrar,
       &cluster->files,
+      master.contender,
+      master.detector,
       flags);
 
-  process::PID<master::Master> pid = process::spawn(master.master);
-
-  if (url.isSome()) {
-    master.detector = new ZooKeeperMasterDetector(url.get(), pid, true, true);
-  } else {
-    master.detector = new BasicMasterDetector(pid);
+  if (url.isNone()) {
+    // This means we are using the StandaloneMasterDetector.
+    dynamic_cast<StandaloneMasterDetector*>(master.detector)->appoint(
+        master.master->self());
   }
 
+  process::PID<master::Master> pid = process::spawn(master.master);
+
   masters[pid] = master;
 
   return pid;
@@ -339,6 +373,7 @@ inline Try<Nothing> Cluster::Masters::stop(
   delete master.state;
   delete master.storage;
 
+  delete master.contender;
   delete master.detector;
 
   masters.erase(pid);
@@ -347,16 +382,15 @@ inline Try<Nothing> Cluster::Masters::stop(
 }
 
 
-inline Owned<MasterDetector> Cluster::Masters::detector(
-    const process::PID<slave::Slave>& pid,
-    const slave::Flags& flags)
+inline Owned<MasterDetector> Cluster::Masters::detector()
 {
   if (url.isSome()) {
-    return new ZooKeeperMasterDetector(url.get(), pid, false, flags.quiet);
+    return new ZooKeeperMasterDetector(url.get());
   }
 
   CHECK(masters.size() == 1);
-  return new BasicMasterDetector(masters.begin()->first, pid);
+
+  return new StandaloneMasterDetector(masters.begin()->first);
 }
 
 
@@ -392,11 +426,12 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
   slave.isolator = new slave::ProcessIsolator();
   process::spawn(slave.isolator);
 
-  slave.slave = new slave::Slave(flags, true, slave.isolator, &cluster->files);
-  process::PID<slave::Slave> pid = process::spawn(slave.slave);
-
   // Get a detector for the master(s).
-  slave.detector = masters->detector(pid, flags);
+  slave.detector = masters->detector();
+
+  slave.slave = new slave::Slave(
+      flags, true, slave.detector.get(), slave.isolator, &cluster->files);
+  process::PID<slave::Slave> pid = process::spawn(slave.slave);
 
   slaves[pid] = slave;
 
@@ -408,15 +443,33 @@ inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
     slave::Isolator* isolator,
     const slave::Flags& flags)
 {
+  return start(isolator, masters->detector(), flags);
+}
+
+
+inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
+    Owned<MasterDetector> detector,
+    const slave::Flags& flags)
+{
+  return start(new slave::ProcessIsolator(), detector, flags);
+}
+
+
+inline Try<process::PID<slave::Slave> > Cluster::Slaves::start(
+    slave::Isolator* isolator,
+    Owned<MasterDetector> detector,
+    const slave::Flags& flags)
+{
   // TODO(benh): Create a work directory if using the default.
 
   Slave slave;
 
-  slave.slave = new slave::Slave(flags, true, isolator, &cluster->files);
-  process::PID<slave::Slave> pid = process::spawn(slave.slave);
-
   // Get a detector for the master(s).
-  slave.detector = masters->detector(pid, flags);
+  slave.detector = detector;
+
+  slave.slave = new slave::Slave(
+      flags, true, slave.detector.get(), isolator, &cluster->files);
+  process::PID<slave::Slave> pid = process::spawn(slave.slave);
 
   slaves[pid] = slave;
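
Note the ordering quirk in the master startup above: in the standalone
case the detector is appointed before the master process is spawned,
which works because self() is already valid once the process object is
constructed. The standalone wiring, condensed from
Cluster::Masters::start():

    master.contender = new StandaloneMasterContender();
    master.detector = new StandaloneMasterDetector();

    master.master = new master::Master(
        master.allocator, master.registrar, &cluster->files,
        master.contender, master.detector, flags);

    // self() is usable before spawn(), so the appointment can happen here.
    dynamic_cast<StandaloneMasterDetector*>(master.detector)->appoint(
        master.master->self());

    process::PID<master::Master> pid = process::spawn(master.master);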
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index d521f4b..6cb5829 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -218,7 +218,10 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
 
-  Try<PID<Slave> > slave = StartSlave(&exec);
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(&exec, Owned<MasterDetector>(detector));
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -318,9 +321,9 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
   AWAIT_READY(slaveLost);
 
   // We now complete the partition on the slave side as well. This
-  // is done by simulating a NoMasterDetectedMessage which would
-  // normally occur during a network partition.
-  process::post(slave.get(), NoMasterDetectedMessage());
+  // is done by simulating a master loss event which would normally
+  // occur during a network partition.
+  detector->appoint(None());
 
   Future<Nothing> shutdown;
   EXPECT_CALL(exec, shutdown(_))
@@ -329,9 +332,7 @@ TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
   shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
 
   // Have the slave re-register with the master.
-  NewMasterDetectedMessage newMasterDetectedMessage;
-  newMasterDetectedMessage.set_pid(master.get());
-  process::post(slave.get(), newMasterDetectedMessage);
+  detector->appoint(master.get());
 
   // Upon re-registration, the master will shutdown the slave.
   // The slave will then shut down the executor.
@@ -606,8 +607,9 @@ TEST_F(FaultToleranceTest, MasterFailover)
   ASSERT_SOME(master);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, detector);
 
   Future<process::Message> frameworkRegisteredMessage =
     FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
@@ -637,10 +639,7 @@ TEST_F(FaultToleranceTest, MasterFailover)
     .WillOnce(FutureSatisfy(&registered2));
 
   // Simulate a new master detected message to the scheduler.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg);
+  detector->appoint(master.get());
 
   // Scheduler should retry authentication.
   AWAIT_READY(authenticateMessage);
@@ -651,6 +650,8 @@ TEST_F(FaultToleranceTest, MasterFailover)
   driver.stop();
   driver.join();
 
+  delete detector;
+
   Shutdown();
 }
 
@@ -863,12 +864,18 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  Owned<MasterDetector> slaveDetector =
+    new StandaloneMasterDetector(master.get());
+  Try<PID<Slave> > slave = StartSlave(slaveDetector);
   ASSERT_SOME(slave);
 
+  // Create a separate detector for the scheduler driver so that the
+  // spurious leading master change below is seen only by the
+  // scheduler driver.
+  Owned<MasterDetector> schedDetector =
+    new StandaloneMasterDetector(master.get());
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  TestingMesosSchedulerDriver driver(&sched, schedDetector.get());
 
   Future<Nothing> registered;
   EXPECT_CALL(sched, registered(&driver, _, _))
@@ -896,12 +904,9 @@ TEST_F(FaultToleranceTest, FrameworkReregister)
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .Times(AtMost(1));
 
-  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
-  // expiration) at the scheduler.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(message.get().to, newMasterDetectedMsg);
+  // Simulate a spurious leading master change at the scheduler.
+  dynamic_cast<StandaloneMasterDetector*>(schedDetector.get())->appoint(
+      master.get());
 
   AWAIT_READY(disconnected);
 
@@ -923,8 +928,9 @@ TEST_F(FaultToleranceTest, TaskLost)
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, detector);
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -947,8 +953,8 @@ TEST_F(FaultToleranceTest, TaskLost)
   EXPECT_CALL(sched, disconnected(&driver))
     .WillOnce(FutureSatisfy(&disconnected));
 
-  // Simulate a spurious noMasterDetected event at the scheduler.
-  process::post(message.get().to, NoMasterDetectedMessage());
+  // Simulate a spurious master loss event at the scheduler.
+  detector->appoint(None());
 
   AWAIT_READY(disconnected);
 
@@ -1099,12 +1105,15 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
   TestingIsolator isolator(&exec);
-  Try<PID<Slave> > slave = StartSlave(&isolator);
+  Owned<MasterDetector> slaveDetector =
+    new StandaloneMasterDetector(master.get());
+  Try<PID<Slave> > slave = StartSlave(&isolator, slaveDetector);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  Owned<StandaloneMasterDetector> schedDetector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, schedDetector.get());
 
   Future<process::Message> frameworkRegisteredMessage =
     FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
@@ -1153,14 +1161,11 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
   master = StartMaster();
   ASSERT_SOME(master);
 
-  // Simulate a new master detected message to the slave.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
-  process::post(slave.get(), newMasterDetectedMsg);
+  dynamic_cast<StandaloneMasterDetector*>(slaveDetector.get())->appoint(
+      master.get());
 
   // Wait for the slave to re-register.
   AWAIT_READY(slaveReregisteredMessage);
@@ -1183,7 +1188,7 @@ TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor)
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
-  process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg);
+  schedDetector->appoint(master.get());
 
   AWAIT_READY(frameworkRegisteredMessage2);
 
@@ -1484,7 +1489,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+  Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1505,13 +1511,9 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
-  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
   // expiration) at the slave.
-
-  NewMasterDetectedMessage message;
-  message.set_pid(master.get());
-
-  process::post(slave.get(), message);
+  detector->appoint(master.get());
 
   AWAIT_READY(slaveReregisteredMessage);
 
@@ -1535,7 +1537,10 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
   TestingIsolator isolator(&exec);
 
-  Try<PID<Slave> > slave = StartSlave(&isolator);
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+  Try<PID<Slave> > slave =
+    StartSlave(&isolator, Owned<MasterDetector>(detector));
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1580,16 +1585,13 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
 
   AWAIT_READY(executorExitedMessage);
 
-  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
   // expiration) at the slave to force re-registration.
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status2));
 
-  NewMasterDetectedMessage message;
-  message.set_pid(master.get());
-
-  process::post(slave.get(), message);
+  detector->appoint(master.get());
 
   AWAIT_READY(status2);
   EXPECT_EQ(TASK_LOST, status2.get().state());
@@ -1609,7 +1611,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+  Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1655,13 +1659,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status));
 
-  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
   // expiration) at the slave to force re-registration.
-
-  NewMasterDetectedMessage message;
-  message.set_pid(master.get());
-
-  process::post(slave.get(), message);
+  detector->appoint(master.get());
 
   AWAIT_READY(slaveReregisteredMessage);
 
@@ -1687,7 +1687,9 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
 
-  Try<PID<Slave> > slave = StartSlave(&exec);
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+  Try<PID<Slave> > slave = StartSlave(&exec, Owned<MasterDetector>(detector));
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1741,12 +1743,9 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
   Future<SlaveReregisteredMessage> slaveReregisteredMessage =
     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
 
-  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
   // expiration) at the slave to force re-registration.
-  NewMasterDetectedMessage message;
-  message.set_pid(master.get());
-
-  process::post(slave.get(), message);
+  detector->appoint(master.get());
 
   AWAIT_READY(slaveReregisteredMessage);
 
@@ -1755,7 +1754,7 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
   // the only update the scheduler receives is the retried
   // TASK_FINISHED update.
   // NOTE: The status update manager resends the status update when
-  // it receives a NewMasterDetected message.
+  // it detects a new master.
   Clock::settle();
 
   AWAIT_READY(status);

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/gc_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/gc_tests.cpp b/src/tests/gc_tests.cpp
index 5459b78..538976a 100644
--- a/src/tests/gc_tests.cpp
+++ b/src/tests/gc_tests.cpp
@@ -39,13 +39,12 @@
 #include <stout/nothing.hpp>
 #include <stout/os.hpp>
 
-#include "detector/detector.hpp"
-
 #include "logging/logging.hpp"
 
 #include "local/local.hpp"
 
 #include "master/master.hpp"
+#include "master/detector.hpp"
 
 #include "slave/constants.hpp"
 #include "slave/flags.hpp"

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index ab5b00a..45a41ca 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -30,9 +30,8 @@
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 
-#include "detector/detector.hpp"
-
 #include "master/master.hpp"
+#include "master/detector.hpp"
 
 #include "slave/flags.hpp"
 #ifdef __linux__

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index bf790d2..37ee7a0 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -876,12 +876,14 @@ TEST_F(MasterTest, MasterInfoOnReElection)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
-  Try<PID<Slave> > slave = StartSlave();
+  StandaloneMasterDetector* detector =
+    new StandaloneMasterDetector(master.get());
+
+  Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  TestingMesosSchedulerDriver driver(&sched, detector);
 
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
@@ -896,11 +898,6 @@ TEST_F(MasterTest, MasterInfoOnReElection)
 
   AWAIT_READY(message);
 
-  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
-  // expiration) at the scheduler.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
   Future<Nothing> disconnected;
   EXPECT_CALL(sched, disconnected(&driver))
     .WillOnce(FutureSatisfy(&disconnected));
@@ -909,7 +906,9 @@ TEST_F(MasterTest, MasterInfoOnReElection)
   EXPECT_CALL(sched, reregistered(&driver, _))
     .WillOnce(FutureArg<1>(&masterInfo));
 
-  process::post(message.get().to, newMasterDetectedMsg);
+  // Simulate a spurious master change event (e.g., due to
+  // ZooKeeper expiration) at the scheduler.
+  detector->appoint(master.get());
 
   AWAIT_READY(disconnected);
 
@@ -984,12 +983,14 @@ TEST_F(MasterTest, MasterLost)
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
 
+  Owned<StandaloneMasterDetector> detector = new StandaloneMasterDetector();
+  detector->appoint(master.get());
+
   Try<PID<Slave> > slave = StartSlave();
   ASSERT_SOME(slave);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  TestingMesosSchedulerDriver driver(&sched, detector.get());
 
   EXPECT_CALL(sched, registered(&driver, _, _))
     .Times(1);
@@ -1008,8 +1009,8 @@ TEST_F(MasterTest, MasterLost)
   EXPECT_CALL(sched, disconnected(&driver))
     .WillOnce(FutureSatisfy(&disconnected));
 
-  // Simulate a spurious noMasterDetected event at the scheduler.
-  process::post(message.get().to, NoMasterDetectedMessage());
+  // Simulate a spurious master loss event at the scheduler.
+  detector->appoint(None());
 
   AWAIT_READY(disconnected);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 351ffd6..5359394 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -13,6 +13,7 @@
 #include "tests/isolator.hpp"
 #include "tests/mesos.hpp"
 
+using namespace process;
 
 namespace mesos {
 namespace internal {
@@ -134,6 +135,46 @@ Try<process::PID<slave::Slave> > MesosTest::StartSlave(
 }
 
 
+Try<process::PID<slave::Slave> > MesosTest::StartSlave(
+    slave::Isolator* isolator,
+    Owned<MasterDetector> detector,
+    const Option<slave::Flags>& flags)
+{
+  return cluster.slaves.start(
+      isolator, detector, flags.isNone() ? CreateSlaveFlags() : flags.get());
+}
+
+
+Try<PID<slave::Slave> > MesosTest::StartSlave(
+    Owned<MasterDetector> detector,
+    const Option<slave::Flags>& flags)
+{
+  return cluster.slaves.start(
+      detector, flags.isNone() ? CreateSlaveFlags() : flags.get());
+}
+
+
+Try<PID<slave::Slave> > MesosTest::StartSlave(
+    MockExecutor* executor,
+    Owned<MasterDetector> detector,
+    const Option<slave::Flags>& flags)
+{
+  TestingIsolator* isolator = new TestingIsolator(executor);
+
+  Try<process::PID<slave::Slave> > pid = cluster.slaves.start(
+      isolator, detector, flags.isNone() ? CreateSlaveFlags() : flags.get());
+
+  if (pid.isError()) {
+    delete isolator;
+    return pid;
+  }
+
+  isolators[pid.get()] = isolator;
+
+  return pid;
+}
+
+
 void MesosTest::Stop(const process::PID<master::Master>& pid)
 {
   cluster.masters.stop(pid);

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index ad39c09..3d510b2 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -20,6 +20,7 @@
 #define __TESTS_MESOS_HPP__
 
 #include <map>
+#include <set>
 #include <string>
 
 #include <mesos/executor.hpp>
@@ -42,6 +43,7 @@
 #include "messages/messages.hpp" // For google::protobuf::Message.
 
 #include "master/allocator.hpp"
+#include "master/detector.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
@@ -98,6 +100,24 @@ protected:
       slave::Isolator* isolator,
       const Option<slave::Flags>& flags = None());
 
+  // Starts a slave with the specified isolator, detector and flags.
+  virtual Try<process::PID<slave::Slave> > StartSlave(
+      slave::Isolator* isolator,
+      Owned<MasterDetector> detector,
+      const Option<slave::Flags>& flags = None());
+
+  // Starts a slave with the specified MasterDetector and flags.
+  virtual Try<process::PID<slave::Slave> > StartSlave(
+      Owned<MasterDetector> detector,
+      const Option<slave::Flags>& flags = None());
+
+  // Starts a slave with the specified mock executor, MasterDetector
+  // and flags.
+  virtual Try<process::PID<slave::Slave> > StartSlave(
+      MockExecutor* executor,
+      Owned<MasterDetector> detector,
+      const Option<slave::Flags>& flags = None());
+
   // Stop the specified master.
   virtual void Stop(
       const process::PID<master::Master>& pid);
@@ -306,6 +326,45 @@ public:
 };
 
 
+class TestingMesosSchedulerDriver : public MesosSchedulerDriver
+{
+public:
+  TestingMesosSchedulerDriver(
+      Scheduler* scheduler,
+      const FrameworkInfo& framework,
+      const Credential& credential,
+      MasterDetector* _detector)
+    : MesosSchedulerDriver(scheduler, framework, "", credential)
+  {
+    detector = _detector;
+  }
+
+  // A constructor that uses DEFAULT_FRAMEWORK_INFO and
+  // DEFAULT_CREDENTIAL.
+  TestingMesosSchedulerDriver(
+      Scheduler* scheduler,
+      MasterDetector* _detector)
+    : MesosSchedulerDriver(
+          scheduler,
+          DEFAULT_FRAMEWORK_INFO,
+          "",
+          DEFAULT_CREDENTIAL)
+  {
+    detector = _detector;
+  }
+
+  ~TestingMesosSchedulerDriver()
+  {
+    // This is necessary because in the base class the detector is
+    // internally created and deleted whereas in the testing driver
+    // it is injected and thus should not be deleted in the
+    // destructor. Setting it to null allows the detector to survive
+    // MesosSchedulerDriver::~MesosSchedulerDriver().
+    detector = NULL;
+  }
+};
+
+
 template <typename T = master::allocator::AllocatorProcess>
 class MockAllocatorProcess : public master::allocator::AllocatorProcess
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 3ee7b87..eb2dc33 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -38,12 +38,11 @@
 
 #include "common/protobuf_utils.hpp"
 
-#include "detector/detector.hpp"
-
 #ifdef __linux__
 #include "linux/cgroups.hpp"
 #endif
 
+#include "master/detector.hpp"
 #include "master/master.hpp"
 
 #include "slave/gc.hpp"
@@ -2321,8 +2320,10 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
   frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
   frameworkInfo.set_checkpoint(true);
 
-  MesosSchedulerDriver driver(
-      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+  Owned<StandaloneMasterDetector> detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(
+      &sched, frameworkInfo, DEFAULT_CREDENTIAL, detector.get());
 
   EXPECT_CALL(sched, registered(_, _, _));
 
@@ -2366,11 +2367,8 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .WillOnce(FutureSatisfy(&registered));
 
-  // Simulate a new master detected message to the scheduler.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg);
+  // Simulate a new master detected event to the scheduler.
+  detector->appoint(master.get());
 
   // Framework should get a registered callback.
   AWAIT_READY(registered);

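The test diffs above all migrate from posting a NewMasterDetectedMessage directly to injecting a StandaloneMasterDetector and driving master changes through appoint(). A minimal sketch of the resulting pattern, assembled from the hunks above (the test name is illustrative, not part of the commit):

  TEST_F(FaultToleranceTest, ExampleSpuriousMasterChange)
  {
    Try<PID<Master> > master = StartMaster();
    ASSERT_SOME(master);

    // Inject the detector so the test can trigger master changes.
    StandaloneMasterDetector* detector =
      new StandaloneMasterDetector(master.get());
    Try<PID<Slave> > slave = StartSlave(Owned<MasterDetector>(detector));
    ASSERT_SOME(slave);

    Future<SlaveReregisteredMessage> slaveReregisteredMessage =
      FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);

    // Simulate a spurious master change event (e.g., due to
    // ZooKeeper expiration) at the slave.
    detector->appoint(master.get());

    AWAIT_READY(slaveReregisteredMessage);
  }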

[2/8] git commit: Added a local timeout so that Group operations fail after we locally determine that the session has timed out.

Posted by vi...@apache.org.
Added a local timeout so that Group operations fail after
we locally determine that the session has timed out.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15016


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0a83ef9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0a83ef9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0a83ef9

Branch: refs/heads/master
Commit: c0a83ef92867aaabda1c0909167f66e50c70be78
Parents: c8e93e9
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:36:17 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:18 2013 -0800

----------------------------------------------------------------------
 src/zookeeper/group.cpp | 85 ++++++++++++++++++++++++++++++++++++++++++--
 src/zookeeper/group.hpp | 13 +++++++
 2 files changed, 95 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c0a83ef9/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index cae19d4..749104a 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -72,6 +72,25 @@ GroupProcess::GroupProcess(
 {}
 
 
+// TODO(xujyan): Reuse the peer constructor above once we switch to
+// C++ 11.
+GroupProcess::GroupProcess(
+    const URL& url,
+    const Duration& _timeout)
+  : servers(url.servers),
+    timeout(_timeout),
+    znode(strings::remove(url.path, "/", strings::SUFFIX)),
+    auth(url.authentication),
+    acl(url.authentication.isSome()
+        ? EVERYONE_READ_CREATOR_ALL
+        : ZOO_OPEN_ACL_UNSAFE),
+    watcher(NULL),
+    zk(NULL),
+    state(DISCONNECTED),
+    retrying(false)
+{}
+
+
 GroupProcess::~GroupProcess()
 {
   fail(&pending.joins, "No longer watching group");
@@ -302,6 +321,14 @@ void GroupProcess::connected(bool reconnect)
       abort(); // Cancels everything pending.
       return;
     }
+  } else {
+    LOG(INFO) << "Group process (" << self() << ")  reconnected to Zookeeper";
+
+    // Cancel and cleanup the reconnect timer (if necessary).
+    if (timer.isSome()) {
+      Timer::cancel(timer.get());
+      timer = None();
+    }
   }
 
   state = CONNECTED;
@@ -312,12 +339,49 @@ void GroupProcess::connected(bool reconnect)
 
 void GroupProcess::reconnecting()
 {
+  LOG(INFO) << "Lost connection to ZooKeeper, attempting to reconnect ...";
+
   state = CONNECTING;
+
+  // ZooKeeper won't tell us of a session expiration until we
+  // reconnect, which could occur much later than when the session
+  // actually expired. This can lead to a prolonged split-brain
+  // scenario when network partitions occur. Rather than wait for a
+  // reconnection to occur (i.e., a network partition to be repaired),
+  // we create a local timer and "expire" our session prematurely if
+  // we haven't reconnected within the session expiration timeout.
+  // The timer can be reset if the connection is restored.
+  CHECK(timer.isNone());
+  timer = delay(timeout, self(), &Self::timedout, zk->getSessionId());
+}
+
+
+void GroupProcess::timedout(const int64_t& sessionId)
+{
+  CHECK_NOTNULL(zk);
+
+  if (timer.isSome() &&
+      timer.get().timeout().expired() &&
+      zk->getSessionId() == sessionId) {
+    // The timer can be reset or replaced and 'zk' can be replaced
+    // since this method was dispatched.
+    std::ostringstream error_;
+    error_ << "Timed out waiting to reconnect to ZooKeeper (sessionId="
+           << std::hex << sessionId << ")";
+    error = error_.str();
+    abort();
+  }
 }
 
 
 void GroupProcess::expired()
 {
+  // Cancel and cleanup the reconnect timer (if necessary).
+  if (timer.isSome()) {
+    Timer::cancel(timer.get());
+    timer = None();
+  }
+
   // Invalidate the cache.
   memberships = None();
 
@@ -337,7 +401,9 @@ void GroupProcess::expired()
 
   state = DISCONNECTED;
 
-  delete zk;
+  delete CHECK_NOTNULL(zk);
+  delete CHECK_NOTNULL(watcher);
+  watcher = new ProcessWatcher<GroupProcess>(self());
   zk = new ZooKeeper(servers, timeout, watcher);
 
   state = CONNECTING;
@@ -675,8 +741,13 @@ void GroupProcess::abort()
   fail(&pending.datas, error.get());
   fail(&pending.watches, error.get());
 
-  // TODO(benh): Delete the ZooKeeper instance in order to terminate
-  // our session (cleaning up any ephemeral znodes as necessary)?
+  error = None();
+
+  // If we decide to abort, make sure we expire the session
+  // (cleaning up any ephemeral ZNodes as necessary). We also
+  // create a new ZooKeeper instance for clients that want to
+  // continue to reuse this group instance.
+  expired();
 }
 
 
@@ -690,6 +761,14 @@ Group::Group(const string& servers,
 }
 
 
+Group::Group(const URL& url,
+             const Duration& timeout)
+{
+  process = new GroupProcess(url, timeout);
+  spawn(process);
+}
+
+
 Group::~Group()
 {
   terminate(process);

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0a83ef9/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index c17dee4..04068e3 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -5,6 +5,7 @@
 #include <set>
 
 #include "process/future.hpp"
+#include "process/timer.hpp"
 
 #include <stout/duration.hpp>
 #include <stout/none.hpp>
@@ -40,6 +41,11 @@ public:
       return sequence == that.sequence;
     }
 
+    bool operator != (const Membership& that) const
+    {
+      return sequence != that.sequence;
+    }
+
     bool operator < (const Membership& that) const
     {
       return sequence < that.sequence;
@@ -92,6 +98,9 @@ public:
         const Duration& timeout,
         const std::string& znode,
         const Option<Authentication>& auth = None());
+  Group(const URL& url,
+        const Duration& timeout);
+
   ~Group();
 
   // Returns the result of trying to join a "group" in ZooKeeper. If
@@ -251,6 +260,10 @@ private:
   // Cache of owned + unowned, where 'None' represents an invalid
   // cache and 'Some' represents a valid cache.
   Option<std::set<Group::Membership> > memberships;
+
+  // The timer that determines whether we should quit waiting for the
+  // connection to be restored.
+  Option<process::Timer> timer;
 };
 
 } // namespace zookeeper {

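The effect of this change from a client's perspective: once the connection to ZooKeeper is lost, pending Group operations fail after the locally configured timeout instead of hanging until ZooKeeper eventually reports the expiration. A minimal sketch, assuming the ZooKeeperTest fixture and clock macros used elsewhere in this test suite (the test name is illustrative, not part of the commit):

  TEST_F(ZooKeeperTest, ExampleLocalSessionTimeout)
  {
    Seconds timeout(10);
    Group group(server->connectString(), timeout, "/test/");

    Future<Group::Membership> membership1 = group.join("member 1");
    AWAIT_READY(membership1);

    server->shutdownNetwork();

    // This operation is queued while we are disconnected.
    Future<Group::Membership> membership2 = group.join("member 2");

    Clock::pause();
    // We may need to advance multiple times because we could have
    // advanced the clock before the timer in Group starts.
    while (membership2.isPending()) {
      Clock::advance(timeout);
      Clock::settle();
    }
    Clock::resume();

    // The local timer "expired" the session and aborted the pending
    // operation, rather than waiting for a reconnection.
    AWAIT_ASSERT_FAILED(membership2);
  }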

[3/8] git commit: Moved GroupProcess into group.hpp and made it a public member of Group for testing.

Posted by vi...@apache.org.
Moved GroupProcess into group.hpp and made it a public member of
Group for testing.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15221


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c8e93e96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c8e93e96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c8e93e96

Branch: refs/heads/master
Commit: c8e93e9634340c1a869a9dcea361da93fe078ca6
Parents: d46a240
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:35:59 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:18 2013 -0800

----------------------------------------------------------------------
 src/zookeeper/group.cpp | 127 +--------------------------------------
 src/zookeeper/group.hpp | 138 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 138 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c8e93e96/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index cd58d23..cae19d4 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -1,5 +1,4 @@
 #include <algorithm>
-#include <map>
 #include <queue>
 #include <utility>
 #include <vector>
@@ -20,7 +19,6 @@
 
 #include "logging/logging.hpp"
 
-#include "zookeeper/authentication.hpp"
 #include "zookeeper/group.hpp"
 #include "zookeeper/watcher.hpp"
 #include "zookeeper/zookeeper.hpp"
@@ -30,7 +28,6 @@ using namespace process;
 using process::wait; // Necessary on some OS's to disambiguate.
 
 using std::make_pair;
-using std::map;
 using std::queue;
 using std::set;
 using std::string;
@@ -40,129 +37,7 @@ using std::vector;
 namespace zookeeper {
 
 // Time to wait after retryable errors.
-const Duration RETRY_INTERVAL = Seconds(2);
-
-
-class GroupProcess : public Process<GroupProcess>
-{
-public:
-  GroupProcess(const string& servers,
-               const Duration& timeout,
-               const string& znode,
-               const Option<Authentication>& auth);
-  virtual ~GroupProcess();
-
-  virtual void initialize();
-
-  // Group implementation.
-  Future<Group::Membership> join(const string& data);
-  Future<bool> cancel(const Group::Membership& membership);
-  Future<string> data(const Group::Membership& membership);
-  Future<set<Group::Membership> > watch(
-      const set<Group::Membership>& expected);
-  Future<Option<int64_t> > session();
-
-  // ZooKeeper events.
-  void connected(bool reconnect);
-  void reconnecting();
-  void expired();
-  void updated(const string& path);
-  void created(const string& path);
-  void deleted(const string& path);
-
-private:
-  Result<Group::Membership> doJoin(const string& data);
-  Result<bool> doCancel(const Group::Membership& membership);
-  Result<string> doData(const Group::Membership& membership);
-
-  // Attempts to cache the current set of memberships.
-  bool cache();
-
-  // Updates any pending watches.
-  void update();
-
-  // Synchronizes pending operations with ZooKeeper and also attempts
-  // to cache the current set of memberships if necessary.
-  bool sync();
-
-  // Generic retry method. This mechanism is "generic" in the sense
-  // that it is not specific to any particular operation, but rather
-  // attempts to perform all pending operations (including caching
-  // memberships if necessary).
-  void retry(const Duration& duration);
-
-  // Fails all pending operations.
-  void abort();
-
-  Option<string> error; // Potential non-retryable error.
-
-  const string servers;
-  const Duration timeout;
-  const string znode;
-
-  Option<Authentication> auth; // ZooKeeper authentication.
-
-  const ACL_vector acl; // Default ACL to use.
-
-  Watcher* watcher;
-  ZooKeeper* zk;
-
-  enum State { // ZooKeeper connection state.
-    DISCONNECTED,
-    CONNECTING,
-    CONNECTED,
-  } state;
-
-  struct Join
-  {
-    Join(const string& _data) : data(_data) {}
-    string data;
-    Promise<Group::Membership> promise;
-  };
-
-  struct Cancel
-  {
-    Cancel(const Group::Membership& _membership)
-      : membership(_membership) {}
-    Group::Membership membership;
-    Promise<bool> promise;
-  };
-
-  struct Data
-  {
-    Data(const Group::Membership& _membership)
-      : membership(_membership) {}
-    Group::Membership membership;
-    Promise<string> promise;
-  };
-
-  struct Watch
-  {
-    Watch(const set<Group::Membership>& _expected)
-      : expected(_expected) {}
-    set<Group::Membership> expected;
-    Promise<set<Group::Membership> > promise;
-  };
-
-  struct {
-    queue<Join*> joins;
-    queue<Cancel*> cancels;
-    queue<Data*> datas;
-    queue<Watch*> watches;
-  } pending;
-
-  bool retrying;
-
-  // Expected ZooKeeper sequence numbers (either owned/created by this
-  // group instance or not) and the promise we associate with their
-  // "cancellation" (i.e., no longer part of the group).
-  map<uint64_t, Promise<bool>*> owned;
-  map<uint64_t, Promise<bool>*> unowned;
-
-  // Cache of owned + unowned, where 'None' represents an invalid
-  // cache and 'Some' represents a valid cache.
-  Option<set<Group::Membership> > memberships;
-};
+const Duration GroupProcess::RETRY_INTERVAL = Seconds(2);
 
 
 // Helper for failing a queue of promises.

http://git-wip-us.apache.org/repos/asf/mesos/blob/c8e93e96/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 6b6c155..c17dee4 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -1,6 +1,7 @@
 #ifndef __ZOOKEEPER_GROUP_HPP__
 #define __ZOOKEEPER_GROUP_HPP__
 
+#include <map>
 #include <set>
 
 #include "process/future.hpp"
@@ -10,6 +11,11 @@
 #include <stout/option.hpp>
 
 #include "zookeeper/authentication.hpp"
+#include "zookeeper/url.hpp"
+
+// Forward declarations.
+class Watcher;
+class ZooKeeper;
 
 namespace zookeeper {
 
@@ -113,10 +119,140 @@ public:
   // or none if no session currently exists.
   process::Future<Option<int64_t> > session();
 
-private:
+  // Made public for testing purposes.
   GroupProcess* process;
 };
 
+
+class GroupProcess : public process::Process<GroupProcess>
+{
+public:
+  GroupProcess(const std::string& servers,
+               const Duration& timeout,
+               const std::string& znode,
+               const Option<Authentication>& auth);
+
+  GroupProcess(const URL& url,
+               const Duration& timeout);
+
+  virtual ~GroupProcess();
+
+  virtual void initialize();
+
+  static const Duration RETRY_INTERVAL;
+
+  // Group implementation.
+  process::Future<Group::Membership> join(const std::string& data);
+  process::Future<bool> cancel(const Group::Membership& membership);
+  process::Future<std::string> data(const Group::Membership& membership);
+  process::Future<std::set<Group::Membership> > watch(
+      const std::set<Group::Membership>& expected);
+  process::Future<Option<int64_t> > session();
+
+  // ZooKeeper events.
+  void connected(bool reconnect);
+  void reconnecting();
+  void expired();
+  void updated(const std::string& path);
+  void created(const std::string& path);
+  void deleted(const std::string& path);
+
+private:
+  Result<Group::Membership> doJoin(const std::string& data);
+  Result<bool> doCancel(const Group::Membership& membership);
+  Result<std::string> doData(const Group::Membership& membership);
+
+  // Attempts to cache the current set of memberships.
+  bool cache();
+
+  // Updates any pending watches.
+  void update();
+
+  // Synchronizes pending operations with ZooKeeper and also attempts
+  // to cache the current set of memberships if necessary.
+  bool sync();
+
+  // Generic retry method. This mechanism is "generic" in the sense
+  // that it is not specific to any particular operation, but rather
+  // attempts to perform all pending operations (including caching
+  // memberships if necessary).
+  void retry(const Duration& duration);
+
+  // Fails all pending operations.
+  void abort();
+
+  void timedout(const int64_t& sessionId);
+
+  Option<std::string> error; // Potential non-retryable error.
+
+  const std::string servers;
+  const Duration timeout;
+  const std::string znode;
+
+  Option<Authentication> auth; // ZooKeeper authentication.
+
+  const ACL_vector acl; // Default ACL to use.
+
+  Watcher* watcher;
+  ZooKeeper* zk;
+
+  enum State { // ZooKeeper connection state.
+    DISCONNECTED,
+    CONNECTING,
+    CONNECTED,
+  } state;
+
+  struct Join
+  {
+    Join(const std::string& _data) : data(_data) {}
+    std::string data;
+    process::Promise<Group::Membership> promise;
+  };
+
+  struct Cancel
+  {
+    Cancel(const Group::Membership& _membership)
+      : membership(_membership) {}
+    Group::Membership membership;
+    process::Promise<bool> promise;
+  };
+
+  struct Data
+  {
+    Data(const Group::Membership& _membership)
+      : membership(_membership) {}
+    Group::Membership membership;
+    process::Promise<std::string> promise;
+  };
+
+  struct Watch
+  {
+    Watch(const std::set<Group::Membership>& _expected)
+      : expected(_expected) {}
+    std::set<Group::Membership> expected;
+    process::Promise<std::set<Group::Membership> > promise;
+  };
+
+  struct {
+    std::queue<Join*> joins;
+    std::queue<Cancel*> cancels;
+    std::queue<Data*> datas;
+    std::queue<Watch*> watches;
+  } pending;
+
+  bool retrying;
+
+  // Expected ZooKeeper sequence numbers (either owned/created by this
+  // group instance or not) and the promise we associate with their
+  // "cancellation" (i.e., no longer part of the group).
+  std::map<uint64_t, process::Promise<bool>*> owned;
+  std::map<uint64_t, process::Promise<bool>*> unowned;
+
+  // Cache of owned + unowned, where 'None' represents an invalid
+  // cache and 'Some' represents a valid cache.
+  Option<std::set<Group::Membership> > memberships;
+};
+
 } // namespace zookeeper {
 
 #endif // __ZOOKEEPER_GROUP_HPP__

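What this refactor enables: with GroupProcess declared in the header and Group::process made public, tests can reference internals such as the static RETRY_INTERVAL directly instead of hard-coding durations. A minimal sketch of the idiom (it appears verbatim in the contender tests added later in this series):

  Clock::pause();
  // The retry timeout.
  Clock::advance(GroupProcess::RETRY_INTERVAL);
  Clock::settle();
  Clock::resume();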

[5/8] git commit: Fixed a bug where Group::cancel can return a failed future if the membership is already cancelled.

Posted by vi...@apache.org.
Fixed a bug where Group::cancel can return a failed future
if the membership is already cancelled.

The API stipulates that a successful Future<bool>, albeit with a
'false' value, should be returned.

This can happen when:
1) The cancel() operation is queued up due to a connectivity issue;
by the time sync() invokes it the membership could be gone.
2) Even without the delay, cancel() first checks whether the
membership exists in our local cache, but the membership could have
expired by then (or by the time we invoke zk->remove(...)) while we
have yet to receive the update for it.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15487


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/61aee928
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/61aee928
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/61aee928

Branch: refs/heads/master
Commit: 61aee928b7763b12849ceae359566f5842749774
Parents: c0a83ef
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:36:37 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:18 2013 -0800

----------------------------------------------------------------------
 src/zookeeper/group.cpp | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/61aee928/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 749104a..12c781b 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -500,6 +500,10 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
     CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
     return None();
+  } else if (code == ZNONODE) {
+    // This can happen because the membership could have expired but
+    // we have yet to receive the update about it.
+    return false;
   } else if (code != ZOK) {
     return Error(
         "Failed to remove ephemeral node '" + path +

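A minimal sketch of the contract this fix restores, assuming a connected Group as in the tests elsewhere in this series: cancelling a membership that is already gone now yields a ready future with 'false' instead of a failure.

  Future<Group::Membership> membership = group.join("member");
  AWAIT_READY(membership);

  // The first cancellation removes the ephemeral znode.
  Future<bool> first = group.cancel(membership.get());
  AWAIT_READY(first);
  EXPECT_TRUE(first.get());

  // A second cancellation finds no node (ZNONODE) and now returns
  // 'false' rather than a failed future.
  Future<bool> second = group.cancel(membership.get());
  AWAIT_READY(second);
  EXPECT_FALSE(second.get());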

[4/8] git commit: Added ZooKeeper leader contender and detector abstractions.

Posted by vi...@apache.org.
Added ZooKeeper leader contender and detector abstractions.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/13086


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea5e10df
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea5e10df
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea5e10df

Branch: refs/heads/master
Commit: ea5e10dfa50791bfc6bb822426dd3fc35399735e
Parents: 61aee92
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:38:42 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:18 2013 -0800

----------------------------------------------------------------------
 src/Makefile.am                     |   4 +
 src/tests/zookeeper_test_server.cpp |   2 +-
 src/tests/zookeeper_tests.cpp       | 216 ++++++++++++++++++++++
 src/zookeeper/contender.cpp         | 299 +++++++++++++++++++++++++++++++
 src/zookeeper/contender.hpp         |  61 +++++++
 src/zookeeper/detector.cpp          | 173 ++++++++++++++++++
 src/zookeeper/detector.hpp          |  47 +++++
 7 files changed, 801 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5e10df/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index e6f7120..3c48aee 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -188,6 +188,8 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	common/values.cpp						\
 	files/files.cpp							\
 	logging/logging.cpp						\
+	zookeeper/contender.cpp						\
+	zookeeper/detector.cpp						\
 	zookeeper/zookeeper.cpp						\
 	zookeeper/authentication.cpp					\
 	zookeeper/group.cpp						\
@@ -241,6 +243,8 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
 	tests/isolator.hpp						\
 	tests/mesos.hpp							\
 	tests/zookeeper_test_server.hpp zookeeper/authentication.hpp	\
+	zookeeper/contender.hpp						\
+	zookeeper/detector.hpp						\
 	zookeeper/group.hpp zookeeper/watcher.hpp			\
 	zookeeper/zookeeper.hpp zookeeper/url.hpp
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5e10df/src/tests/zookeeper_test_server.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_test_server.cpp b/src/tests/zookeeper_test_server.cpp
index 0b22f31..dc53d6a 100644
--- a/src/tests/zookeeper_test_server.cpp
+++ b/src/tests/zookeeper_test_server.cpp
@@ -86,7 +86,7 @@ std::string ZooKeeperTestServer::connectString() const
 
 void ZooKeeperTestServer::shutdownNetwork()
 {
-  if (started && connectionFactory->isAlive()) {
+  if (started && connectionFactory && connectionFactory->isAlive()) {
     connectionFactory->shutdown();
     delete connectionFactory;
     connectionFactory = NULL;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5e10df/src/tests/zookeeper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/zookeeper_tests.cpp b/src/tests/zookeeper_tests.cpp
index 16c5fb7..0059438 100644
--- a/src/tests/zookeeper_tests.cpp
+++ b/src/tests/zookeeper_tests.cpp
@@ -22,14 +22,23 @@
 
 #include <string>
 
+#include <process/gtest.hpp>
+
+#include <stout/gtest.hpp>
+#include <stout/owned.hpp>
 #include <stout/strings.hpp>
 
 #include "zookeeper/authentication.hpp"
+#include "zookeeper/contender.hpp"
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
 
 #include "tests/zookeeper.hpp"
 
 using namespace mesos::internal;
 using namespace mesos::internal::tests;
+using namespace process;
+using namespace zookeeper;
 
 
 TEST_F(ZooKeeperTest, Auth)
@@ -105,3 +114,210 @@ TEST_F(ZooKeeperTest, Create)
                                    true));
   EXPECT_TRUE(strings::startsWith(result, "/foo/bar/baz/0"));
 }
+
+
+TEST_F(ZooKeeperTest, LeaderDetector)
+{
+  Group group(server->connectString(), NO_TIMEOUT, "/test/");
+
+  // Initialize two members.
+  Future<Group::Membership> membership1 =
+    group.join("member 1");
+  AWAIT_READY(membership1);
+  Future<Group::Membership> membership2 =
+    group.join("member 2");
+  AWAIT_READY(membership2);
+
+  LeaderDetector detector(&group);
+
+  // Detect the leader.
+  Future<Result<Group::Membership> > leader =
+    detector.detect(None());
+  AWAIT_READY(leader);
+  ASSERT_SOME_EQ(membership1.get(), leader.get());
+
+  // Detect next leader change.
+  leader = detector.detect(leader.get());
+  EXPECT_TRUE(leader.isPending());
+
+  // Leader doesn't change after cancelling the follower.
+  Future<bool> cancellation = group.cancel(membership2.get());
+  AWAIT_READY(cancellation);
+  EXPECT_TRUE(cancellation.get());
+  EXPECT_TRUE(leader.isPending());
+
+  // Join member 2 back.
+  membership2 = group.join("member 2");
+  AWAIT_READY(membership2);
+  EXPECT_TRUE(leader.isPending());
+
+  // Cancelling the incumbent leader allows member 2 to be elected.
+  cancellation = group.cancel(membership1.get());
+  AWAIT_READY(cancellation);
+  EXPECT_TRUE(cancellation.get());
+  AWAIT_READY(leader);
+  EXPECT_SOME_EQ(membership2.get(), leader.get());
+
+  // Cancelling the only member results in no leader elected.
+  leader = detector.detect(leader.get().get());
+  EXPECT_TRUE(leader.isPending());
+  cancellation = group.cancel(membership2.get());
+
+  AWAIT_READY(cancellation);
+  EXPECT_TRUE(cancellation.get());
+  AWAIT_READY(leader);
+  ASSERT_TRUE(leader.get().isNone());
+}
+
+
+TEST_F(ZooKeeperTest, LeaderDetectorFailureHandling)
+{
+  Seconds timeout(10);
+  Group group(server->connectString(), timeout, "/test/");
+  LeaderDetector detector(&group);
+
+  Future<Group::Membership> membership1 =
+    group.join("member 1");
+  AWAIT_READY(membership1);
+
+  Future<Result<Group::Membership> > leader =
+    detector.detect();
+
+  AWAIT_READY(leader);
+  EXPECT_SOME(leader.get());
+
+  leader = detector.detect(leader.get());
+
+  server->shutdownNetwork();
+
+  Clock::pause();
+  // We may need to advance multiple times because we could have
+  // advanced the clock before the timer in Group starts.
+  while (leader.isPending()) {
+    Clock::advance(timeout);
+    Clock::settle();
+  }
+  Clock::resume();
+
+  // The detect operation times out, which results in the erroneous
+  // state in the detector.
+  AWAIT_READY(leader);
+  EXPECT_ERROR(leader.get());
+
+  // The detection will fail because of the error state.
+  leader = detector.detect();
+  AWAIT_READY(leader);
+  EXPECT_ERROR(leader.get());
+
+  // Passing the previous state forces the detector to retry.
+  leader = detector.detect(leader.get());
+
+  server->startNetwork();
+
+  membership1 = group.join("member 1");
+  AWAIT_READY(membership1);
+
+  AWAIT_READY(leader);
+  EXPECT_SOME(leader.get());
+
+  // Cancel the member and join another.
+  Future<bool> cancelled = group.cancel(leader.get().get());
+  AWAIT_READY(cancelled);
+  Future<Group::Membership> membership2 = group.join("member 2");
+  AWAIT_READY(membership2);
+
+  // Detect a new leader.
+  leader = detector.detect(leader.get());
+  AWAIT_READY(leader);
+  EXPECT_SOME(leader.get());
+}
+
+
+TEST_F(ZooKeeperTest, LeaderContender)
+{
+  Seconds timeout(10);
+  Group group(server->connectString(), timeout, "/test/");
+
+  Owned<LeaderContender> contender = new LeaderContender(&group, "candidate 1");
+  contender->contend();
+
+  // Immediately withdrawing after contending leads to delayed
+  // cancellation.
+  Future<bool> withdrawn = contender->withdraw();
+  AWAIT_READY(withdrawn);
+  EXPECT_TRUE(withdrawn.get());
+
+  // Normal workflow.
+  contender = new LeaderContender(&group, "candidate 1");
+
+  Future<Future<Nothing> > candidated = contender->contend();
+  AWAIT_READY(candidated);
+
+  Future<Nothing> lostCandidacy = candidated.get();
+  EXPECT_TRUE(lostCandidacy.isPending());
+
+  // Expire the Group session while we are watching for updates from
+  // the contender; the candidacy will be lost.
+  Future<Option<int64_t> > session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+  server->expireSession(session.get().get());
+  AWAIT_READY(lostCandidacy);
+
+  // Withdraw returns immediately because the candidacy is lost and
+  // there is nothing to cancel.
+  withdrawn = contender->withdraw();
+  AWAIT_READY(withdrawn);
+  EXPECT_FALSE(withdrawn.get());
+
+  // Contend again.
+  contender = new LeaderContender(&group, "candidate 1");
+  candidated = contender->contend();
+
+  session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+
+  server->expireSession(session.get().get());
+
+  Clock::pause();
+  // The retry timeout.
+  Clock::advance(GroupProcess::RETRY_INTERVAL);
+  Clock::settle();
+  Clock::resume();
+
+  // The contender weathered the expiration and succeeded in a retry.
+  AWAIT_READY(candidated);
+
+  withdrawn = contender->withdraw();
+  AWAIT_READY(withdrawn);
+
+  // Contend (3) and shutdown the network this time.
+  contender = new LeaderContender(&group, "candidate 1");
+  candidated = contender->contend();
+  AWAIT_READY(candidated);
+  lostCandidacy = candidated.get();
+
+  server->shutdownNetwork();
+
+  Clock::pause();
+
+  // We may need to advance multiple times because we could have
+  // advanced the clock before the timer in Group starts.
+  while (lostCandidacy.isPending()) {
+    Clock::advance(timeout);
+    Clock::settle();
+  }
+
+  // Server failure results in failed future.
+  AWAIT_ASSERT_FAILED(lostCandidacy);
+
+  Clock::resume();
+
+  server->startNetwork();
+
+  // Contend again (4).
+  contender = new LeaderContender(&group, "candidate 1");
+  candidated = contender->contend();
+  AWAIT_READY(candidated);
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5e10df/src/zookeeper/contender.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.cpp b/src/zookeeper/contender.cpp
new file mode 100644
index 0000000..bb7e255
--- /dev/null
+++ b/src/zookeeper/contender.cpp
@@ -0,0 +1,299 @@
+#include <set>
+#include <string>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+
+#include <stout/check.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+
+using namespace process;
+
+using std::set;
+using std::string;
+
+namespace zookeeper {
+
+class LeaderContenderProcess : public Process<LeaderContenderProcess>
+{
+public:
+  LeaderContenderProcess(Group* group, const std::string& data);
+  virtual ~LeaderContenderProcess();
+
+  // LeaderContender implementation.
+  Future<Future<Nothing> > contend();
+  Future<bool> withdraw();
+
+protected:
+  virtual void finalize();
+
+private:
+  // Invoked when we have joined the group (or failed to do so).
+  void joined();
+
+  // Invoked when the group memberships have changed.
+  void watched(const Future<set<Group::Membership> >& memberships);
+
+  // Invoked when the group membership is cancelled.
+  void cancelled(const Future<bool>& successful);
+
+  // Helper for setting error and failing pending promises.
+  void fail(const string& message);
+
+  // Helper for cancelling the Group membership.
+  void cancel();
+
+  Group* group;
+  const string data;
+
+  // The contender's state transitions from contending -> watching ->
+  // withdrawing or contending -> withdrawing. Each state is
+  // identified by the corresponding Option<Promise> being assigned.
+  // Note that these Option<Promise>s are never reset to None once
+  // they are assigned.
+
+  // Holds the promise for the future for contend().
+  Option<Promise<Future<Nothing> >*> contending;
+
+  // Holds the promise for the inner future enclosed by contend()'s
+  // result which is satisfied when the contender's candidacy is
+  // lost.
+  Option<Promise<Nothing>*> watching;
+
+  // Holds the promise for the future for withdraw().
+  Option<Promise<bool>*> withdrawing;
+
+  // Stores the result for joined().
+  Future<Group::Membership> candidacy;
+};
+
+
+LeaderContenderProcess::LeaderContenderProcess(
+    Group* _group,
+    const string& _data)
+  : group(_group),
+    data(_data) {}
+
+
+LeaderContenderProcess::~LeaderContenderProcess()
+{
+  if (contending.isSome()) {
+    delete contending.get();
+    contending = None();
+  }
+
+  if (watching.isSome()) {
+    delete watching.get();
+    watching = None();
+  }
+
+  if (withdrawing.isSome()) {
+    delete withdrawing.get();
+    withdrawing = None();
+  }
+}
+
+
+void LeaderContenderProcess::finalize()
+{
+  // We do not wait for the result here because the Group keeps
+  // retrying (even after the contender is destroyed) until it
+  // either succeeds or its session times out. In either case the
+  // old membership is eventually cancelled.
+  // There is a tricky situation where the contender terminates after
+  // it has contended but before it is notified of the obtained
+  // membership. In this case the membership is not cancelled during
+  // contender destruction. The client should thus use withdraw() to
+  // wait for the membership to be first obtained and then cancelled.
+  cancel();
+}
+
+
+Future<Future<Nothing> > LeaderContenderProcess::contend()
+{
+  CHECK(contending.isNone()) << "Cannot contend more than once";
+
+  LOG(INFO) << "Joining the ZK group with data: '" << data << "'";
+  candidacy = group->join(data);
+  candidacy
+    .onAny(defer(self(), &Self::joined));
+
+  // Okay, we wait and see what unfolds.
+  contending = new Promise<Future<Nothing> >();
+  return contending.get()->future();
+}
+
+
+Future<bool> LeaderContenderProcess::withdraw()
+{
+  CHECK_SOME(contending)
+    << "Can only withdraw after the contender has contended";
+
+  if (withdrawing.isSome()) {
+    // Repeated calls to withdraw get the same result.
+    return withdrawing.get();
+  }
+
+  withdrawing = new Promise<bool>();
+
+  if (candidacy.isPending()) {
+    // If we have not obtained the candidacy yet, we withdraw after
+    // it is obtained.
+    LOG(INFO) << "Withdraw requested before the candidacy is obtained; will "
+              << "withdraw after it happens";
+    candidacy.onAny(defer(self(), &Self::cancel));
+  } else if (candidacy.isReady()) {
+    cancel();
+  } else {
+    CHECK(candidacy.isFailed()) << "Not expecting candidacy to be discarded";
+
+    // We have failed to obtain the candidacy so we do not need to
+    // cancel it.
+    return false;
+  }
+
+  return withdrawing.get()->future();
+}
+
+
+void LeaderContenderProcess::cancel()
+{
+  if (!candidacy.isReady()) {
+    // Nothing to cancel.
+    return;
+  }
+
+  LOG(INFO) << "Now cancelling the membership: " << candidacy.get().id();
+
+  group->cancel(candidacy.get())
+    .onAny(defer(self(), &Self::cancelled, lambda::_1));
+}
+
+
+void LeaderContenderProcess::cancelled(const Future<bool>& successful)
+{
+  CHECK(candidacy.isReady());
+  LOG(INFO) << "Membership cancelled: " << candidacy.get().id();
+
+  CHECK_SOME(withdrawing);
+  withdrawing.get()->set(successful);
+}
+
+
+void LeaderContenderProcess::joined()
+{
+  if (candidacy.isFailed()) {
+    fail(candidacy.failure());
+    return;
+  }
+
+  CHECK(candidacy.isReady()) << "Not expecting Group to discard the future";
+
+  if (withdrawing.isSome()) {
+    LOG(INFO) << "Joined group after the contender started withdrawing";
+    return;
+  }
+
+  LOG(INFO) << "New candidate (id='" << candidacy.get().id() << "', data='"
+            << data << "') has entered the contest for leadership";
+
+  // Transition to 'watching' state.
+  CHECK(watching.isNone());
+  watching = new Promise<Nothing>();
+
+  // Notify the client.
+  CHECK(contending.isSome());
+  if (contending.get()->set(watching.get()->future())) {
+    // Continue to watch that our membership is not removed (if the
+    // client still cares about it).
+    group->watch()
+      .onAny(defer(self(), &Self::watched, lambda::_1));
+  }
+}
+
+
+void LeaderContenderProcess::watched(
+    const Future<set<Group::Membership> >& memberships)
+{
+  CHECK_SOME(contending);
+  CHECK(contending.get()->future().isReady())
+    << "'Contending' must be ready before 'watching'";
+
+  if (withdrawing.isSome()) {
+    LOG(INFO)
+      << "Group memberships changed after the contender started withdrawing";
+    return;
+  }
+
+  // Fail all operations.
+  if (memberships.isFailed()) {
+    fail(memberships.failure());
+    return;
+  }
+
+  CHECK(memberships.isReady()) << "Not expecting Group to discard the future";
+
+  CHECK_SOME(watching);
+  CHECK(candidacy.isReady());
+
+  if (memberships.get().count(candidacy.get()) == 0) {
+    // We had joined the group but our membership is gone.
+    LOG(INFO) << "Lost candidacy: " << candidacy.get().id();
+    watching.get()->set(Nothing());
+  } else {
+    // Continue to watch that our membership is not removed.
+    group->watch(memberships.get())
+      .onAny(defer(self(), &Self::watched, lambda::_1));
+  }
+}
+
+
+void LeaderContenderProcess::fail(const string& message)
+{
+  if (contending.isSome()) {
+    contending.get()->fail(message);
+  }
+
+  if (watching.isSome()) {
+    watching.get()->fail(message);
+  }
+
+  if (withdrawing.isSome()) {
+    withdrawing.get()->fail(message);
+  }
+}
+
+
+LeaderContender::LeaderContender(Group* group, const string& data)
+{
+  process = new LeaderContenderProcess(group, data);
+  spawn(process);
+}
+
+
+LeaderContender::~LeaderContender()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Future<Nothing> > LeaderContender::contend()
+{
+  return dispatch(process, &LeaderContenderProcess::contend);
+}
+
+
+Future<bool> LeaderContender::withdraw()
+{
+  return dispatch(process, &LeaderContenderProcess::withdraw);
+}
+
+} // namespace zookeeper {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5e10df/src/zookeeper/contender.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.hpp b/src/zookeeper/contender.hpp
new file mode 100644
index 0000000..d0b386e
--- /dev/null
+++ b/src/zookeeper/contender.hpp
@@ -0,0 +1,61 @@
+#ifndef __ZOOKEEPER_CONTENDER_HPP__
+#define __ZOOKEEPER_CONTENDER_HPP__
+
+#include <string>
+
+#include <process/future.hpp>
+
+#include <stout/nothing.hpp>
+
+#include "zookeeper/group.hpp"
+
+namespace zookeeper {
+
+// Forward declaration.
+class LeaderContenderProcess;
+
+
+// Provides an abstraction for contending to be the leader of a
+// ZooKeeper group.
+// Note that the contender is NOT reusable, which means its methods
+// are supposed to be called once and the client needs to create a
+// new instance to contend again.
+class LeaderContender
+{
+public:
+  // The specified 'group' is expected to outlive the contender. The
+  // specified 'data' is associated with the group membership created
+  // by this contender.
+  LeaderContender(Group* group, const std::string& data);
+
+  // Note that the contender's membership, if obtained, is scheduled
+  // to be cancelled during destruction.
+  // NOTE: The client should call withdraw() to guarantee that the
+  // membership is cancelled when its returned future is satisfied.
+  virtual ~LeaderContender();
+
+  // Returns a Future<Nothing> once the contender has achieved
+  // candidacy (by obtaining a membership) and a failure otherwise.
+  // The inner Future returns Nothing when the contender is out of
+  // the contest (i.e. its membership is lost) and a failure if it is
+  // unable to watch the membership.
+  process::Future<process::Future<Nothing> > contend();
+
+  // Returns true if successfully withdrawn from the contest (either
+  // while contending or has already contended and is watching for
+  // membership loss).
+  // It should only be called after contend() is called.
+  // A false return value implies that there was no valid group
+  // membership to cancel, which may be a result of a race to cancel
+  // an expired membership.
+  // A failed future is returned if the contender is unable to
+  // withdraw (but the membership will expire).
+  process::Future<bool> withdraw();
+
+private:
+  LeaderContenderProcess* process;
+};
+
+} // namespace zookeeper {
+
+#endif // __ZOOKEEPER_CONTENDER_HPP__

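A minimal usage sketch of the contender API declared above, following the LeaderContender test added in this commit: contend once, keep the inner future to learn about a lost candidacy, and withdraw when done.

  Group group(server->connectString(), Seconds(10), "/test/");
  LeaderContender contender(&group, "candidate 1");

  // The outer future is ready once candidacy is obtained; the inner
  // future is satisfied if that candidacy is later lost.
  Future<Future<Nothing> > contended = contender.contend();
  AWAIT_READY(contended);
  Future<Nothing> lostCandidacy = contended.get();

  // Withdrawing cancels the membership; 'true' means there was a
  // valid membership to cancel.
  Future<bool> withdrawn = contender.withdraw();
  AWAIT_READY(withdrawn);
  EXPECT_TRUE(withdrawn.get());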
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5e10df/src/zookeeper/detector.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.cpp b/src/zookeeper/detector.cpp
new file mode 100644
index 0000000..1de3663
--- /dev/null
+++ b/src/zookeeper/detector.cpp
@@ -0,0 +1,173 @@
+#include <set>
+#include <string>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include "process/logging.hpp"
+#include <process/process.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+
+using namespace process;
+
+using std::set;
+using std::string;
+
+namespace zookeeper {
+
+class LeaderDetectorProcess : public Process<LeaderDetectorProcess>
+{
+public:
+  LeaderDetectorProcess(Group* group);
+  virtual ~LeaderDetectorProcess();
+  virtual void initialize();
+
+  // LeaderDetector implementation.
+  Future<Result<Group::Membership> > detect(
+      const Result<Group::Membership>& previous);
+
+private:
+  // Helper that sets up the watch on the group.
+  void watch(const set<Group::Membership>& expected);
+
+  // Invoked when the group memberships have changed.
+  void watched(Future<set<Group::Membership> > memberships);
+
+  Group* group;
+  Result<Group::Membership> leader;
+  set<Promise<Result<Group::Membership> >*> promises;
+};
+
+
+LeaderDetectorProcess::LeaderDetectorProcess(Group* _group)
+  : group(_group), leader(None()) {}
+
+
+LeaderDetectorProcess::~LeaderDetectorProcess()
+{
+  foreach (Promise<Result<Group::Membership> >* promise, promises) {
+    promise->fail("No longer detecting leader");
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+void LeaderDetectorProcess::initialize()
+{
+  watch(set<Group::Membership>());
+}
+
+
+Future<Result<Group::Membership> > LeaderDetectorProcess::detect(
+    const Result<Group::Membership>& previous)
+{
+  // Return immediately if the incumbent leader is different from the
+  // expected one.
+  if (leader.isError() != previous.isError() ||
+      leader.isNone() != previous.isNone() ||
+      leader.isSome() != previous.isSome()) {
+    return leader; // State change.
+  } else if (leader.isSome() && previous.isSome() &&
+             leader.get() != previous.get()) {
+    return leader; // Leadership change.
+  }
+
+  // Otherwise wait for the next election result.
+  Promise<Result<Group::Membership> >* promise =
+    new Promise<Result<Group::Membership> >();
+  promises.insert(promise);
+  return promise->future();
+}
+
+
+void LeaderDetectorProcess::watch(const set<Group::Membership>& expected)
+{
+  group->watch(expected)
+    .onAny(defer(self(), &Self::watched, lambda::_1));
+}
+
+
+void LeaderDetectorProcess::watched(Future<set<Group::Membership> > memberships)
+{
+  if (memberships.isFailed()) {
+    LOG(ERROR) << "Failed to watch memberships: " << memberships.failure();
+    leader = Error(memberships.failure());
+    foreach (Promise<Result<Group::Membership> >* promise, promises) {
+      promise->set(leader);
+      delete promise;
+    }
+    promises.clear();
+
+    // Start over.
+    watch(set<Group::Membership>());
+    return;
+  }
+
+  CHECK(memberships.isReady()) << "Not expecting Group to discard futures";
+
+  // Update leader status based on memberships.
+  if (leader.isSome() && memberships.get().count(leader.get()) == 0) {
+    VLOG(1) << "The current leader (id=" << leader.get().id() << ") is lost";
+  }
+
+  // Run an "election". The leader is the oldest member (smallest
+  // membership id). We do not fulfill any of our promises if the
+  // incumbent wins the election.
+  Option<Group::Membership> current;
+  foreach (const Group::Membership& membership, memberships.get()) {
+    if (current.isNone() || membership.id() < current.get().id()) {
+      current = membership;
+    }
+  }
+
+  if (current.isSome() && (!leader.isSome() || current.get() != leader.get())) {
+    LOG(INFO) << "Detected a new leader (id='" << current.get().id()
+              << "')";
+    foreach (Promise<Result<Group::Membership> >* promise, promises) {
+      promise->set(current);
+      delete promise;
+    }
+    promises.clear();
+  } else if (current.isNone() && !leader.isNone()) {
+    LOG(INFO) << "No new leader is elected after election";
+
+    foreach (Promise<Result<Group::Membership> >* promise, promises) {
+      promise->set(Result<Group::Membership>::none());
+      delete promise;
+    }
+    promises.clear();
+  }
+
+  leader = current;
+  watch(memberships.get());
+}
+
+
+LeaderDetector::LeaderDetector(Group* group)
+{
+  process = new LeaderDetectorProcess(group);
+  spawn(process);
+}
+
+
+LeaderDetector::~LeaderDetector()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Result<Group::Membership> > LeaderDetector::detect(
+    const Result<Group::Membership>& membership)
+{
+  return dispatch(process, &LeaderDetectorProcess::detect, membership);
+}
+
+} // namespace zookeeper {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea5e10df/src/zookeeper/detector.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/detector.hpp b/src/zookeeper/detector.hpp
new file mode 100644
index 0000000..de4acab
--- /dev/null
+++ b/src/zookeeper/detector.hpp
@@ -0,0 +1,47 @@
+#ifndef __ZOOKEEPER_DETECTOR_HPP__
+#define __ZOOKEEPER_DETECTOR_HPP__
+
+#include <string>
+
+#include <stout/result.hpp>
+
+#include <process/future.hpp>
+
+#include "zookeeper/group.hpp"
+
+namespace zookeeper {
+
+// Forward declaration.
+class LeaderDetectorProcess;
+
+// Provides an abstraction for detecting the leader of a ZooKeeper
+// group.
+class LeaderDetector
+{
+public:
+  // The specified 'group' is expected to outlive the detector.
+  LeaderDetector(Group* group);
+  virtual ~LeaderDetector();
+
+  // Returns some membership after an election has occurred and a
+  // leader (membership) is elected, or none if an election occurs and
+  // no leader is elected (e.g., all memberships are lost).
+  // The result is an error if the detector is not able to detect the
+  // leader, possibly due to network disconnection.
+  //
+  // The 'previous' result (if any) should be passed back if this
+  // method is called repeatedly so the detector only returns when it
+  // gets a different result, either because an error has been
+  // recovered from or the elected membership differs from 'previous'.
+  //
+  // TODO(xujyan): Use a Stream abstraction instead.
+  process::Future<Result<Group::Membership> > detect(
+      const Result<Group::Membership>& previous = None());
+
+private:
+  LeaderDetectorProcess* process;
+};
+
+} // namespace zookeeper {
+
+#endif // __ZOOKEEPER_DETECTOR_HPP__
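
A minimal usage sketch of the detect() contract described above: pass the previous result back in, so the returned future only completes on an actual change (a new leader, no leader, or an error).

  Group group(server->connectString(), Seconds(10), "/test/");
  LeaderDetector detector(&group);

  // First detection: 'previous' defaults to None().
  Future<Result<Group::Membership> > leader = detector.detect();
  AWAIT_READY(leader);

  // Subsequent detections pass the last result back, so this future
  // stays pending until the leadership actually changes.
  Future<Result<Group::Membership> > change =
    detector.detect(leader.get());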