You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/19 19:57:47 UTC

[6/8] git commit: Added Master contender and detector abstractions.

Added Master contender and detector abstractions.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/13087


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bcd1dc4e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bcd1dc4e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bcd1dc4e

Branch: refs/heads/master
Commit: bcd1dc4e10a1cff4fdc4e92daff108b6fa0475d3
Parents: 9fa1d47
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:39:15 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:19 2013 -0800

----------------------------------------------------------------------
 src/Makefile.am                               |  71 +--
 src/master/contender.cpp                      | 206 +++++++
 src/master/contender.hpp                      | 127 ++++
 src/master/detector.cpp                       | 361 ++++++++++++
 src/master/detector.hpp                       | 129 ++++
 src/tests/master_contender_detector_tests.cpp | 647 +++++++++++++++++++++
 src/tests/master_detector_tests.cpp           | 628 --------------------
 7 files changed, 1508 insertions(+), 661 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3c48aee..969aead 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -161,7 +161,9 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	sasl/auxprop.cpp						\
 	sched/sched.cpp							\
 	local/local.cpp							\
+	master/contender.cpp						\
 	master/constants.cpp						\
+	master/detector.cpp						\
 	master/drf_sorter.cpp						\
 	master/http.cpp							\
 	master/master.cpp						\
@@ -224,7 +226,10 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
 	linux/fs.hpp local/flags.hpp local/local.hpp			\
 	logging/flags.hpp logging/logging.hpp				\
 	master/allocator.hpp						\
-	master/constants.hpp master/drf_sorter.hpp master/flags.hpp	\
+	master/contender.hpp						\
+	master/constants.hpp						\
+	master/detector.hpp						\
+	master/drf_sorter.hpp master/flags.hpp				\
 	master/hierarchical_allocator_process.hpp			\
 	master/registrar.hpp						\
 	master/master.hpp master/sorter.hpp				\
@@ -779,38 +784,38 @@ balloon_executor_LDADD = libmesos.la
 
 check_PROGRAMS += mesos-tests
 
-mesos_tests_SOURCES =			\
-  tests/allocator_tests.cpp		\
-  tests/attributes_tests.cpp		\
-  tests/authentication_tests.cpp	\
-  tests/environment.cpp			\
-  tests/examples_tests.cpp		\
-  tests/exception_tests.cpp		\
-  tests/fault_tolerance_tests.cpp	\
-  tests/files_tests.cpp			\
-  tests/flags.cpp			\
-  tests/gc_tests.cpp			\
-  tests/isolator_tests.cpp		\
-  tests/log_tests.cpp			\
-  tests/logging_tests.cpp		\
-  tests/main.cpp			\
-  tests/master_detector_tests.cpp	\
-  tests/master_tests.cpp		\
-  tests/mesos.cpp			\
-  tests/monitor_tests.cpp		\
-  tests/paths_tests.cpp			\
-  tests/protobuf_io_tests.cpp		\
-  tests/reaper_tests.cpp		\
-  tests/registrar_tests.cpp		\
-  tests/resource_offers_tests.cpp	\
-  tests/resources_tests.cpp		\
-  tests/sasl_tests.cpp			\
-  tests/script.cpp			\
-  tests/slave_recovery_tests.cpp	\
-  tests/sorter_tests.cpp		\
-  tests/state_tests.cpp			\
-  tests/status_update_manager_tests.cpp	\
-  tests/utils.cpp			\
+mesos_tests_SOURCES =				\
+  tests/allocator_tests.cpp			\
+  tests/attributes_tests.cpp			\
+  tests/authentication_tests.cpp		\
+  tests/environment.cpp				\
+  tests/examples_tests.cpp			\
+  tests/exception_tests.cpp			\
+  tests/fault_tolerance_tests.cpp		\
+  tests/files_tests.cpp				\
+  tests/flags.cpp				\
+  tests/gc_tests.cpp				\
+  tests/isolator_tests.cpp			\
+  tests/log_tests.cpp				\
+  tests/logging_tests.cpp			\
+  tests/main.cpp				\
+  tests/master_contender_detector_tests.cpp	\
+  tests/master_tests.cpp			\
+  tests/mesos.cpp				\
+  tests/monitor_tests.cpp			\
+  tests/paths_tests.cpp				\
+  tests/protobuf_io_tests.cpp			\
+  tests/reaper_tests.cpp			\
+  tests/registrar_tests.cpp			\
+  tests/resource_offers_tests.cpp		\
+  tests/resources_tests.cpp			\
+  tests/sasl_tests.cpp				\
+  tests/script.cpp				\
+  tests/slave_recovery_tests.cpp		\
+  tests/sorter_tests.cpp			\
+  tests/state_tests.cpp				\
+  tests/status_update_manager_tests.cpp		\
+  tests/utils.cpp				\
   tests/zookeeper_url_tests.cpp
 
 mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender.cpp b/src/master/contender.cpp
new file mode 100644
index 0000000..84b0552
--- /dev/null
+++ b/src/master/contender.cpp
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/defer.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/lambda.hpp>
+
+#include "master/contender.hpp"
+#include "master/master.hpp"
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using std::string;
+
+using namespace process;
+using namespace zookeeper;
+
+namespace mesos {
+namespace internal {
+
+using namespace master;
+
+const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+class ZooKeeperMasterContenderProcess
+  : public Process<ZooKeeperMasterContenderProcess>
+{
+public:
+  ZooKeeperMasterContenderProcess(const zookeeper::URL& url);
+  ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group);
+  ~ZooKeeperMasterContenderProcess();
+
+  void initialize(const PID<Master>& master);
+
+  // MasterContender implementation.
+  virtual Future<Future<Nothing> > contend();
+
+private:
+  Owned<zookeeper::Group> group;
+  LeaderContender* contender;
+  PID<Master> master;
+};
+
+
+Try<MasterContender*> MasterContender::create(const string& zk)
+{
+  if (zk == "") {
+    return new StandaloneMasterContender();
+  } else if (strings::startsWith(zk, "zk://")) {
+    Try<zookeeper::URL> url = URL::parse(zk);
+    if (url.isError()) {
+      return Try<MasterContender*>::error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Try<MasterContender*>::error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterContender(url.get());
+  } else if (strings::startsWith(zk, "file://")) {
+    const string& path = zk.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      return Error("Failed to read from file at '" + path + "'");
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  return Try<MasterContender*>::error("Failed to parse '" + zk + "'");
+}
+
+
+MasterContender::~MasterContender() {}
+
+
+StandaloneMasterContender::~StandaloneMasterContender()
+{
+  if (promise != NULL) {
+    promise->set(Nothing()); // Leadership lost.
+    delete promise;
+  }
+}
+
+
+void StandaloneMasterContender::initialize(
+    const PID<master::Master>& master)
+{
+  // We don't really need to store the master in this basic
+  // implementation so we just restore an 'initialized' flag to make
+  // sure it is called.
+  initialized = true;
+}
+
+Future<Future<Nothing> > StandaloneMasterContender::contend()
+{
+  CHECK(initialized) << "Initialize the contender first";
+
+  if (promise != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    promise->set(Nothing());
+    delete promise;
+  }
+
+  // Directly return a future that is always pending because it
+  // represents a membership/leadership that is not going to be lost
+  // until we 'withdraw'.
+  promise = new Promise<Nothing>();
+  return promise->future();
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(const URL& url)
+{
+  process = new ZooKeeperMasterContenderProcess(url);
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group)
+{
+  process = new ZooKeeperMasterContenderProcess(group);
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::~ZooKeeperMasterContender()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void ZooKeeperMasterContender::initialize(
+    const PID<master::Master>& master)
+{
+  process->initialize(master);
+}
+
+
+Future<Future<Nothing> > ZooKeeperMasterContender::contend()
+{
+  return dispatch(process, &ZooKeeperMasterContenderProcess::contend);
+}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    const URL& url)
+  : group(new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT)),
+    contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    Owned<Group> _group)
+  : group(_group),
+    contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
+{
+  delete contender;
+}
+
+void ZooKeeperMasterContenderProcess::initialize(
+    const PID<Master>& _master)
+{
+  master = _master;
+}
+
+
+Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
+{
+  CHECK(master) << "Initialize the contender first";
+
+  if (contender != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    delete contender;
+  }
+
+  contender = new LeaderContender(group.get(), master);
+  return contender->contend();
+}
+
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender.hpp b/src/master/contender.hpp
new file mode 100644
index 0000000..50fd4f3
--- /dev/null
+++ b/src/master/contender.hpp
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_CONTENDER_HPP__
+#define __MASTER_CONTENDER_HPP__
+
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/owned.hpp>
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
+
+
+// Forward declarations.
+namespace master {
+class Master;
+}
+
+
+class ZooKeeperMasterContenderProcess;
+
+
+// An abstraction for contending to be a leading master.
+class MasterContender
+{
+public:
+  // Attempts to create a master contender using the specified
+  // Zookeeper.
+  // The Zookeeper address should be one of:
+  //   - zk://host1:port1,host2:port2,.../path
+  //   - zk://username:password@host1:port1,host2:port2,.../path
+  //   - file:///path/to/file (where file contains one of the above)
+  // Note that the returned contender still needs to be 'initialize()'d.
+  static Try<MasterContender*> create(const std::string& zk);
+
+  // Note that the contender's membership, if obtained, is scheduled
+  // to be cancelled during destruction.
+  virtual ~MasterContender() = 0;
+
+  // Initializes the contender with the PID of the master it contends
+  // on behalf of.
+  virtual void initialize(const process::PID<master::Master>& master) = 0;
+
+  // Returns a Future<Nothing> once the contender has entered the
+  // contest (by obtaining a membership) and an error otherwise.
+  // The inner Future returns Nothing when the contender is out of
+  // the contest (i.e. its membership is lost).
+  //
+  // This method can be used to contend again. Each call to this
+  // method causes the previous candidacy to be withdrawn before
+  // re-contending.
+  virtual process::Future<process::Future<Nothing> > contend() = 0;
+};
+
+
+// A basic implementation which assumes only one master is
+// contending.
+class StandaloneMasterContender : public MasterContender
+{
+public:
+  StandaloneMasterContender()
+    : initialized(false),
+      promise(NULL) {}
+  virtual ~StandaloneMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const process::PID<master::Master>& master);
+
+  // In this basic implementation the outer Future directly returns
+  // and inner Future stays pending because there is only one
+  // contender in the contest.
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  bool initialized;
+  process::Promise<Nothing>* promise;
+};
+
+
+class ZooKeeperMasterContender : public MasterContender
+{
+public:
+  // Creates a contender that uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  ZooKeeperMasterContender(const zookeeper::URL& url);
+  ZooKeeperMasterContender(Owned<zookeeper::Group> group);
+
+  virtual ~ZooKeeperMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const process::PID<master::Master>& master);
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  ZooKeeperMasterContenderProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_CONTENDER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
new file mode 100644
index 0000000..2f73f66
--- /dev/null
+++ b/src/master/detector.cpp
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <set>
+#include <string>
+
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/logging.hpp>
+#include <process/process.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+
+#include "master/detector.hpp"
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using namespace process;
+using namespace zookeeper;
+
+using std::set;
+using std::string;
+
+namespace mesos {
+namespace internal {
+
+const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+class StandaloneMasterDetectorProcess
+  : public Process<StandaloneMasterDetectorProcess>
+{
+public:
+  StandaloneMasterDetectorProcess() : leader(None()) {}
+  StandaloneMasterDetectorProcess(const UPID& _leader)
+    : leader(_leader) {}
+  ~StandaloneMasterDetectorProcess();
+
+  void appoint(const Result<UPID>& leader);
+  Future<Result<UPID> > detect(const Result<UPID>& previous = None());
+
+private:
+  // The leading master that's directly 'appoint()'ed.
+  Result<UPID> leader;
+
+  // Promises for the detection result.
+  set<Promise<Result<UPID> >*> promises;
+};
+
+
+class ZooKeeperMasterDetectorProcess
+  : public Process<ZooKeeperMasterDetectorProcess>
+{
+public:
+  ZooKeeperMasterDetectorProcess(const URL& url);
+  ZooKeeperMasterDetectorProcess(Owned<Group> group);
+  ~ZooKeeperMasterDetectorProcess();
+
+  virtual void initialize();
+
+  // ZooKeeperMasterDetector implementation.
+  Future<Result<UPID> > detect(const Result<UPID>& previous);
+
+private:
+  // Invoked when the group leadership has changed.
+  void detected(Future<Result<Group::Membership> > leader);
+
+  // Invoked when we have fetched the data associated with the leader.
+  void fetched(const Future<string>& data);
+
+  Owned<Group> group;
+  LeaderDetector detector;
+
+  // The leading Master.
+  Result<UPID> leader;
+  set<Promise<Result<UPID> >*> promises;
+};
+
+
+Try<MasterDetector*> MasterDetector::create(const string& master)
+{
+  if (master == "") {
+    return new StandaloneMasterDetector();
+  } else if (master.find("zk://") == 0) {
+    Try<URL> url = URL::parse(master);
+    if (url.isError()) {
+      return Try<MasterDetector*>::error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Try<MasterDetector*>::error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterDetector(url.get());
+  } else if (master.find("file://") == 0) {
+    const string& path = master.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      return Error("Failed to read from file at '" + path + "'");
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  // Okay, try and parse what we got as a PID.
+  UPID pid = master.find("master@") == 0
+    ? UPID(master)
+    : UPID("master@" + master);
+
+  if (!pid) {
+    return Try<MasterDetector*>::error(
+        "Failed to parse '" + master + "'");
+  }
+
+  return new StandaloneMasterDetector(pid);
+}
+
+
+MasterDetector::~MasterDetector() {}
+
+
+StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
+{
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(Result<UPID>::error("MasterDetector is being destructed"));
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+void StandaloneMasterDetectorProcess::appoint(
+    const Result<process::UPID>& _leader)
+{
+  leader = _leader;
+
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(leader);
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+Future<Result<UPID> > StandaloneMasterDetectorProcess::detect(
+    const Result<UPID>& previous)
+{
+  // Directly return the current leader is not the
+  // same as the previous one.
+  if (leader.isError() != previous.isError() ||
+      leader.isNone() != previous.isNone() ||
+      leader.isSome() != previous.isSome()) {
+    return leader; // State change.
+  } else if (leader.isSome() && previous.isSome() &&
+             leader.get() != previous.get()) {
+    return leader; // Leadership change.
+  }
+
+  Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+  promises.insert(promise);
+  return promise->future();
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector()
+{
+  process = new StandaloneMasterDetectorProcess();
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
+{
+  process = new StandaloneMasterDetectorProcess(leader);
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::~StandaloneMasterDetector()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void StandaloneMasterDetector::appoint(const Result<process::UPID>& leader)
+{
+  return dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
+}
+
+
+Future<Result<UPID> > StandaloneMasterDetector::detect(
+    const Result<UPID>& previous)
+{
+  return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
+}
+
+
+// TODO(benh): Get ZooKeeper timeout from configuration.
+// TODO(xujyan): Use peer constructor after switching to C++ 11.
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    const URL& url)
+  : group(new Group(url.servers,
+                    MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+                    url.path,
+                    url.authentication)),
+    detector(group.get()),
+    leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    Owned<Group> _group)
+  : group(_group),
+    detector(group.get()),
+    leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
+{
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(Result<UPID>::error("No longer detecting a master"));
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+void ZooKeeperMasterDetectorProcess::initialize()
+{
+  detector.detect(None())
+    .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+Future<Result<UPID> > ZooKeeperMasterDetectorProcess::detect(
+    const Result<UPID>& previous)
+{
+  // Directly return when the current leader and previous are not the
+  // same.
+  if (leader.isError() != previous.isError() ||
+      leader.isNone() != previous.isNone() ||
+      leader.isSome() != previous.isSome()) {
+    return leader; // State change.
+  } else if (leader.isSome() && previous.isSome() &&
+             leader.get() != previous.get()) {
+    return leader; // Leadership change.
+  }
+
+  Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+  promises.insert(promise);
+  return promise->future();
+}
+
+
+void ZooKeeperMasterDetectorProcess::detected(
+    Future<Result<Group::Membership> > _leader)
+{
+  CHECK(_leader.isReady())
+    << "Not expecting LeaderDetector to fail or discard futures";
+
+  if (!_leader.get().isSome()) {
+    leader = _leader.get().isError()
+        ? Result<UPID>::error(_leader.get().error())
+        : Result<UPID>::none();
+
+    foreach (Promise<Result<UPID> >* promise, promises) {
+      promise->set(leader);
+      delete promise;
+    }
+    promises.clear();
+  } else {
+    // Fetch the data associated with the leader.
+    group->data(_leader.get().get())
+      .onAny(defer(self(), &Self::fetched, lambda::_1));
+  }
+
+  // Keep trying to detect leadership changes.
+  detector.detect(_leader.get())
+    .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
+{
+  if (data.isFailed()) {
+    leader = Error(data.failure());
+    foreach (Promise<Result<UPID> >* promise, promises) {
+      promise->set(leader);
+      delete promise;
+    }
+    promises.clear();
+    return;
+  }
+
+  CHECK(data.isReady()); // Not expecting Group to discard futures.
+
+  // Cache the master for subsequent requests.
+  leader = UPID(data.get());
+  LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
+
+  foreach (Promise<Result<UPID> >* promise, promises) {
+    promise->set(leader);
+    delete promise;
+  }
+  promises.clear();
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const URL& url)
+{
+  process = new ZooKeeperMasterDetectorProcess(url);
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
+{
+  process = new ZooKeeperMasterDetectorProcess(group);
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Result<UPID> > ZooKeeperMasterDetector::detect(
+    const Result<UPID>& previous)
+{
+  return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
+}
+
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector.hpp b/src/master/detector.hpp
new file mode 100644
index 0000000..ceb3a3f
--- /dev/null
+++ b/src/master/detector.hpp
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_DETECTOR_HPP__
+#define __MASTER_DETECTOR_HPP__
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/owned.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT;
+
+
+// Forward declarations.
+class StandaloneMasterDetectorProcess;
+class ZooKeeperMasterDetectorProcess;
+
+
+// An abstraction of a Master detector which can be used to
+// detect the leading master from a group.
+class MasterDetector
+{
+public:
+  // Attempts to create a master detector for the specified master.
+  // The master should be one of:
+  //   - host:port
+  //   - zk://host1:port1,host2:port2,.../path
+  //   - zk://username:password@host1:port1,host2:port2,.../path
+  //   - file:///path/to/file (where file contains one of the above)
+  static Try<MasterDetector*> create(const std::string& master);
+  virtual ~MasterDetector() = 0;
+
+  // Returns some PID after an election has occurred and the elected
+  // PID is different than that specified (if any), or NONE if an
+  // election occurs and no PID is elected (e.g., all PIDs are lost).
+  // The result is an error if the detector is not able to detect the
+  // leading master, possibly due to network disconnection.
+  //
+  // The future fails when the detector is destructed, it is never
+  // discarded.
+  //
+  // The 'previous' result (if any) should be passed back if this
+  // method is called repeatedly so the detector only returns when it
+  // gets a different result, either because an error is recovered or
+  // the elected membership is different from the 'previous'.
+  virtual process::Future<Result<process::UPID> > detect(
+      const Result<process::UPID>& previous = None()) = 0;
+};
+
+
+// A standalone implementation of the MasterDetector with no external
+// discovery mechanism so the user has to manually appoint a leader
+// to the detector for it to be detected.
+class StandaloneMasterDetector : public MasterDetector
+{
+public:
+  StandaloneMasterDetector();
+  // Use this constructor if the leader is known beforehand so it is
+  // unnecessary to call 'appoint()' separately.
+  StandaloneMasterDetector(const process::UPID& leader);
+  virtual ~StandaloneMasterDetector();
+
+  // Appoint the leading master so it can be *detected* by default.
+  // The leader can be NONE or ERROR.
+  // This method is used only by this basic implementation and not
+  // needed by child classes with other detection mechanisms such as
+  // Zookeeper.
+  //
+  // When used by Master, this method is called during its
+  // initialization process; when used by Slave and SchedulerDriver,
+  // the MasterDetector needs to have the leader installed prior to
+  // injection.
+  void appoint(const Result<process::UPID>& leader);
+
+  virtual process::Future<Result<process::UPID> > detect(
+      const Result<process::UPID>& previous = None());
+
+private:
+  StandaloneMasterDetectorProcess* process;
+};
+
+
+class ZooKeeperMasterDetector : public MasterDetector
+{
+public:
+  // Creates a detector which uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  ZooKeeperMasterDetector(const zookeeper::URL& url);
+  // A constructor overload for testing purposes.
+  ZooKeeperMasterDetector(Owned<zookeeper::Group> group);
+  virtual ~ZooKeeperMasterDetector();
+
+  // MasterDetector implementation.
+  virtual process::Future<Result<process::UPID> > detect(
+      const Result<process::UPID>& previous = None());
+
+private:
+  ZooKeeperMasterDetectorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_DETECTOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
new file mode 100644
index 0000000..5e42374
--- /dev/null
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -0,0 +1,647 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <zookeeper.h>
+
+#include <gmock/gmock.h>
+
+#include <fstream>
+#include <map>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/gtest.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/try.hpp>
+
+#include "master/contender.hpp"
+#include "master/detector.hpp"
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/isolator.hpp"
+#include "tests/mesos.hpp"
+#ifdef MESOS_HAS_JAVA
+#include "tests/zookeeper.hpp"
+#endif
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using namespace zookeeper;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+using process::UPID;
+
+using std::map;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+
+
+class MasterContenderDetectorTest : public MesosTest {};
+
+
+TEST_F(MasterContenderDetectorTest, File)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Write "master" to a file and use the "file://" mechanism to
+  // create a master detector for the slave. Still requires a master
+  // detector for the master first.
+  slave::Flags flags = CreateSlaveFlags();
+
+  const string& path = path::join(flags.work_dir, "master");
+  ASSERT_SOME(os::write(path, stringify(master.get())));
+
+  Try<MasterDetector*> detector =
+    MasterDetector::create("file://" + path);
+
+  ASSERT_SOME(detector);
+
+  StartSlave(detector.get(), flags);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST(BasicMasterContenderDetectorTest, Contender)
+{
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  MasterContender* contender = new StandaloneMasterContender();
+
+  contender->initialize(master);
+
+  Future<Future<Nothing> > contended = contender->contend();
+  AWAIT_READY(contended);
+
+  Future<Nothing> lostCandidacy = contended.get();
+
+  // The candidacy is never lost.
+  EXPECT_TRUE(lostCandidacy.isPending());
+
+  delete contender;
+
+  // Deleting the contender also withdraws the previous candidacy.
+  AWAIT_READY(lostCandidacy);
+}
+
+
+TEST(BasicMasterContenderDetectorTest, Detector)
+{
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  StandaloneMasterDetector detector;
+
+  Future<Result<UPID> > detected = detector.detect();
+
+  // No one has appointed the leader so we are pending.
+  EXPECT_TRUE(detected.isPending());
+
+  detector.appoint(master);
+
+  AWAIT_READY(detected);
+}
+
+
+#ifdef MESOS_HAS_JAVA
+class ZooKeeperMasterContenderDetectorTest : public ZooKeeperTest {};
+
+
+// A single contender gets elected automatically.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
+{
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  Owned<zookeeper::Group> group(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+  ZooKeeperMasterContender* contender = new ZooKeeperMasterContender(group);
+
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  contender->initialize(master);
+  Future<Future<Nothing> > contended = contender->contend();
+  AWAIT_READY(contended);
+
+  ZooKeeperMasterDetector detector(url.get());
+
+  Future<Result<UPID> > leader = detector.detect();
+  EXPECT_SOME_EQ(master, leader.get());
+  Future<Nothing> lostCandidacy = contended.get();
+  leader = detector.detect(leader.get());
+
+  Future<Option<int64_t> > sessionId = group.get()->session();
+  AWAIT_READY(sessionId);
+  server->expireSession(sessionId.get().get());
+
+  // Session expiration causes candidacy to be lost and the
+  // Future<Nothing> to be fulfilled.
+  AWAIT_READY(lostCandidacy);
+  AWAIT_READY(leader);
+  EXPECT_NONE(leader.get());
+}
+
+
+// Two contenders, the first wins. Kill the first, then the second
+// is elected.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
+{
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  ZooKeeperMasterContender* contender1 =
+    new ZooKeeperMasterContender(url.get());
+
+  PID<Master> master1;
+  master1.ip = 10000000;
+  master1.port = 10000;
+
+  contender1->initialize(master1);
+
+  Future<Future<Nothing> > contended1 = contender1->contend();
+  AWAIT_READY(contended1);
+
+  ZooKeeperMasterDetector detector1(url.get());
+
+  Future<Result<UPID> > leader1 = detector1.detect();
+  AWAIT_READY(leader1);
+  EXPECT_SOME_EQ(master1, leader1.get());
+
+  ZooKeeperMasterContender contender2(url.get());
+
+  PID<Master> master2;
+  master2.ip = 10000001;
+  master2.port = 10001;
+
+  contender2.initialize(master2);
+
+  Future<Future<Nothing> > contended2 = contender2.contend();
+  AWAIT_READY(contended2);
+
+  ZooKeeperMasterDetector detector2(url.get());
+  Future<Result<UPID> > leader2 = detector2.detect();
+  AWAIT_READY(leader2);
+  EXPECT_SOME_EQ(master1, leader2.get());
+
+  LOG(INFO) << "Killing the leading master";
+
+  // Destroying detector1 (below) causes leadership change.
+  delete contender1;
+
+  Future<Result<UPID> > leader3 = detector2.detect(master1);
+  AWAIT_READY(leader3);
+  EXPECT_SOME_EQ(master2, leader3.get());
+}
+
+
+// Master contention and detection fail when the network is down, it
+// recovers when the network is back up.
+TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
+{
+  Clock::pause();
+
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  ZooKeeperMasterContender contender(url.get());
+
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  contender.initialize(master);
+
+  Future<Future<Nothing> > contended = contender.contend();
+  AWAIT_READY(contended);
+  Future<Nothing> lostCandidacy = contended.get();
+
+  ZooKeeperMasterDetector detector(url.get());
+
+  Future<Result<UPID> > leader = detector.detect();
+  EXPECT_SOME_EQ(master, leader.get());
+
+  leader = detector.detect(leader.get());
+
+  // Shut down ZooKeeper and expect things to fail after the timeout.
+  server->shutdownNetwork();
+
+  Clock::advance(std::max(
+      MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+      MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+  Clock::settle();
+
+  AWAIT_EXPECT_FAILED(lostCandidacy);
+  AWAIT_READY(leader);
+  EXPECT_ERROR(leader.get());
+
+  // Retry.
+  contended = contender.contend();
+  leader = detector.detect(leader.get());
+
+  // Things will not change until the contender reconnects.
+  Clock::advance(Minutes(1));
+  Clock::settle();
+  EXPECT_TRUE(contended.isPending());
+  EXPECT_TRUE(leader.isPending());
+
+  server->startNetwork();
+  AWAIT_READY(contended);
+  AWAIT_READY(leader);
+}
+
+
+// Tests that detectors and contenders fail when we reach our
+// ZooKeeper session timeout. This is to enforce that we manually
+// expire the session when we do not get reconnected within the
+// timeout.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
+{
+  // Use an arbitrary timeout value.
+  Duration sessionTimeout(Seconds(5));
+
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+        "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  Owned<zookeeper::Group> leaderGroup(new Group(url.get(), sessionTimeout));
+
+  // First we bring up three master contender/detector:
+  //   1. A leading contender.
+  //   2. A non-leading contender.
+  //   3. A non-contender (detector).
+
+  // 1. Simulate a leading contender.
+  ZooKeeperMasterContender leaderContender(leaderGroup);
+
+  PID<Master> leader;
+  leader.ip = 10000000;
+  leader.port = 10000;
+
+  leaderContender.initialize(leader);
+
+  Future<Future<Nothing> > contended = leaderContender.contend();
+  AWAIT_READY(contended);
+
+  ZooKeeperMasterDetector leaderDetector(leaderGroup);
+
+  Future<Result<UPID> > detected = leaderDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 2. Simulate a non-leading contender.
+  Owned<zookeeper::Group> followerGroup(new Group(url.get(), sessionTimeout));
+  ZooKeeperMasterContender followerContender(followerGroup);
+
+  PID<Master> follower;
+  follower.ip = 10000001;
+  follower.port = 10001;
+
+  followerContender.initialize(follower);
+
+  contended = followerContender.contend();
+  AWAIT_READY(contended);
+
+  ZooKeeperMasterDetector followerDetector(followerGroup);
+
+  detected = followerDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 3. Simulate a non-contender.
+  Owned<zookeeper::Group> nonContenderGroup(
+      new Group(url.get(), sessionTimeout));
+  ZooKeeperMasterDetector nonContenderDetector(nonContenderGroup);
+
+  detected = nonContenderDetector.detect();
+
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // Expecting the reconnecting event after we shut down the ZK.
+  Future<Nothing> leaderReconnecting = FUTURE_DISPATCH(
+      leaderGroup->process->self(),
+      &GroupProcess::reconnecting);
+
+  Future<Nothing> followerReconnecting = FUTURE_DISPATCH(
+      followerGroup->process->self(),
+      &GroupProcess::reconnecting);
+
+  Future<Nothing> nonContenderReconnecting = FUTURE_DISPATCH(
+      nonContenderGroup->process->self(),
+      &GroupProcess::reconnecting);
+
+  server->shutdownNetwork();
+
+  AWAIT_READY(leaderReconnecting);
+  AWAIT_READY(followerReconnecting);
+  AWAIT_READY(nonContenderReconnecting);
+
+  // Now the detectors re-detect.
+  Future<Result<UPID> > leaderNoMasterDetected = leaderDetector.detect(leader);
+  Future<Result<UPID> > followerNoMasterDetected =
+    followerDetector.detect(leader);
+  Future<Result<UPID> > nonContenderNoMasterDetected =
+    nonContenderDetector.detect(leader);
+
+  Clock::pause();
+
+  // We may need to advance multiple times because we could have
+  // advanced the clock before the timer in Group starts.
+  while (leaderNoMasterDetected.isPending() ||
+         followerNoMasterDetected.isPending() ||
+         nonContenderNoMasterDetected.isPending()) {
+    Clock::advance(sessionTimeout);
+    Clock::settle();
+  }
+
+  AWAIT_READY(leaderNoMasterDetected);
+  EXPECT_ERROR(leaderNoMasterDetected.get());
+  AWAIT_READY(followerNoMasterDetected);
+  EXPECT_ERROR(followerNoMasterDetected.get());
+  AWAIT_READY(nonContenderNoMasterDetected);
+  EXPECT_ERROR(nonContenderNoMasterDetected.get());
+
+  Clock::resume();
+}
+
+
+// Tests whether a leading master correctly detects a new master when
+// its ZooKeeper session is expired (the follower becomes the new
+// leader).
+TEST_F(ZooKeeperMasterContenderDetectorTest,
+       MasterDetectorExpireMasterZKSession)
+{
+  // Simulate a leading master.
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  PID<Master> leader;
+  leader.ip = 10000000;
+  leader.port = 10000;
+
+  // Create the group instance so we can expire its session.
+  Owned<zookeeper::Group> group(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+  ZooKeeperMasterContender leaderContender(group);
+
+  leaderContender.initialize(leader);
+
+  Future<Future<Nothing> > leaderContended = leaderContender.contend();
+  AWAIT_READY(leaderContended);
+
+  Future<Nothing> leaderLostLeadership = leaderContended.get();
+
+  ZooKeeperMasterDetector leaderDetector(url.get());
+
+  Future<Result<UPID> > detected = leaderDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // Keep detecting.
+  Future<Result<UPID> > newLeaderDetected =
+    leaderDetector.detect(detected.get());
+
+  // Simulate a following master.
+  PID<Master> follower;
+  follower.ip = 10000001;
+  follower.port = 10001;
+
+  ZooKeeperMasterDetector followerDetector(url.get());
+  ZooKeeperMasterContender followerContender(url.get());
+  followerContender.initialize(follower);
+
+  Future<Future<Nothing> > followerContended = followerContender.contend();
+  AWAIT_READY(followerContended);
+
+  LOG(INFO) << "The follower now is detecting the leader";
+  detected = followerDetector.detect(None());
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // Now expire the leader's zk session.
+  Future<Option<int64_t> > session = group->session();
+  AWAIT_READY(session);
+  EXPECT_SOME(session.get());
+
+  LOG(INFO) << "Now expire the ZK session: " << std::hex << session.get().get();
+
+  server->expireSession(session.get().get());
+
+  AWAIT_READY(leaderLostLeadership);
+
+  // Wait for session expiration and ensure the former leader detects
+  // a new leader.
+  AWAIT_READY(newLeaderDetected);
+  EXPECT_SOME(newLeaderDetected.get());
+  EXPECT_EQ(follower, newLeaderDetected.get().get());
+}
+
+
+// Tests whether a slave correctly DOES NOT disconnect from the
+// master when its ZooKeeper session is expired, but the master still
+// stays the leader when the slave re-connects with the ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
+{
+  // Simulate a leading master.
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+      "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  PID<Master> master;
+  master.ip = 10000000;
+  master.port = 10000;
+
+  ZooKeeperMasterContender masterContender(url.get());
+  masterContender.initialize(master);
+
+  Future<Future<Nothing> > leaderContended = masterContender.contend();
+  AWAIT_READY(leaderContended);
+
+  // Simulate a slave.
+  Owned<zookeeper::Group> group(
+        new Group(url.get(), MASTER_DETECTOR_ZK_SESSION_TIMEOUT));
+
+  ZooKeeperMasterDetector slaveDetector(group);
+
+  Future<Result<UPID> > detected = slaveDetector.detect();
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(master, detected.get());
+
+  detected = slaveDetector.detect(master);
+
+  // Now expire the slave's zk session.
+  Future<Option<int64_t> > session = group->session();
+  AWAIT_READY(session);
+
+  Future<Nothing> connected = FUTURE_DISPATCH(
+      group->process->self(),
+      &GroupProcess::connected);
+
+  server->expireSession(session.get().get());
+
+  // When connected is called, the leader has already expired and
+  // reconnected.
+  AWAIT_READY(connected);
+
+  // Still pending because there is no leader change.
+  EXPECT_TRUE(detected.isPending());
+}
+
+
+// Tests whether a slave correctly detects the new master when its
+// ZooKeeper session is expired and a new master is elected before the
+// slave reconnects with ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest,
+       MasterDetectorExpireSlaveZKSessionNewMaster)
+{
+  Try<zookeeper::URL> url = zookeeper::URL::parse(
+        "zk://" + server->connectString() + "/mesos");
+
+  ASSERT_SOME(url);
+
+  // Simulate a leading master.
+  Owned<zookeeper::Group> leaderGroup(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+  // 1. Simulate a leading contender.
+  ZooKeeperMasterContender leaderContender(leaderGroup);
+  ZooKeeperMasterDetector leaderDetector(leaderGroup);
+
+  PID<Master> leader;
+  leader.ip = 10000000;
+  leader.port = 10000;
+
+  leaderContender.initialize(leader);
+
+  Future<Future<Nothing> > contended = leaderContender.contend();
+  AWAIT_READY(contended);
+
+  Future<Result<UPID> > detected = leaderDetector.detect(None());
+  AWAIT_READY(detected);
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 2. Simulate a non-leading contender.
+  Owned<zookeeper::Group> followerGroup(
+      new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+  ZooKeeperMasterContender followerContender(followerGroup);
+  ZooKeeperMasterDetector followerDetector(followerGroup);
+
+  PID<Master> follower;
+  follower.ip = 10000001;
+  follower.port = 10001;
+
+  followerContender.initialize(follower);
+
+  contended = followerContender.contend();
+  AWAIT_READY(contended);
+
+  detected = followerDetector.detect(None());
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  // 3. Simulate a non-contender.
+  Owned<zookeeper::Group> nonContenderGroup(
+      new Group(url.get(), MASTER_DETECTOR_ZK_SESSION_TIMEOUT));
+  ZooKeeperMasterDetector nonContenderDetector(nonContenderGroup);
+
+  detected = nonContenderDetector.detect();
+
+  EXPECT_SOME_EQ(leader, detected.get());
+
+  detected = nonContenderDetector.detect(leader);
+
+  // Now expire the slave's and leading master's zk sessions.
+  // NOTE: Here we assume that slave stays disconnected from the ZK
+  // when the leading master loses its session.
+  Future<Option<int64_t> > slaveSession = nonContenderGroup->session();
+  AWAIT_READY(slaveSession);
+
+  Future<Option<int64_t> > masterSession = leaderGroup->session();
+  AWAIT_READY(masterSession);
+
+  server->expireSession(slaveSession.get().get());
+  server->expireSession(masterSession.get().get());
+
+  // Wait for session expiration and ensure a new master is detected.
+  AWAIT_READY(detected);
+
+  EXPECT_SOME_EQ(follower, detected.get());
+}
+
+#endif // MESOS_HAS_JAVA

http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/tests/master_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_detector_tests.cpp b/src/tests/master_detector_tests.cpp
deleted file mode 100644
index 06c586d..0000000
--- a/src/tests/master_detector_tests.cpp
+++ /dev/null
@@ -1,628 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <zookeeper.h>
-
-#include <gmock/gmock.h>
-
-#include <fstream>
-#include <map>
-#include <string>
-#include <vector>
-
-#include <mesos/executor.hpp>
-#include <mesos/scheduler.hpp>
-
-#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/protobuf.hpp>
-
-#include <stout/duration.hpp>
-#include <stout/gtest.hpp>
-#include <stout/nothing.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/try.hpp>
-
-#include "detector/detector.hpp"
-
-#include "master/master.hpp"
-
-#include "messages/messages.hpp"
-
-#include "slave/slave.hpp"
-
-#include "tests/isolator.hpp"
-#include "tests/mesos.hpp"
-#ifdef MESOS_HAS_JAVA
-#include "tests/zookeeper.hpp"
-#endif
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::tests;
-
-using mesos::internal::master::Master;
-
-using mesos::internal::slave::Slave;
-
-using process::Clock;
-using process::Future;
-using process::PID;
-using process::UPID;
-
-using std::map;
-using std::string;
-using std::vector;
-
-using testing::_;
-using testing::AtMost;
-using testing::Return;
-
-
-class MasterDetectorTest : public MesosTest {};
-
-
-TEST_F(MasterDetectorTest, File)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  Files files;
-  TestingIsolator isolator;
-
-  slave::Flags flags = CreateSlaveFlags();
-
-  Slave s(flags, true, &isolator, &files);
-  PID<Slave> slave = process::spawn(&s);
-
-  // Write "master" to a file and use the "file://" mechanism to
-  // create a master detector for the slave. Still requires a master
-  // detector for the master first.
-  BasicMasterDetector detector1(master.get(), vector<UPID>(), true);
-
-  const string& path = path::join(flags.work_dir, "master");
-  ASSERT_SOME(os::write(path, stringify(master.get())));
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create("file://" + path, slave, false, true);
-
-  EXPECT_SOME(os::rm(path));
-
-  ASSERT_SOME(detector);
-
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&driver, _, _))
-    .Times(1);
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers));
-
-  driver.start();
-
-  AWAIT_READY(offers);
-
-  driver.stop();
-  driver.join();
-
-  Shutdown();
-
-  process::terminate(slave);
-  process::wait(slave);
-}
-
-
-class MockMasterDetectorListenerProcess
-  : public ProtobufProcess<MockMasterDetectorListenerProcess>
-{
-public:
-  MockMasterDetectorListenerProcess() {}
-  virtual ~MockMasterDetectorListenerProcess() {}
-
-  MOCK_METHOD1(newMasterDetected, void(const process::UPID&));
-  MOCK_METHOD0(noMasterDetected, void(void));
-
-protected:
-  virtual void initialize()
-  {
-    install<NewMasterDetectedMessage>(
-        &MockMasterDetectorListenerProcess::newMasterDetected,
-        &NewMasterDetectedMessage::pid);
-
-    install<NoMasterDetectedMessage>(
-        &MockMasterDetectorListenerProcess::noMasterDetected);
-  }
-};
-
-
-#ifdef MESOS_HAS_JAVA
-class ZooKeeperMasterDetectorTest : public ZooKeeperTest {};
-
-
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetector)
-{
-  MockMasterDetectorListenerProcess mock;
-  process::spawn(mock);
-
-  Future<Nothing> newMasterDetected;
-  EXPECT_CALL(mock, newMasterDetected(mock.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  std::string master = "zk://" + server->connectString() + "/mesos";
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master, mock.self(), true, true);
-
-  ASSERT_SOME(detector);
-
-  AWAIT_READY(newMasterDetected);
-
-  MasterDetector::destroy(detector.get());
-
-  process::terminate(mock);
-  process::wait(mock);
-}
-
-
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectors)
-{
-  MockMasterDetectorListenerProcess mock1;
-  process::spawn(mock1);
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(mock1, newMasterDetected(mock1.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected1));
-
-  std::string master = "zk://" + server->connectString() + "/mesos";
-
-  Try<MasterDetector*> detector1 =
-    MasterDetector::create(master, mock1.self(), true, true);
-
-  ASSERT_SOME(detector1);
-
-  AWAIT_READY(newMasterDetected1);
-
-  MockMasterDetectorListenerProcess mock2;
-  process::spawn(mock2);
-
-  Future<Nothing> newMasterDetected2;
-  EXPECT_CALL(mock2, newMasterDetected(mock1.self())) // N.B. mock1
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  Try<MasterDetector*> detector2 =
-    MasterDetector::create(master, mock2.self(), true, true);
-
-  ASSERT_SOME(detector2);
-
-  AWAIT_READY(newMasterDetected2);
-
-  // Destroying detector1 (below) might cause another election so we
-  // need to set up expectations appropriately.
-  EXPECT_CALL(mock2, newMasterDetected(_))
-    .WillRepeatedly(Return());
-
-  MasterDetector::destroy(detector1.get());
-
-  process::terminate(mock1);
-  process::wait(mock1);
-
-  MasterDetector::destroy(detector2.get());
-
-  process::terminate(mock2);
-  process::wait(mock2);
-}
-
-
-// Disabled due to MESOS-455.
-TEST_F(ZooKeeperMasterDetectorTest, DISABLED_MasterDetectorShutdownNetwork)
-{
-  Clock::pause();
-
-  MockMasterDetectorListenerProcess mock;
-  process::spawn(mock);
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(mock, newMasterDetected(mock.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected1));
-
-  std::string master = "zk://" + server->connectString() + "/mesos";
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master, mock.self(), true, true);
-
-  ASSERT_SOME(detector);
-
-  AWAIT_READY(newMasterDetected1);
-
-  Future<Nothing> noMasterDetected;
-  EXPECT_CALL(mock, noMasterDetected())
-    .WillOnce(FutureSatisfy(&noMasterDetected));
-
-  server->shutdownNetwork();
-
-  Clock::advance(Seconds(10)); // TODO(benh): Get session timeout from detector.
-
-  AWAIT_READY(noMasterDetected);
-
-  Future<Nothing> newMasterDetected2;
-  EXPECT_CALL(mock, newMasterDetected(mock.self()))
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  server->startNetwork();
-
-  AWAIT_READY(newMasterDetected2);
-
-  MasterDetector::destroy(detector.get());
-
-  process::terminate(mock);
-  process::wait(mock);
-
-  Clock::resume();
-}
-
-
-// Tests that a detector sends a NoMasterDetectedMessage when we
-// reach our ZooKeeper session timeout. This is to enforce that we
-// manually expire the session when we don't get reconnected within
-// the ZOOKEEPER_SESSION_TIMEOUT.
-TEST_F(ZooKeeperTest, MasterDetectorTimedoutSession)
-{
-  Try<zookeeper::URL> url =
-    zookeeper::URL::parse("zk://" + server->connectString() + "/mesos");
-  ASSERT_SOME(url);
-
-  // First we bring up three master detector listeners:
-  //   1. A leading contender.
-  //   2. A non-leading contender.
-  //   3. A non-contender.
-
-  // 1. Simulate a leading contender.
-  MockMasterDetectorListenerProcess leader;
-
-  Future<Nothing> newMasterDetected;
-  EXPECT_CALL(leader, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  process::spawn(leader);
-
-  ZooKeeperMasterDetector leaderDetector(
-      url.get(), leader.self(), true, true);
-
-  AWAIT_READY(newMasterDetected);
-
-  // 2. Simulate a non-leading contender.
-  MockMasterDetectorListenerProcess follower;
-
-  EXPECT_CALL(follower, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  process::spawn(follower);
-
-  ZooKeeperMasterDetector followerDetector(
-      url.get(), follower.self(), true, true);
-
-  AWAIT_READY(newMasterDetected);
-
-  // 3. Simulate a non-contender.
-  MockMasterDetectorListenerProcess nonContender;
-
-  EXPECT_CALL(nonContender, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected));
-
-  process::spawn(nonContender);
-
-  ZooKeeperMasterDetector nonContenderDetector(
-      url.get(), nonContender.self(), false, true);
-
-  AWAIT_READY(newMasterDetected);
-
-  // Now we want to induce lost connections on each of the
-  // master detectors.
-  // Induce a reconnection on the leader.
-  Future<Nothing> leaderReconnecting = FUTURE_DISPATCH(
-      leaderDetector.process->self(),
-      &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  dispatch(leaderDetector.process,
-           &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  AWAIT_READY(leaderReconnecting);
-
-  // Induce a reconnection on the follower.
-  Future<Nothing> followerReconnecting = FUTURE_DISPATCH(
-      followerDetector.process->self(),
-      &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  dispatch(followerDetector.process,
-           &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  AWAIT_READY(followerReconnecting);
-
-  // Induce a reconnection on the non-contender.
-  Future<Nothing> nonContenderReconnecting = FUTURE_DISPATCH(
-      nonContenderDetector.process->self(),
-      &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  dispatch(nonContenderDetector.process,
-           &ZooKeeperMasterDetectorProcess::reconnecting);
-
-  AWAIT_READY(nonContenderReconnecting);
-
-  // Now induce the reconnection timeout.
-  Future<Nothing> leaderNoMasterDetected;
-  EXPECT_CALL(leader, noMasterDetected())
-    .WillOnce(FutureSatisfy(&leaderNoMasterDetected));
-
-  Future<Nothing> followerNoMasterDetected;
-  EXPECT_CALL(follower, noMasterDetected())
-    .WillOnce(FutureSatisfy(&followerNoMasterDetected));
-
-  Future<Nothing> nonContenderNoMasterDetected;
-  EXPECT_CALL(nonContender, noMasterDetected())
-    .WillOnce(FutureSatisfy(&nonContenderNoMasterDetected));
-
-  Clock::pause();
-  Clock::advance(ZOOKEEPER_SESSION_TIMEOUT);
-  Clock::settle();
-
-  AWAIT_READY(leaderNoMasterDetected);
-  AWAIT_READY(followerNoMasterDetected);
-  AWAIT_READY(nonContenderNoMasterDetected);
-
-  process::terminate(leader);
-  process::wait(leader);
-
-  process::terminate(follower);
-  process::wait(follower);
-
-  process::terminate(nonContender);
-  process::wait(nonContender);
-}
-
-
-// Tests whether a leading master correctly detects a new master when
-// its ZooKeeper session is expired.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireMasterZKSession)
-{
-  // Simulate a leading master.
-  MockMasterDetectorListenerProcess leader;
-
-  Future<Nothing> newMasterDetected1, newMasterDetected2;
-  EXPECT_CALL(leader, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected1))
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  EXPECT_CALL(leader, noMasterDetected())
-    .Times(0);
-
-  process::spawn(leader);
-
-  std::string znode = "zk://" + server->connectString() + "/mesos";
-
-  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
-  ASSERT_SOME(url);
-
-  // Leader's detector.
-  ZooKeeperMasterDetector leaderDetector(
-      url.get(), leader.self(), true, true);
-
-  AWAIT_READY(newMasterDetected1);
-
-  // Simulate a following master.
-  MockMasterDetectorListenerProcess follower;
-
-  Future<Nothing> newMasterDetected3;
-  EXPECT_CALL(follower, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected3))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(follower, noMasterDetected())
-    .Times(0);
-
-  process::spawn(follower);
-
-  // Follower's detector.
-  ZooKeeperMasterDetector followerDetector(
-      url.get(),
-      follower.self(),
-      true,
-      true);
-
-  AWAIT_READY(newMasterDetected3);
-
-  // Now expire the leader's zk session.
-  Future<int64_t> session = leaderDetector.session();
-  AWAIT_READY(session);
-
-  server->expireSession(session.get());
-
-  // Wait for session expiration and ensure we receive a
-  // NewMasterDetected message.
-  AWAIT_READY(newMasterDetected2);
-
-  process::terminate(follower);
-  process::wait(follower);
-
-  process::terminate(leader);
-  process::wait(leader);
-}
-
-
-// Tests whether a slave correctly DOES NOT disconnect from the master
-// when its ZooKeeper session is expired, but the master still stays
-// the leader when the slave re-connects with the ZooKeeper.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireSlaveZKSession)
-{
-  // Simulate a leading master.
-  MockMasterDetectorListenerProcess master;
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(master, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected1));
-
-  EXPECT_CALL(master, noMasterDetected())
-    .Times(0);
-
-  process::spawn(master);
-
-  std::string znode = "zk://" + server->connectString() + "/mesos";
-
-  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
-  ASSERT_SOME(url);
-
-  // Leading master's detector.
-  ZooKeeperMasterDetector masterDetector(
-      url.get(), master.self(), true, true);
-
-  AWAIT_READY(newMasterDetected1);
-
-  // Simulate a slave.
-  MockMasterDetectorListenerProcess slave;
-
-  Future<Nothing> newMasterDetected2, newMasterDetected3;
-  EXPECT_CALL(slave, newMasterDetected(_))
-    .Times(1)
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  EXPECT_CALL(slave, noMasterDetected())
-    .Times(0);
-
-  process::spawn(slave);
-
-  // Slave's master detector.
-  ZooKeeperMasterDetector slaveDetector(
-      url.get(), slave.self(), false, true);
-
-  AWAIT_READY(newMasterDetected2);
-
-  // Now expire the slave's zk session.
-  Future<int64_t> session = slaveDetector.session();
-  AWAIT_READY(session);
-
-  server->expireSession(session.get());
-
-  // Wait for enough time to ensure no NewMasterDetected message is sent.
-  os::sleep(Seconds(4)); // ZooKeeper needs extra time for session expiration.
-
-  process::terminate(slave);
-  process::wait(slave);
-
-  process::terminate(master);
-  process::wait(master);
-}
-
-
-// Tests whether a slave correctly detects the new master when its
-// ZooKeeper session is expired and a new master is elected before the
-// slave reconnects with ZooKeeper.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireSlaveZKSessionNewMaster)
-{
-  // Simulate a leading master.
-  MockMasterDetectorListenerProcess master1;
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(master1, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected1))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(master1, noMasterDetected())
-    .Times(0);
-
-  process::spawn(master1);
-
-  std::string znode = "zk://" + server->connectString() + "/mesos";
-
-  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
-  ASSERT_SOME(url);
-
-  // Leading master's detector.
-  ZooKeeperMasterDetector masterDetector1(
-      url.get(), master1.self(), true, true);
-
-  AWAIT_READY(newMasterDetected1);
-
-  // Simulate a non-leading master.
-  MockMasterDetectorListenerProcess master2;
-
-  Future<Nothing> newMasterDetected2;
-  EXPECT_CALL(master2, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected2))
-    .WillRepeatedly(Return());
-
-  EXPECT_CALL(master2, noMasterDetected())
-    .Times(0);
-
-  process::spawn(master2);
-
-  // Non-leading master's detector.
-  ZooKeeperMasterDetector masterDetector2(
-      url.get(), master2.self(), true, true);
-
-  AWAIT_READY(newMasterDetected2);
-
-  // Simulate a slave.
-  MockMasterDetectorListenerProcess slave;
-
-  Future<Nothing> newMasterDetected3, newMasterDetected4;
-  EXPECT_CALL(slave, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected3))
-    .WillOnce(FutureSatisfy(&newMasterDetected4));
-
-  EXPECT_CALL(slave, noMasterDetected())
-    .Times(AtMost(1));
-
-  process::spawn(slave);
-
-  // Slave's master detector.
-  ZooKeeperMasterDetector slaveDetector(
-      url.get(), slave.self(), false, true);
-
-  AWAIT_READY(newMasterDetected3);
-
-  // Now expire the slave's and leading master's zk sessions.
-  // NOTE: Here we assume that slave stays disconnected from the ZK when the
-  // leading master loses its session.
-  Future<int64_t> slaveSession = slaveDetector.session();
-  AWAIT_READY(slaveSession);
-
-  server->expireSession(slaveSession.get());
-
-  Future<int64_t> masterSession = masterDetector1.session();
-  AWAIT_READY(masterSession);
-
-  server->expireSession(masterSession.get());
-
-  // Wait for session expiration and ensure we receive a
-  // NewMasterDetected message.
-  AWAIT_READY_FOR(newMasterDetected4, Seconds(10));
-
-  process::terminate(slave);
-  process::wait(slave);
-
-  process::terminate(master2);
-  process::wait(master2);
-
-  process::terminate(master1);
-  process::wait(master1);
-}
-#endif // MESOS_HAS_JAVA