You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/19 19:57:49 UTC

[8/8] git commit: Replaced usage of old detector with new Master contender and detector abstractions.

Replaced usage of old detector with new Master contender and detector
abstractions.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15510


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f9d1dd81
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f9d1dd81
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f9d1dd81

Branch: refs/heads/master
Commit: f9d1dd819b6cc3843e4d1287ac10276d62cbfed4
Parents: bcd1dc4
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:39:27 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:19 2013 -0800

----------------------------------------------------------------------
 include/mesos/scheduler.hpp         |  12 +-
 src/Makefile.am                     |   3 +-
 src/cli/resolve.cpp                 |  27 +-
 src/detector/detector.cpp           | 580 -------------------------------
 src/detector/detector.hpp           | 216 ------------
 src/local/local.cpp                 |  22 +-
 src/local/main.cpp                  |   2 -
 src/master/http.cpp                 |   9 +-
 src/master/main.cpp                 |  35 +-
 src/master/master.cpp               | 128 ++++---
 src/master/master.hpp               |  31 +-
 src/messages/messages.proto         |  11 -
 src/sched/sched.cpp                 | 236 +++++++------
 src/slave/http.cpp                  |   4 +-
 src/slave/main.cpp                  |  15 +-
 src/slave/slave.cpp                 | 158 ++++-----
 src/slave/slave.hpp                 |  22 +-
 src/tests/allocator_tests.cpp       |   3 +-
 src/tests/authentication_tests.cpp  |  53 +--
 src/tests/cluster.hpp               | 113 ++++--
 src/tests/fault_tolerance_tests.cpp | 123 ++++---
 src/tests/gc_tests.cpp              |   3 +-
 src/tests/isolator_tests.cpp        |   3 +-
 src/tests/master_tests.cpp          |  27 +-
 src/tests/mesos.cpp                 |  41 +++
 src/tests/mesos.hpp                 |  59 ++++
 src/tests/slave_recovery_tests.cpp  |  16 +-
 27 files changed, 686 insertions(+), 1266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 380e087..161cc65 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -40,6 +40,7 @@ namespace mesos {
 class SchedulerDriver;
 
 namespace internal {
+class MasterDetector;
 class SchedulerProcess;
 }
 
@@ -386,9 +387,12 @@ private:
   FrameworkInfo framework;
   std::string master;
 
-  // Libprocess process for communicating with master.
+  // Used for communicating with the master.
   internal::SchedulerProcess* process;
 
+  // URL for the master (e.g., zk://, file://, etc).
+  std::string url;
+
   // Mutex to enforce all non-callbacks are execute serially.
   pthread_mutex_t mutex;
 
@@ -397,6 +401,12 @@ private:
 
   // Current status of the driver.
   Status status;
+
+  const Credential* credential;
+
+protected:
+  // Used to detect (i.e., choose) the master.
+  internal::MasterDetector* detector;
 };
 
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 969aead..feda34b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -183,7 +183,6 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	launcher/launcher.cpp						\
 	exec/exec.cpp							\
 	common/lock.cpp							\
-	detector/detector.cpp						\
 	common/date_utils.cpp						\
 	common/resources.cpp						\
 	common/attributes.cpp						\
@@ -220,7 +219,7 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp		\
 	common/protobuf_utils.hpp					\
 	common/lock.hpp							\
 	common/type_utils.hpp common/thread.hpp common/units.hpp	\
-	detector/detector.hpp examples/utils.hpp files/files.hpp	\
+	examples/utils.hpp files/files.hpp				\
 	hdfs/hdfs.hpp							\
 	launcher/launcher.hpp linux/cgroups.hpp				\
 	linux/fs.hpp local/flags.hpp local/local.hpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/cli/resolve.cpp
----------------------------------------------------------------------
diff --git a/src/cli/resolve.cpp b/src/cli/resolve.cpp
index b05f510..dddadfc 100644
--- a/src/cli/resolve.cpp
+++ b/src/cli/resolve.cpp
@@ -29,7 +29,7 @@
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
 
 using namespace mesos;
 using namespace mesos::internal;
@@ -117,19 +117,32 @@ int main(int argc, char** argv)
     return -1;
   }
 
-  Future<UPID> pid = mesos::internal::detect(master.get(), !verbose);
+  Try<MasterDetector*> detector = MasterDetector::create(master.get());
+
+  if (detector.isError()) {
+    cerr << "Failed to create a master detector: " << detector.error() << endl;
+    return -1;
+  }
+
+  Future<Result<UPID> > pid = detector.get()->detect();
 
   if (!pid.await(timeout)) {
     cerr << "Failed to detect master from '" << master.get()
          << "' within " << timeout << endl;
     return -1;
-  } else if (pid.isFailed()) {
-    cerr << "Failed to detect master from '" << master.get()
-         << "': " << pid.failure() << endl;
-    return -1;
+  } else {
+    // Not expecting detect() to fail/discard; errors come via the Result.
+    CHECK(pid.isReady());
+    if (pid.get().isError()) {
+      cerr << "Failed to detect master from '" << master.get()
+           << "': " << pid.get().error() << endl;
+      return -1;
+    }
   }
 
-  cout << string(pid.get()).substr(7) << endl;
+  // The future is not satisfied unless the result is Some.
+  CHECK_SOME(pid.get());
+  cout << strings::remove(pid.get().get(), "master@") << endl;
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/detector/detector.cpp
----------------------------------------------------------------------
diff --git a/src/detector/detector.cpp b/src/detector/detector.cpp
deleted file mode 100644
index 8d9f118..0000000
--- a/src/detector/detector.cpp
+++ /dev/null
@@ -1,580 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <glog/logging.h>
-
-#include <fstream>
-#include <ios>
-#include <vector>
-
-#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
-
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/future.hpp>
-#include <process/process.hpp>
-#include <process/protobuf.hpp>
-#include <process/timer.hpp>
-
-#include <stout/error.hpp>
-#include <stout/foreach.hpp>
-#include <stout/lambda.hpp>
-#include <stout/none.hpp>
-#include <stout/numify.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-
-#include "detector/detector.hpp"
-
-#include "logging/logging.hpp"
-
-#include "messages/messages.hpp"
-
-#include "zookeeper/authentication.hpp"
-#include "zookeeper/url.hpp"
-#include "zookeeper/watcher.hpp"
-#include "zookeeper/zookeeper.hpp"
-
-using process::Future;
-using process::Process;
-using process::Promise;
-using process::Timer;
-using process::UPID;
-using process::wait; // Necessary on some OS's to disambiguate.
-
-using std::pair;
-using std::string;
-using std::vector;
-
-namespace mesos {
-namespace internal {
-
-
-const Duration ZOOKEEPER_SESSION_TIMEOUT = Seconds(10);
-
-
-MasterDetector::~MasterDetector() {}
-
-
-Try<MasterDetector*> MasterDetector::create(
-    const string& master,
-    const UPID& pid,
-    bool contend,
-    bool quiet)
-{
-  if (master == "") {
-    if (contend) {
-      return new BasicMasterDetector(pid);
-    } else {
-      return Error("Cannot detect master");
-    }
-  } else if (master.find("zk://") == 0) {
-    Try<zookeeper::URL> url = zookeeper::URL::parse(master);
-    if (url.isError()) {
-      return Error(url.error());
-    }
-    if (url.get().path == "/") {
-      return Error(
-          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
-    }
-    return new ZooKeeperMasterDetector(url.get(), pid, contend, quiet);
-  } else if (master.find("file://") == 0) {
-    const std::string& path = master.substr(7);
-    std::ifstream file(path.c_str());
-    if (!file.is_open()) {
-      return Error("Failed to open file at '" + path + "'");
-    }
-
-    std::string line;
-    getline(file, line);
-
-    if (!file) {
-      file.close();
-      return Error("Failed to read from file at '" + path + "'");
-    }
-
-    file.close();
-
-    return create(line, pid, contend, quiet);
-  }
-
-  // Okay, try and parse what we got as a PID.
-  process::UPID masterPid = master.find("master@") == 0
-    ? process::UPID(master)
-    : process::UPID("master@" + master);
-
-  if (!masterPid) {
-    return Error("Cannot parse '" + std::string(masterPid) + "'");
-  }
-
-  return new BasicMasterDetector(masterPid, pid);
-}
-
-
-void MasterDetector::destroy(MasterDetector *detector)
-{
-  if (detector != NULL)
-    delete detector;
-}
-
-
-BasicMasterDetector::BasicMasterDetector(const UPID& _master)
-  : master(_master)
-{
-  // Elect the master.
-  NewMasterDetectedMessage message;
-  message.set_pid(master);
-  process::post(master, message);
-}
-
-
-BasicMasterDetector::BasicMasterDetector(
-    const UPID& _master,
-    const UPID& pid,
-    bool elect)
-  : master(_master)
-{
-  if (elect) {
-    // Elect the master.
-    NewMasterDetectedMessage message;
-    message.set_pid(master);
-    process::post(master, message);
-  }
-
-  // Tell the pid about the master.
-  NewMasterDetectedMessage message;
-  message.set_pid(master);
-  process::post(pid, message);
-}
-
-
-BasicMasterDetector::BasicMasterDetector(
-    const UPID& _master,
-    const vector<UPID>& pids,
-    bool elect)
-  : master(_master)
-{
-  if (elect) {
-    // Elect the master.
-    NewMasterDetectedMessage message;
-    message.set_pid(master);
-    process::post(master, message);
-  }
-
-  // Tell each pid about the master.
-  foreach (const UPID& pid, pids) {
-    NewMasterDetectedMessage message;
-    message.set_pid(master);
-    process::post(pid, message);
-  }
-}
-
-
-BasicMasterDetector::~BasicMasterDetector() {}
-
-
-ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
-    const zookeeper::URL& _url,
-    const UPID& _pid,
-    bool _contend,
-    bool quiet)
-  : url(_url),
-    acl(url.authentication.isSome()
-        ? zookeeper::EVERYONE_READ_CREATOR_ALL
-        : ZOO_OPEN_ACL_UNSAFE),
-    pid(_pid),
-    contend(_contend),
-    watcher(NULL),
-    zk(NULL),
-    expire(false),
-    timer(),
-    currentMasterSeq(),
-    currentMasterPID()
-{
-  // Set verbosity level for underlying ZooKeeper library logging.
-  // TODO(benh): Put this in the C++ API.
-  zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
-}
-
-
-ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
-{
-  delete zk;
-  delete watcher;
-}
-
-
-void ZooKeeperMasterDetectorProcess::initialize()
-{
-  // Doing initialization here allows to avoid the race between
-  // instantiating the ZooKeeper instance and being spawned ourself.
-  watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
-  zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-}
-
-
-int64_t ZooKeeperMasterDetectorProcess::session()
-{
-  CHECK_NOTNULL(zk);
-  return zk->getSessionId();
-}
-
-
-void ZooKeeperMasterDetectorProcess::connected(bool reconnect)
-{
-  if (!reconnect) {
-    LOG(INFO) << "Master detector (" << pid << ") connected to ZooKeeper ...";
-
-    if (url.authentication.isSome()) {
-      const std::string& scheme = url.authentication.get().scheme;
-      const std::string& credentials = url.authentication.get().credentials;
-      LOG(INFO) << "Authenticating to ZooKeeper using scheme '" << scheme << "'";
-      int code = zk->authenticate(scheme, credentials);
-      if (code != ZOK) {
-        LOG(FATAL) << "Failed to authenticate with ZooKeeper: "
-                   << zk->message(code);
-      }
-    }
-
-    // Assume the path (chroot) being used does not end with a "/".
-    CHECK(url.path.at(url.path.length() - 1) != '/');
-
-    // Create znode path (including intermediate znodes) as necessary.
-    LOG(INFO) << "Trying to create path '" << url.path << "' in ZooKeeper";
-
-    int code = zk->create(url.path, "", acl, 0, NULL, true);
-
-    // We fail all non-OK return codes except ZNODEEXISTS (since that
-    // means the path we were trying to create exists) and ZNOAUTH
-    // (since it's possible that the ACLs on 'dirname(url.path)' don't
-    // allow us to create a child znode but we are allowed to create
-    // children of 'url.path' itself, which will be determined below
-    // if we are contending). Note that it's also possible we got back
-    // a ZNONODE because we could not create one of the intermediate
-    // znodes (in which case we'll abort in the 'else' below since
-    // ZNONODE is non-retryable). TODO(benh): Need to check that we
-    // also can put a watch on the children of 'url.path'.
-    if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
-      LOG(FATAL) << "Failed to create '" << url.path
-                 << "' in ZooKeeper: " << zk->message(code);
-    }
-
-    if (contend) {
-      // We contend with the pid given in constructor.
-      string result;
-      int code = zk->create(url.path + "/", pid, acl,
-                            ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
-
-      if (code != ZOK) {
-        LOG(FATAL) << "Unable to create ephemeral child of '" << url.path
-                   << "' in ZooKeeper: %s" << zk->message(code);
-      }
-
-      LOG(INFO) << "Created ephemeral/sequence znode at '" << result << "'";
-    }
-
-    // Now determine who the master is (it may be us).
-    detectMaster();
-  } else {
-    LOG(INFO) << "Master detector (" << pid << ")  reconnected ...";
-
-    // Cancel and cleanup the reconnect timer (if necessary).
-    if (timer.isSome()) {
-      Timer::cancel(timer.get());
-      timer = None();
-    }
-
-    // If we decided to expire the session, make sure we delete the
-    // ZooKeeper instance so the session actually expires. We also
-    // create a new ZooKeeper instance for clients that want to
-    // continue detecting and/or contending (which is likely given
-    // that this code is getting executed).
-    if (expire) {
-      LOG(WARNING) << "Cleaning up after expired ZooKeeper session";
-
-      delete CHECK_NOTNULL(zk);
-      delete CHECK_NOTNULL(watcher);
-
-      watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
-      zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-
-      expire = false;
-      return;
-    }
-
-    // We've reconnected and we didn't prematurely expire the session,
-    // but the master might have changed, so we should run an
-    // election. TODO(benh): Determine if this is really necessary or
-    // if the watch set via 'ZooKeeper::getChildren' in 'detectMaster'
-    // is sufficient (it should be).
-    detectMaster();
-  }
-}
-
-
-void ZooKeeperMasterDetectorProcess::reconnecting()
-{
-  LOG(INFO) << "Master detector (" << pid << ")  lost connection to ZooKeeper, "
-            << "attempting to reconnect ...";
-
-  // ZooKeeper won't tell us of a session expiration until we
-  // reconnect, which could occur much much later than the session was
-  // actually expired. This can lead to a prolonged split-brain
-  // scenario when network partitions occur. Rather than wait for a
-  // reconnection to occur (i.e., a network partition to be repaired)
-  // we create a local timer and "expire" our session prematurely if
-  // we haven't reconnected within the session expiration time
-  // out. Later, when we eventually do reconnect we can force the
-  // session to be expired if we decided locally to expire.
-  timer = process::delay(
-      ZOOKEEPER_SESSION_TIMEOUT, self(), &Self::timedout, zk->getSessionId());
-}
-
-
-void ZooKeeperMasterDetectorProcess::expired()
-{
-  LOG(WARNING) << "Master detector (" << pid << ")  ZooKeeper session expired!";
-
-  // Cancel and cleanup the reconnect timer (if necessary).
-  if (timer.isSome()) {
-    Timer::cancel(timer.get());
-    timer = None();
-  }
-
-  delete CHECK_NOTNULL(zk);
-  delete CHECK_NOTNULL(watcher);
-
-  watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
-  zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-
-  expire = false;
-}
-
-
-void ZooKeeperMasterDetectorProcess::updated(const string& path)
-{
-  // A new master might have showed up and created a sequence
-  // identifier or a master may have died, determine who the master is now!
-  detectMaster();
-}
-
-
-void ZooKeeperMasterDetectorProcess::created(const string& path)
-{
-  LOG(FATAL) << "Unexpected ZooKeeper event (created) for '" << path << "'";
-}
-
-
-void ZooKeeperMasterDetectorProcess::deleted(const string& path)
-{
-  LOG(FATAL) << "Unexpected ZooKeeper event (deleted) for '" << path << "'";
-}
-
-
-void ZooKeeperMasterDetectorProcess::timedout(const int64_t& sessionId)
-{
-  CHECK_NOTNULL(zk);
-  if (timer.isSome() && zk->getSessionId() == sessionId) {
-    LOG(WARNING) << "Timed out waiting to reconnect to ZooKeeper "
-                 << "(sessionId=" << std::hex << sessionId << ")";
-    timer = None();
-    expire = true;
-
-    // TODO(bmahler): We always want to clear the sequence number
-    // prior to sending NoMasterDetectedMessage. It might be prudent
-    // to use a helper function to enforce this.
-    currentMasterSeq = "";  // Clear the master sequence number.
-    process::post(pid, NoMasterDetectedMessage());
-  }
-}
-
-
-void ZooKeeperMasterDetectorProcess::detectMaster()
-{
-  vector<string> results;
-
-  int code = zk->getChildren(url.path, true, &results);
-
-  if (code != ZOK) {
-    if (zk->retryable(code)) {
-      // NOTE: We don't expect a ZNONODE here because 'url.path' is always
-      // created in the connected() call. Despite that, we don't do a
-      // CHECK (code != ZNONODE) just to be safe in case the zk client library
-      // does return the code unexpectedly.
-      LOG(ERROR) << "Master detector (" << pid << ")  failed to get masters: "
-                 << zk->message(code);
-      return; // Try again when we reconnect.
-    } else {
-      LOG(FATAL) << "Non-retryable ZooKeeper error while getting masters: "
-                 << zk->message(code);
-    }
-  } else {
-    LOG(INFO) << "Master detector (" << pid << ")  found " << results.size()
-              << " registered masters";
-  }
-
-  string masterSeq;
-  long min = LONG_MAX;
-  foreach (const string& result, results) {
-    Try<int> i = numify<int>(result);
-    if (i.isError()) {
-      LOG(WARNING) << "Unexpected znode at '" << url.path
-                   << "': " << i.error();
-      continue;
-    }
-    if (i.get() < min) {
-      min = i.get();
-      masterSeq = result;
-    }
-  }
-
-  // No master present (lost or possibly hasn't come up yet).
-  if (masterSeq.empty()) {
-    LOG(INFO) << "Master detector (" << pid << ") couldn't find any masters";
-    currentMasterSeq = "";  // Clear the master sequence number.
-    process::post(pid, NoMasterDetectedMessage());
-  } else if (masterSeq != currentMasterSeq) {
-    // Okay, let's fetch the master pid from ZooKeeper.
-    string result;
-    code = zk->get(url.path + "/" + masterSeq, false, &result, NULL);
-
-    if (code != ZOK) {
-      // This is possible because the master might have failed since
-      // the invocation of ZooKeeper::getChildren above.
-      // It is fine to not send a NoMasterDetectedMessage here because,
-      // 1) If this is due to a connection loss or session expiration,
-      //    connected() or expired() will be called and the leader detection
-      //    code (detectMaster()) will be re-tried.
-      // 2) If this is due to no masters present (i.e., code == ZNONODE),
-      //    updated() will be called and the detectMaster() will be re-tried.
-      if (zk->retryable(code) || code == ZNONODE) {
-        LOG(ERROR) << "Master detector failed to fetch new master pid: "
-                   << zk->message(code);
-      } else {
-        LOG(FATAL) << "Non-retryable ZooKeeper error while fetching "
-                   << "new master pid: " << zk->message(code);
-      }
-    } else {
-      // Now let's parse what we fetched from ZooKeeper.
-      LOG(INFO) << "Master detector (" << pid << ")  got new master pid: "
-                << result;
-
-      UPID masterPid = result;
-
-      if (masterPid == UPID()) {
-        // TODO(benh): Maybe we should try again then!?!? Parsing
-        // might have failed because of DNS, and whoever is using the
-        // detector might sit "unconnected" indefinitely!
-        LOG(ERROR) << "Failed to parse new master pid!";
-        currentMasterSeq = "";  // Clear the master sequence number.
-        process::post(pid, NoMasterDetectedMessage());
-      } else {
-        currentMasterSeq = masterSeq;
-        currentMasterPID = masterPid;
-
-        NewMasterDetectedMessage message;
-        message.set_pid(currentMasterPID);
-        process::post(pid, message);
-      }
-    }
-  }
-}
-
-
-ZooKeeperMasterDetector::ZooKeeperMasterDetector(
-    const zookeeper::URL& url,
-    const UPID& pid,
-    bool contend,
-    bool quiet)
-{
-  process = new ZooKeeperMasterDetectorProcess(url, pid, contend, quiet);
-  spawn(process);
-}
-
-
-ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
-{
-  terminate(process);
-  wait(process);
-  delete process;
-}
-
-
-Future<int64_t> ZooKeeperMasterDetector::session()
-{
-  return dispatch(process, &ZooKeeperMasterDetectorProcess::session);
-}
-
-
-// A simple "listener" for doing one-time detections to support the
-// 'detect' function (see below). Note that this can't be
-// declared/defined inside of the 'detect' function because until
-// C++11 we can't have template arguments be local types (which
-// 'Listener' would be).
-class Listener : public ProtobufProcess<Listener>
-{
-public:
-  Future<UPID> future() { return promise.future(); }
-
-protected:
-  virtual void initialize()
-  {
-    // Stop listening if no one cares.
-    void(*terminate)(const UPID&, bool) = process::terminate;
-    promise.future().onDiscarded(lambda::bind(terminate, self(), true));
-
-    install<NewMasterDetectedMessage>(
-        &Listener::newMasterDetected,
-        &NewMasterDetectedMessage::pid);
-  }
-
-  void newMasterDetected(const UPID& pid)
-  {
-    promise.set(pid);
-    process::terminate(self());
-  }
-
-private:
-  Promise<UPID> promise;
-};
-
-
-Future<UPID> detect(const string& master, bool quiet)
-{
-  Listener* listener = new Listener();
-
-  // Save the future before we spawn.
-  Future<UPID> future = listener->future();
-
-  process::spawn(listener, true); // Let the GC clean up the Listener.
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master, listener->self(), false, quiet);
-
-  if (detector.isError()) {
-    process::terminate(listener);
-    return Future<UPID>::failed(
-        "Failed to create a master detector: " + detector.error());
-  }
-
-  return future;
-}
-
-
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/detector/detector.hpp
----------------------------------------------------------------------
diff --git a/src/detector/detector.hpp b/src/detector/detector.hpp
deleted file mode 100644
index 3aaebfe..0000000
--- a/src/detector/detector.hpp
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MASTER_DETECTOR_HPP__
-#define __MASTER_DETECTOR_HPP__
-
-#include <string>
-#include <iostream>
-#include <unistd.h>
-#include <climits>
-#include <cstdlib>
-
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/process.hpp>
-#include <process/timer.hpp>
-
-#include <stout/try.hpp>
-
-#include "zookeeper/authentication.hpp"
-#include "zookeeper/url.hpp"
-#include "zookeeper/zookeeper.hpp"
-
-class Watcher;
-class ZooKeeper;
-
-namespace mesos {
-namespace internal {
-
-// Returns the current master PID (waiting until a master is elected).
-process::Future<process::UPID> detect(
-    const std::string& master,
-    bool quiet = true);
-
-
-// Forward declarations.
-class ZooKeeperMasterDetectorProcess;
-
-/**
- * Implements functionality for:
- *   a) detecting masters
- *   b) contending to be a master
- */
-class MasterDetector
-{
-public:
-  virtual ~MasterDetector() = 0;
-
-  /**
-   * Creates a master detector that, given the specified master (which
-   * may be a ZooKeeper URL), knows how to connect to the master, or
-   * contend to be a master. The master detector sends messages to the
-   * specified pid when a new master is elected, a master is lost,
-   * etc.
-   *
-   * @param master string possibly containing zk:// or file://
-   * @param pid libprocess pid to both receive our messages and be
-   *   used if we should contend
-   * @param contend true if should contend to be master
-   * @param quite true if should limit log output
-   * @return instance of MasterDetector
-   */
-  static Try<MasterDetector*> create(const std::string& master,
-                                     const process::UPID& pid,
-                                     bool contend = false,
-                                     bool quiet = true);
-
-  /**
-   * Cleans up and deallocates the detector.
-   */
-  static void destroy(MasterDetector* detector);
-};
-
-
-class BasicMasterDetector : public MasterDetector
-{
-public:
-  /**
-   * Create a new master detector where the specified pid contends to
-   * be the master and gets elected by default.
-   *
-   * @param master libprocess pid to send messages/updates and be the
-   * master
-   */
-  BasicMasterDetector(const process::UPID& master);
-
-  /**
-   * Create a new master detector where the 'master' pid is 
-   * the master (no contending).
-   *
-   * @param master libprocess pid to send messages/updates and be the
-   * master
-   * @param pid/pids libprocess pids to send messages/updates to regarding
-   * the master
-   * @param elect if true then contend and elect the specified master
-   */
-  BasicMasterDetector(const process::UPID& master,
-		      const process::UPID& pid,
-		      bool elect = false);
-
-  BasicMasterDetector(const process::UPID& master,
-		      const std::vector<process::UPID>& pids,
-		      bool elect = false);
-
-  virtual ~BasicMasterDetector();
-
-private:
-  const process::UPID master;
-};
-
-
-class ZooKeeperMasterDetector : public MasterDetector
-{
-public:
-  /**
-   * Uses ZooKeeper for both detecting masters and contending to be a
-   * master.
-   *
-   * @param url znode path of the master
-   * @param pid libprocess pid to send messages/updates to (and to
-   * use for contending to be a master)
-   * @param contend true if should contend to be master and false otherwise (not
-   * needed for slaves and frameworks)
-   * @param quiet verbosity logging level for underlying ZooKeeper library
-   */
-  ZooKeeperMasterDetector(const zookeeper::URL& url,
-                          const process::UPID& pid,
-                          bool contend,
-                          bool quiet);
-
-  virtual ~ZooKeeperMasterDetector();
-
-  /**
-   *  Returns the ZooKeeper session ID associated with this detector.
-   */
-  process::Future<int64_t> session();
-
-  // Visible for testing.
-  ZooKeeperMasterDetectorProcess* process;
-};
-
-
-// TODO(benh): Make this value configurable via flags and verify that
-// it is always LESS THAN the slave heartbeat timeout.
-extern const Duration ZOOKEEPER_SESSION_TIMEOUT;
-
-
-class ZooKeeperMasterDetectorProcess
-  : public process::Process<ZooKeeperMasterDetectorProcess>
-{
-public:
-  ZooKeeperMasterDetectorProcess(
-    const zookeeper::URL& url,
-    const process::UPID& pid,
-    bool contend,
-    bool quiet);
-
-  virtual ~ZooKeeperMasterDetectorProcess();
-
-  virtual void initialize();
-
-  // ZooKeeperMasterDetector implementation.
-  int64_t session();
-
-  // ZooKeeper events.
-  void connected(bool reconnect);
-  void reconnecting();
-  void expired();
-  void updated(const std::string& path);
-  void created(const std::string& path);
-  void deleted(const std::string& path);
-
-private:
-  // Handles reconnecting "timeouts" by prematurely expiring a session
-  // (only used for contending instances). TODO(benh): Remove 'const
-  // &' after fixing libprocess.
-  void timedout(const int64_t& sessionId);
-
-  // Attempts to detect a master.
-  void detectMaster();
-
-  const zookeeper::URL url;
-  const ACL_vector acl;
-
-  const process::UPID pid;
-  bool contend;
-
-  Watcher* watcher;
-  ZooKeeper* zk;
-
-  bool expire;
-  Option<process::Timer> timer;
-
-  std::string currentMasterSeq;
-  process::UPID currentMasterPID;
-};
-
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __MASTER_DETECTOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 180756a..83a7f91 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -27,12 +27,12 @@
 
 #include "local.hpp"
 
-#include "detector/detector.hpp"
-
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
 #include "master/allocator.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
 #include "master/drf_sorter.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
@@ -79,7 +79,8 @@ static state::protobuf::State* state = NULL;
 static Registrar* registrar = NULL;
 static Master* master = NULL;
 static map<Isolator*, Slave*> slaves;
-static MasterDetector* detector = NULL;
+static StandaloneMasterDetector* detector = NULL;
+static MasterContender* contender = NULL;
 static Files* files = NULL;
 
 
@@ -126,7 +127,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     state = new state::protobuf::State(storage);
     registrar = new Registrar(state);
 
-    master = new Master(_allocator, registrar, files, flags);
+    contender = new StandaloneMasterContender();
+    detector = new StandaloneMasterDetector();
+    master =
+      new Master(_allocator, registrar, files, contender, detector, flags);
+    detector->appoint(master->self());
   }
 
   PID<Master> pid = process::spawn(master);
@@ -147,13 +152,13 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
     // Use a different work directory for each slave.
     flags.work_dir = path::join(flags.work_dir, stringify(i));
 
-    Slave* slave = new Slave(flags, true, isolator, files);
+    // NOTE: At this point the detector has already been created and
+    // appointed to the Master above.
+    Slave* slave = new Slave(flags, true, detector, isolator, files);
     slaves[isolator] = slave;
     pids.push_back(process::spawn(slave));
   }
 
-  detector = new BasicMasterDetector(pid, pids, true);
-
   return pid;
 }
 
@@ -186,6 +191,9 @@ void shutdown()
     delete detector;
     detector = NULL;
 
+    delete contender;
+    contender = NULL;
+
     delete files;
     files = NULL;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/local/main.cpp
----------------------------------------------------------------------
diff --git a/src/local/main.cpp b/src/local/main.cpp
index 5995c53..da431b7 100644
--- a/src/local/main.cpp
+++ b/src/local/main.cpp
@@ -22,8 +22,6 @@
 #include <stout/os.hpp>
 #include <stout/stringify.hpp>
 
-#include "detector/detector.hpp"
-
 #include "local/flags.hpp"
 #include "local/local.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index deb5c97..218906a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -298,7 +298,7 @@ Future<Response> Master::Http::redirect(const Request& request)
   LOG(INFO) << "HTTP request for '" << request.path << "'";
 
   // If there's no leader, redirect to this master's base url.
-  UPID pid = master.leader != UPID() ? master.leader : master.self();
+  UPID pid = master.leader.isSome() ? master.leader.get() : master.self();
 
   Try<string> hostname = net::getHostname(pid.ip);
   if (hostname.isError()) {
@@ -316,7 +316,7 @@ Future<Response> Master::Http::stats(const Request& request)
 
   JSON::Object object;
   object.values["uptime"] = (Clock::now() - master.startTime).secs();
-  object.values["elected"] = master.elected; // Note: using int not bool.
+  object.values["elected"] = master.elected(); // Note: using int not bool.
   object.values["total_schedulers"] = master.frameworks.size();
   object.values["active_schedulers"] = master.getActiveFrameworks().size();
   object.values["activated_slaves"] = master.slaves.size();
@@ -393,9 +393,8 @@ Future<Response> Master::Http::state(const Request& request)
     object.values["cluster"] = master.flags.cluster.get();
   }
 
-  // TODO(benh): Use an Option for the leader PID.
-  if (master.leader != UPID()) {
-    object.values["leader"] = string(master.leader);
+  if (master.leader.isSome()) {
+    object.values["leader"] = string(master.leader.get());
   }
 
   if (master.flags.log_dir.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 45caf9d..60c86b3 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -30,12 +30,12 @@
 
 #include "common/build.hpp"
 
-#include "detector/detector.hpp"
-
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
 #include "master/allocator.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
 #include "master/drf_sorter.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
@@ -46,8 +46,11 @@
 #include "state/storage.hpp"
 
 
+#include "zookeeper/detector.hpp"
+
 using namespace mesos::internal;
 using namespace mesos::internal::master;
+using namespace zookeeper;
 
 using mesos::MasterInfo;
 
@@ -146,13 +149,28 @@ int main(int argc, char** argv)
   Registrar* registrar = new Registrar(state);
 
   Files files;
-  Master* master = new Master(allocator, registrar, &files, flags);
-  process::spawn(master);
 
-  Try<MasterDetector*> detector =
-    MasterDetector::create(zk, master->self(), true, flags.quiet);
+  MasterContender* contender;
+  MasterDetector* detector;
+
+  Try<MasterContender*> contender_ = MasterContender::create(zk);
+  CHECK_SOME(contender_) << "Failed to create a master contender";
+  contender = contender_.get();
+
+  Try<MasterDetector*> detector_ = MasterDetector::create(zk);
+  CHECK_SOME(detector_) << "Failed to create a master detector";
+  detector = detector_.get();
 
-  CHECK_SOME(detector) << "Failed to create a master detector";
+  Master* master = new Master(
+      allocator, registrar, &files, contender, detector, flags);
+
+  if (zk == "") {
+    // An empty 'zk' means we use the standalone detector, so we must
+    // appoint this Master as the leader ourselves.
+    dynamic_cast<StandaloneMasterDetector*>(detector)->appoint(master->self());
+  }
+
+  process::spawn(master);
 
   process::wait(master->self());
   delete master;
@@ -163,7 +181,8 @@ int main(int argc, char** argv)
   delete state;
   delete storage;
 
-  MasterDetector::destroy(detector.get());
+  delete contender;
+  delete detector;
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index abab6ce..f65b344 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -189,27 +189,19 @@ private:
 Master::Master(
     Allocator* _allocator,
     Registrar* _registrar,
-    Files* _files)
-  : ProcessBase("master"),
-    http(*this),
-    flags(),
-    allocator(_allocator),
-    registrar(_registrar),
-    files(_files),
-    completedFrameworks(MAX_COMPLETED_FRAMEWORKS) {}
-
-
-Master::Master(
-    Allocator* _allocator,
-    Registrar* _registrar,
     Files* _files,
+    MasterContender* _contender,
+    MasterDetector* _detector,
     const Flags& _flags)
   : ProcessBase("master"),
     http(*this),
     flags(_flags),
+    leader(None()),
     allocator(_allocator),
     registrar(_registrar),
     files(_files),
+    contender(_contender),
+    detector(_detector),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS) {}
 
 
@@ -294,7 +286,6 @@ void Master::initialize()
 
   // The master ID is currently comprised of the current date, the IP
   // address and port from self() and the OS PID.
-
   Try<string> id =
     strings::format("%s-%u-%u-%d", DateUtils::currentDate(),
                     self().ip, self().port, getpid());
@@ -410,8 +401,6 @@ void Master::initialize()
   whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
   spawn(whitelistWatcher);
 
-  elected = false;
-
   nextFrameworkId = 0;
   nextSlaveId = 0;
   nextOfferId = 0;
@@ -436,13 +425,6 @@ void Master::initialize()
       &Master::submitScheduler,
       &SubmitSchedulerRequest::name);
 
-  install<NewMasterDetectedMessage>(
-      &Master::newMasterDetected,
-      &NewMasterDetectedMessage::pid);
-
-  install<NoMasterDetectedMessage>(
-      &Master::noMasterDetected);
-
   install<RegisterFrameworkMessage>(
       &Master::registerFramework,
       &RegisterFrameworkMessage::framework);
@@ -558,6 +540,12 @@ void Master::initialize()
         .onAny(defer(self(), &Self::fileAttached, lambda::_1, log.get()));
     }
   }
+
+  contender->initialize(self());
+
+  // Start contending to be a leading master.
+  contender->contend()
+    .onAny(defer(self(), &Master::contended, lambda::_1));
 }
 
 
@@ -692,34 +680,80 @@ void Master::submitScheduler(const string& name)
 }
 
 
-void Master::newMasterDetected(const UPID& pid)
+void Master::contended(const Future<Future<Nothing> >& _contended)
 {
-  // Check and see if we are (1) still waiting to be the elected
-  // master, (2) newly elected master, (3) no longer elected master,
-  // or (4) still elected master.
-
-  leader = pid;
-
-  if (leader != self() && !elected) {
-    LOG(INFO) << "Waiting to be master!";
-  } else if (leader == self() && !elected) {
-    LOG(INFO) << "Elected as master!";
-    elected = true;
-  } else if (leader != self() && elected) {
-    LOG(FATAL) << "No longer elected master ... committing suicide!";
-  } else if (leader == self() && elected) {
-    LOG(INFO) << "Still acting as master!";
+  if (_contended.isFailed()) {
+    CHECK(!elected()) << "Failed to contend so we should not be elected";
+    LOG(ERROR) << "Failed to contend when not elected: "
+               << _contended.failure() << "; contend again...";
+    contender->contend()
+      .onAny(defer(self(), &Master::contended, lambda::_1));
+    return;
+  }
+
+  CHECK(_contended.isReady()) <<
+    "Not expecting MasterContender to discard this future";
+
+  // Now that we know we have our candidacy registered, we start
+  // detecting who is the leader.
+  detector->detect()
+    .onAny(defer(self(), &Master::detected, lambda::_1));
+
+  // Watch for candidacy change.
+  _contended.get()
+    .onAny(defer(self(), &Master::lostCandidacy, lambda::_1));
+}
+
+
+void Master::lostCandidacy(const Future<Nothing>& lost)
+{
+  CHECK(!lost.isDiscarded())
+    << "Not expecting MasterContender to discard this future";
+
+  if (lost.isFailed()) {
+    LOG(ERROR) << "Failed to watch for candidacy: " << lost.failure();
+  }
+
+  if (elected()) {
+    EXIT(1) << "Lost leadership... committing suicide!";
+  } else {
+    LOG(INFO) << "Lost candidacy as a follower... Contend again";
+    contender->contend()
+      .onAny(defer(self(), &Master::contended, lambda::_1));
   }
 }
 
 
-void Master::noMasterDetected()
+void Master::detected(const Future<Result<UPID> >& _leader)
 {
-  if (elected) {
-    LOG(FATAL) << "No longer elected master ... committing suicide!";
+  CHECK(_leader.isReady())
+    << "Not expecting MasterContender to fail or discard this future";
+
+  bool wasElected = elected();
+  leader = _leader.get();
+
+  if (leader.isError()) {
+    if (wasElected) {
+      EXIT(1) << "Failed to detect the leading master while elected: "
+                 << leader.error() << "; committing suicide!";
+    } else {
+      LOG(ERROR) << "Failed to detect the leading master when not elected: "
+                 << leader.error();
+    }
   } else {
-    LOG(FATAL) << "No master detected (?) ... committing suicide!";
+    LOG(INFO) << "The newly elected leader is "
+              << (leader.isSome() ? leader.get() : "NONE");
+
+    if (!wasElected && elected()) {
+      LOG(INFO) << "Elected as the leading master!";
+    } else if (wasElected && !elected()) {
+      EXIT(1) << "Lost leadership... committing suicide!";
+    }
   }
+
+  // Keep detecting.
+  detector->detect(leader)
+    .onAny(defer(self(), &Master::detected, lambda::_1));
 }
 
 
@@ -736,7 +770,7 @@ void Master::registerFramework(
     return;
   }
 
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring register framework message since not elected yet";
     return;
   }
@@ -815,7 +849,7 @@ void Master::reregisterFramework(
     return;
   }
 
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring re-register framework message since "
                  << "not elected yet";
     return;
@@ -1197,7 +1231,7 @@ void Master::schedulerMessage(const SlaveID& slaveId,
 
 void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
 {
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring register slave message from "
                  << slaveInfo.hostname() << " since not elected yet";
     return;
@@ -1243,7 +1277,7 @@ void Master::reregisterSlave(
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks)
 {
-  if (!elected) {
+  if (!elected()) {
     LOG(WARNING) << "Ignoring re-register slave message from "
                  << slaveInfo.hostname() << " since not elected yet";
     return;

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e377af8..c86c1f1 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -46,6 +46,8 @@
 #include "files/files.hpp"
 
 #include "master/constants.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
 #include "master/flags.hpp"
 #include "master/registrar.hpp"
 
@@ -85,21 +87,15 @@ class Master : public ProtobufProcess<Master>
 public:
   Master(allocator::Allocator* allocator,
          Registrar* registrar,
-         Files* files);
-
-  Master(allocator::Allocator* allocator,
-         Registrar* registrar,
          Files* files,
-         const Flags& flags);
+         MasterContender* contender,
+         MasterDetector* detector,
+         const Flags& flags = Flags());
 
   virtual ~Master();
 
   void submitScheduler(
       const std::string& name);
-  void newMasterDetected(
-      const UPID& pid);
-  void noMasterDetected();
-  void masterDetectionFailure();
   void registerFramework(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo);
@@ -195,6 +191,15 @@ protected:
   // Return connected frameworks that are not in the process of being removed
   std::vector<Framework*> getActiveFrameworks() const;
 
+  // Invoked when the contender has entered the contest.
+  void contended(const Future<Future<Nothing> >& contended);
+
+  // Invoked when the contender has lost the candidacy.
+  void lostCandidacy(const Future<Nothing>& lost);
+
+  // Invoked when there is a newly elected leading master.
+  void detected(const Future<Result<UPID> >& pid);
+
   // Process a launch tasks request (for a non-cancelled offer) by
   // launching the desired tasks (if the offer contains a valid set of
   // tasks) and reporting any unused resources to the allocator.
@@ -301,15 +306,19 @@ private:
 
   const Flags flags;
 
-  UPID leader; // Current leading master.
+  Result<UPID> leader; // Current leading master.
 
-  bool elected;
+  // Whether we are the current leading master.
+  bool elected() const { return leader.isSome() && leader.get() == self(); }
 
   allocator::Allocator* allocator;
   WhitelistWatcher* whitelistWatcher;
   Registrar* registrar;
   Files* files;
 
+  MasterContender* contender;
+  MasterDetector* detector;
+
   MasterInfo info;
 
   hashmap<FrameworkID, Framework*> frameworks;

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 71f68a0..1f264d5 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -332,17 +332,6 @@ message FrameworkExpiredMessage {
 message ShutdownMessage {}
 
 
-// Master detector messages.
-
-
-message NoMasterDetectedMessage {}
-
-
-message NewMasterDetectedMessage {
-  required string pid = 2;
-}
-
-
 message AuthenticateMessage {
   required string pid = 1; // PID that needs to be authenticated.
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 3abe72f..51f95bb 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -34,11 +34,14 @@
 
 #include <process/defer.hpp>
 #include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
 #include <process/dispatch.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
+#include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/flags.hpp>
@@ -55,10 +58,12 @@
 #include "common/lock.hpp"
 #include "common/type_utils.hpp"
 
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
 
 #include "local/local.hpp"
 
+#include "master/detector.hpp"
+
 #include "logging/flags.hpp"
 #include "logging/logging.hpp"
 
@@ -66,6 +71,7 @@
 
 using namespace mesos;
 using namespace mesos::internal;
+using namespace mesos::internal::master;
 
 using namespace process;
 
@@ -92,22 +98,20 @@ public:
                    Scheduler* _scheduler,
                    const FrameworkInfo& _framework,
                    const Option<Credential>& _credential,
-                   const string& _url,
+                   MasterDetector* _detector,
                    pthread_mutex_t* _mutex,
                    pthread_cond_t* _cond)
     : ProcessBase(ID::generate("scheduler")),
       driver(_driver),
       scheduler(_scheduler),
       framework(_framework),
-      url(_url),
       mutex(_mutex),
       cond(_cond),
       failover(_framework.has_id() && !framework.id().value().empty()),
-      master(UPID()),
+      master(None()),
       connected(false),
       aborted(false),
-      // TODO(benh): Add Try().
-      detector(Error("uninitialized")),
+      detector(_detector),
       credential(_credential),
       authenticatee(NULL),
       authenticating(None()),
@@ -123,24 +127,8 @@ public:
 protected:
   virtual void initialize()
   {
-    // The master detector needs to be created after this process is
-    // running so that the "master detected" message is not dropped.
     // TODO(benh): Get access to flags so that we can decide whether
     // or not to make ZooKeeper verbose.
-    detector = MasterDetector::create(url, self(), false);
-    if (detector.isError()) {
-      driver->abort();
-      scheduler->error(driver, detector.error());
-      return;
-    }
-
-    install<NewMasterDetectedMessage>(
-        &SchedulerProcess::newMasterDetected,
-        &NewMasterDetectedMessage::pid);
-
-    install<NoMasterDetectedMessage>(
-        &SchedulerProcess::noMasterDetected);
-
     install<FrameworkRegisteredMessage>(
         &SchedulerProcess::registered,
         &FrameworkRegisteredMessage::framework_id,
@@ -179,31 +167,30 @@ protected:
     install<FrameworkErrorMessage>(
         &SchedulerProcess::error,
         &FrameworkErrorMessage::message);
-  }
 
-  virtual void finalize()
-  {
-    if (detector.isSome()) {
-      MasterDetector::destroy(detector.get());
-    }
+    // Start detecting masters.
+    detector->detect(master)
+      .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
   }
 
-  void newMasterDetected(const UPID& pid)
+  void detected(const Future<Result<UPID> >& pid)
   {
     if (aborted) {
-      VLOG(1) << "Ignoring new master detected message because "
-              << "the driver is aborted!";
+      VLOG(1) << "Ignoring the master change because the driver is aborted!";
       return;
     }
 
-    VLOG(1) << "New master at " << pid;
-
-    master = pid;
-    link(master);
+    // Not expecting MasterDetector to discard or fail the future.
+    CHECK(pid.isReady());
+    master = pid.get();
 
-    // If master failed over, inform the scheduler about the
-    // disconnection.
     if (connected) {
+      // There are three cases here:
+      //   1. The master failed.
+      //   2. The master failed over to a new master.
+      //   3. The master failed over to the same master.
+      // In any case, we will reconnect (possibly immediately), so we
+      // must notify the scheduler of the disconnection.
       Stopwatch stopwatch;
       if (FLAGS_v >= 1) {
         stopwatch.start();
@@ -216,47 +203,35 @@ protected:
 
     connected = false;
 
-    if (credential.isSome()) {
-      // Authenticate with the master.
-      authenticate();
-    } else {
-      // Proceed with registration without authentication.
-      LOG(INFO) << "No credentials provided."
-                << " Attempting to register without authentication";
-
-      doReliableRegistration();
-    }
-  }
+    if (master.isSome()) {
+      VLOG(1) << "New master detected at " << master.get();
+      link(master.get());
 
-  void noMasterDetected()
-  {
-    if (aborted) {
-      VLOG(1) << "Ignoring no master detected message because "
-              << "the driver is aborted!";
-      return;
-    }
-
-    VLOG(1) << "No master detected, waiting for another master";
+      if (credential.isSome()) {
+        // Authenticate with the master.
+        authenticate();
+      } else {
+        // Proceed with registration without authentication.
+        LOG(INFO) << "No credentials provided."
+                  << " Attempting to register without authentication";
 
-    // Inform the scheduler about the disconnection if the driver
-    // was previously registered with the master.
-    if (connected) {
-      Stopwatch stopwatch;
-      if (FLAGS_v >= 1) {
-        stopwatch.start();
+        doReliableRegistration();
       }
-
-      scheduler->disconnected(driver);
-
-      VLOG(1) << "Scheduler::disconnected took " << stopwatch.elapsed();
+    } else if (master.isNone()) {
+      // In this case, we don't actually invoke Scheduler::error
+      // since we might get reconnected to a master imminently.
+      VLOG(1) << "No master detected";
+    } else {
+      LOG(ERROR) << "Failed to detect master: " << master.error();
     }
 
-    // In this case, we don't actually invoke Scheduler::error
-    // since we might get reconnected to a master imminently.
-    connected = false;
-    master = UPID();
+    // Keep detecting masters.
+    LOG(INFO) << "Detecting new master";
+    detector->detect(master)
+      .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
   }
 
+
   void authenticate()
   {
     if (aborted) {
@@ -266,7 +241,7 @@ protected:
 
     authenticated = false;
 
-    if (!master) {
+    if (!master.isSome()) {
       return;
     }
 
@@ -282,7 +257,7 @@ protected:
       return;
     }
 
-    LOG(INFO) << "Authenticating with master " << master;
+    LOG(INFO) << "Authenticating with master " << master.get();
 
     CHECK_SOME(credential);
 
@@ -302,7 +277,7 @@ protected:
     //     'Authenticatee'.
     // --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'.
     // TODO(vinod): Consider using 'Shared' to 'Owned' upgrade.
-    authenticating = authenticatee->authenticate(master)
+    authenticating = authenticatee->authenticate(master.get())
       .onAny(defer(self(), &Self::_authenticate));
 
     delay(Seconds(5),
@@ -324,9 +299,21 @@ protected:
     CHECK_SOME(authenticating);
     const Future<bool>& future = authenticating.get();
 
+    if (!master.isSome()) {
+      LOG(INFO) << "Ignoring _authenticate because the master is lost";
+      authenticating = None();
+      // Set it to false because we do not want further retries until
+      // a new master is detected.
+      // We obviously do not need to reauthenticate either even if
+      // 'reauthenticate' is currently true because the master is
+      // lost.
+      reauthenticate = false;
+      return;
+    }
+
     if (reauthenticate || !future.isReady()) {
       LOG(WARNING)
-        << "Failed to authenticate with master " << master << ": "
+        << "Failed to authenticate with master " << master.get() << ": "
         << (reauthenticate ? "master changed" :
            (future.isFailed() ? future.failure() : "future discarded"));
 
@@ -339,12 +326,12 @@ protected:
     }
 
     if (!future.get()) {
-      LOG(ERROR) << "Master " << master << " refused authentication";
+      LOG(ERROR) << "Master " << master.get() << " refused authentication";
       error("Master refused authentication");
       return;
     }
 
-    LOG(INFO) << "Successfully authenticated with master " << master;
+    LOG(INFO) << "Successfully authenticated with master " << master.get();
 
     authenticated = true;
     authenticating = None();
@@ -433,7 +420,7 @@ protected:
 
   void doReliableRegistration()
   {
-    if (connected || !master) {
+    if (connected || !master.isSome()) {
       return;
     }
 
@@ -445,13 +432,13 @@ protected:
       // Touched for the very first time.
       RegisterFrameworkMessage message;
       message.mutable_framework()->MergeFrom(framework);
-      send(master, message);
+      send(master.get(), message);
     } else {
       // Not the first time, or failing over.
       ReregisterFrameworkMessage message;
       message.mutable_framework()->MergeFrom(framework);
       message.set_failover(failover);
-      send(master, message);
+      send(master.get(), message);
     }
 
     delay(Seconds(1), self(), &Self::doReliableRegistration);
@@ -582,10 +569,10 @@ protected:
       return;
     }
 
-    if (from != master) {
+    if (!master.isSome() || from != master.get()) {
       LOG(WARNING) << "Ignoring lost slave message from " << from
                    << " because it is not from the registered master ("
-                   << master << ")";
+                   << (master.isSome() ? master.get() : "NONE/ERROR") << ")";
       return;
     }
 
@@ -657,7 +644,8 @@ protected:
     if (connected && !failover) {
       UnregisterFrameworkMessage message;
       message.mutable_framework_id()->MergeFrom(framework.id());
-      send(master, message);
+      CHECK_SOME(master);
+      send(master.get(), message);
     }
 
     Lock lock(mutex);
@@ -683,7 +671,8 @@ protected:
 
     DeactivateFrameworkMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
 
     Lock lock(mutex);
     pthread_cond_signal(cond);
@@ -699,7 +688,8 @@ protected:
     KillTaskMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
     message.mutable_task_id()->MergeFrom(taskId);
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void requestResources(const vector<Request>& requests)
@@ -714,7 +704,8 @@ protected:
     foreach (const Request& request, requests) {
       message.add_requests()->MergeFrom(request);
     }
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void launchTasks(const OfferID& offerId,
@@ -822,7 +813,8 @@ protected:
     // Remove the offer since we saved all the PIDs we might use.
     savedOffers.erase(offerId);
 
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void reviveOffers()
@@ -834,7 +826,8 @@ protected:
 
     ReviveOffersMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   void sendFrameworkMessage(const ExecutorID& executorId,
@@ -874,7 +867,8 @@ protected:
       message.mutable_framework_id()->MergeFrom(framework.id());
       message.mutable_executor_id()->MergeFrom(executorId);
       message.set_data(data);
-      send(master, message);
+      CHECK_SOME(master);
+      send(master.get(), message);
     }
   }
 
@@ -892,7 +886,8 @@ protected:
       message.add_statuses()->MergeFrom(status);
     }
 
-    send(master, message);
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
 private:
@@ -901,16 +896,15 @@ private:
   MesosSchedulerDriver* driver;
   Scheduler* scheduler;
   FrameworkInfo framework;
-  string url; // URL for the master (e.g., zk://, file://, etc).
   pthread_mutex_t* mutex;
   pthread_cond_t* cond;
   bool failover;
-  UPID master;
+  Result<UPID> master;
 
   volatile bool connected; // Flag to indicate if framework is registered.
   volatile bool aborted; // Flag to indicate if the driver is aborted.
 
-  Try<MasterDetector*> detector;
+  MasterDetector* detector;
 
   hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
   hashmap<SlaveID, UPID> savedSlavePids;
@@ -955,7 +949,9 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     framework(_framework),
     master(_master),
     process(NULL),
-    status(DRIVER_NOT_STARTED)
+    status(DRIVER_NOT_STARTED),
+    credential(NULL),
+    detector(NULL)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
@@ -1007,13 +1003,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
 
   CHECK(process == NULL);
 
-  if (pid.isSome()) {
-    process = new SchedulerProcess(
-        this, scheduler, framework, None(), pid.get(), &mutex, &cond);
-  } else {
-    process = new SchedulerProcess(
-        this, scheduler, framework, None(), master, &mutex, &cond);
-  }
+  url = pid.isSome() ? static_cast<string>(pid.get()) : master;
 }
 
 
@@ -1023,12 +1013,14 @@ MesosSchedulerDriver::MesosSchedulerDriver(
     Scheduler* _scheduler,
     const FrameworkInfo& _framework,
     const string& _master,
-    const Credential& credential)
+    const Credential& _credential)
   : scheduler(_scheduler),
     framework(_framework),
     master(_master),
     process(NULL),
-    status(DRIVER_NOT_STARTED)
+    status(DRIVER_NOT_STARTED),
+    credential(new Credential(_credential)),
+    detector(NULL)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
@@ -1080,13 +1072,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
 
   CHECK(process == NULL);
 
-  if (pid.isSome()) {
-    process = new SchedulerProcess(
-        this, scheduler, framework, credential, pid.get(), &mutex, &cond);
-  } else {
-    process = new SchedulerProcess(
-        this, scheduler, framework, credential, master, &mutex, &cond);
-  }
+  url = pid.isSome() ? static_cast<string>(pid.get()) : master;
 }
 
 
@@ -1116,6 +1102,10 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
   pthread_mutex_destroy(&mutex);
   pthread_cond_destroy(&cond);
 
+  if (detector != NULL) {
+    delete detector;
+  }
+
   // Check and see if we need to shutdown a local cluster.
   if (master == "local" || master == "localquiet") {
     local::shutdown();
@@ -1131,7 +1121,31 @@ Status MesosSchedulerDriver::start()
     return status;
   }
 
-  CHECK(process != NULL);
+  if (detector == NULL) {
+    Try<MasterDetector*> detector_ = MasterDetector::create(url);
+
+    if (detector_.isError()) {
+      status = DRIVER_ABORTED;
+      string message = "Failed to create a master detector for '" +
+      master + "': " + detector_.error();
+      scheduler->error(this, message);
+      return status;
+    }
+
+    // Save the detector so we can delete it later.
+    detector = detector_.get();
+  }
+
+  CHECK(process == NULL);
+
+  if (credential == NULL) {
+    process = new SchedulerProcess(
+        this, scheduler, framework, None(), detector, &mutex, &cond);
+  } else {
+    const Credential& cred = *credential;
+    process = new SchedulerProcess(
+        this, scheduler, framework, cred, detector, &mutex, &cond);
+  }
 
   spawn(process);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 62fbb37..2f0bd8d 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -288,7 +288,7 @@ Future<Response> Slave::Http::stats(const Request& request)
   object.values["lost_tasks"] = slave.stats.tasks[TASK_LOST];
   object.values["valid_status_updates"] = slave.stats.validStatusUpdates;
   object.values["invalid_status_updates"] = slave.stats.invalidStatusUpdates;
-  object.values["registered"] = slave.master ? "1" : "0";
+  object.values["registered"] = slave.master.isSome() ? "1" : "0";
   object.values["recovery_errors"] = slave.recoveryErrors;
 
   return OK(object, request.query.get("jsonp"));
@@ -317,7 +317,11 @@ Future<Response> Slave::Http::state(const Request& request)
   object.values["failed_tasks"] = slave.stats.tasks[TASK_FAILED];
   object.values["lost_tasks"] = slave.stats.tasks[TASK_LOST];
 
-  Try<string> masterHostname = net::getHostname(slave.master.ip);
-  if (masterHostname.isSome()) {
-    object.values["master_hostname"] = masterHostname.get();
-  }
+  // 'master' is None (or Error) while no master is detected; only
+  // resolve the hostname when there is one.
+  if (slave.master.isSome()) {
+    Try<string> masterHostname = net::getHostname(slave.master.get().ip);
+    if (masterHostname.isSome()) {
+      object.values["master_hostname"] = masterHostname.get();
+    }
+  }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 750a127..e83cd9e 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -27,7 +27,7 @@
 
 #include "common/build.hpp"
 
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
 
 #include "logging/logging.hpp"
 
@@ -127,19 +127,18 @@ int main(int argc, char** argv)
   LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
   LOG(INFO) << "Starting Mesos slave";
 
-  Files files;
-  Slave* slave = new Slave(flags, false, isolator, &files);
-  process::spawn(slave);
-
-  Try<MasterDetector*> detector =
-    MasterDetector::create(master.get(), slave->self(), false, flags.quiet);
+  Try<MasterDetector*> detector = MasterDetector::create(master.get());
 
   CHECK_SOME(detector) << "Failed to create a master detector";
 
+  Files files;
+  Slave* slave = new Slave(flags, false,  detector.get(), isolator, &files);
+  process::spawn(slave);
+
   process::wait(slave->self());
   delete slave;
 
-  MasterDetector::destroy(detector.get());
+  delete detector.get();
   Isolator::destroy(isolator);
 
   return 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index bb98fce..a9be378 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -73,6 +73,7 @@ using namespace state;
 
 Slave::Slave(const slave::Flags& _flags,
              bool _local,
+             MasterDetector* _detector,
              Isolator* _isolator,
              Files* _files)
   : ProcessBase(ID::generate("slave")),
@@ -80,7 +81,9 @@ Slave::Slave(const slave::Flags& _flags,
     http(*this),
     flags(_flags),
     local(_local),
+    master(None()),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
+    detector(_detector),
     isolator(_isolator),
     files(_files),
     monitor(_isolator),
@@ -281,13 +284,6 @@ void Slave::initialize()
   startTime = Clock::now();
 
   // Install protobuf handlers.
-  install<NewMasterDetectedMessage>(
-      &Slave::newMasterDetected,
-      &NewMasterDetectedMessage::pid);
-
-  install<NoMasterDetectedMessage>(
-      &Slave::noMasterDetected);
-
   install<SlaveRegisteredMessage>(
       &Slave::registered,
       &SlaveRegisteredMessage::slave_id);
@@ -433,10 +429,10 @@ void Slave::shutdown(const UPID& from)
   // Allow shutdown message only if
   // 1) Its a message received from the registered master or
   // 2) If its called locally (e.g tests)
-  if (from && from != master) {
+  if (from && (!master.isSome() || from != master.get())) {
     LOG(WARNING) << "Ignoring shutdown message from " << from
-                 << " because it is not from the registered master ("
-                 << master << ")";
+                 << " because it is not from the registered master: "
+                 << (master.isSome() ? string(master.get()) : "None/Error");
     return;
   }
 
@@ -483,75 +479,67 @@ Nothing Slave::detachFile(const string& path)
 }
 
 
-void Slave::newMasterDetected(const UPID& pid)
+void Slave::detected(const Future<Result<UPID> >& pid)
 {
-  LOG(INFO) << "New master detected at " << pid;
-
-  master = pid;
-  link(master);
-
-  // Inform the status updates manager about the new master.
-  statusUpdateManager->newMasterDetected(master);
+  CHECK(state == DISCONNECTED ||
+        state == RUNNING ||
+        state == TERMINATING) << state;
 
-  if (flags.recover == "cleanup") {
-    LOG(INFO) << "Skipping registration because slave is in 'cleanup' mode";
-    return;
+  if (state != TERMINATING) {
+    state = DISCONNECTED;
   }
 
-  switch (state) {
-    case RECOVERING:
-      LOG(INFO) << "Postponing registration until recovery is complete";
-      break;
-    case DISCONNECTED:
-    case RUNNING:
-      state = DISCONNECTED;
-      doReliableRegistration();
-      break;
-    case TERMINATING:
-      LOG(INFO) << "Skipping registration because slave is terminating";
-      break;
-    default:
-      LOG(FATAL) << "Unexpected slave state " << state;
-      break;
-  }
-}
+  // Not expecting MasterDetector to discard or fail futures.
+  CHECK(pid.isReady());
+  master = pid.get();
 
+  if (master.isSome()) {
+    LOG(INFO) << "New master detected at " << master.get();
+    link(master.get());
 
-void Slave::noMasterDetected()
-{
-  LOG(INFO) << "Lost master(s) ... waiting";
-  master = UPID();
+    // Inform the status updates manager about the new master.
+    statusUpdateManager->newMasterDetected(master.get());
 
-  CHECK(state == RECOVERING || state == DISCONNECTED ||
-        state == RUNNING || state == TERMINATING)
-    << state;
+    if (state == TERMINATING) {
+      LOG(INFO) << "Skipping registration because slave is terminating";
+      return;
+    }
 
-  // We only change state if the slave is in RUNNING state because
-  // if the slave is in:
-  // RECOVERY: Slave needs to finish recovery before changing states.
-  // DISCONNECTED: Redundant.
-  // TERMINATING: Slave is shutting down.
-  // TODO(vinod): Subscribe to master detector after recovery.
-  // Similarly, unsubscribe from master detector during termination.
-  // Currently it is tricky because master detector is injected into
-  // the slave from outside.
-  if (state == RUNNING) {
-    state = DISCONNECTED;
+    // The slave does not (re-)register in cleanup mode because we do
+    // not want to accept new tasks. Do not return early here: master
+    // detection is re-armed at the end of this function.
+    if (flags.recover == "cleanup") {
+      LOG(INFO)
+        << "Skipping registration because slave was started in cleanup mode";
+    } else {
+      doReliableRegistration();
+    }
+  } else if (master.isNone()) {
+    LOG(INFO) << "Lost leading master";
+  } else {
+    LOG(ERROR) << "Failed to detect a master: " << master.error();
   }
+
+  // Keep detecting masters.
+  LOG(INFO) << "Detecting new master";
+  detector->detect(master)
+    .onAny(defer(self(), &Slave::detected, lambda::_1));
 }
 
 
 void Slave::registered(const UPID& from, const SlaveID& slaveId)
 {
-  if (from != master) {
+  if (!master.isSome() || from != master.get()) {
     LOG(WARNING) << "Ignoring registration message from " << from
-                 << " because it is not the expected master " << master;
+                 << " because it is not the expected master: "
+                 << (master.isSome() ? string(master.get()) : "NONE/ERROR");
     return;
   }
 
   switch(state) {
     case DISCONNECTED: {
-      LOG(INFO) << "Registered with master " << master
+      CHECK_SOME(master);
+      LOG(INFO) << "Registered with master " << master.get()
                 << "; given slave ID " << slaveId;
 
       state = RUNNING;
@@ -575,7 +563,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
        EXIT(1) << "Registered but got wrong id: " << slaveId
                << "(expected: " << info.id() << "). Committing suicide";
       }
-      LOG(WARNING) << "Already registered with master " << master;
+      CHECK_SOME(master);
+      LOG(WARNING) << "Already registered with master " << master.get();
       break;
     case TERMINATING:
       LOG(WARNING) << "Ignoring registration because slave is terminating";
@@ -590,15 +579,17 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
 
 void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
 {
-  if (from != master) {
+  if (!master.isSome() || from != master.get()) {
     LOG(WARNING) << "Ignoring re-registration message from " << from
-                 << " because it is not the expected master " << master;
+                 << " because it is not the expected master: "
+                 << (master.isSome() ? string(master.get()) : "NONE/ERROR");
     return;
   }
 
   switch(state) {
     case DISCONNECTED:
-      LOG(INFO) << "Re-registered with master " << master;
+      CHECK_SOME(master);
+      LOG(INFO) << "Re-registered with master " << master.get();
 
       state = RUNNING;
       if (!(info.id() == slaveId)) {
@@ -612,7 +603,8 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
         EXIT(1) << "Re-registered but got wrong id: " << slaveId
                 << "(expected: " << info.id() << "). Committing suicide";
       }
-      LOG(WARNING) << "Already re-registered with master " << master;
+      CHECK_SOME(master);
+      LOG(WARNING) << "Already re-registered with master " << master.get();
       break;
     case TERMINATING:
       LOG(WARNING) << "Ignoring re-registration because slave is terminating";
@@ -633,7 +625,7 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
 
 void Slave::doReliableRegistration()
 {
-  if (!master) {
+  if (!master.isSome()) {
     LOG(INFO) << "Skipping registration because no master present";
     return;
   }
@@ -649,7 +641,7 @@ void Slave::doReliableRegistration()
     // (Vinod): Is the above comment true?
     RegisterSlaveMessage message;
     message.mutable_slave()->CopyFrom(info);
-    send(master, message);
+    send(master.get(), message);
   } else {
     // Re-registering, so send tasks running.
     ReregisterSlaveMessage message;
@@ -698,7 +690,9 @@ void Slave::doReliableRegistration()
         }
       }
     }
-    send(master, message);
+
+    CHECK_SOME(master);
+    send(master.get(), message);
   }
 
   // Retry registration if necessary.
@@ -1117,10 +1111,11 @@ void Slave::shutdownFramework(
   // Allow shutdownFramework() only if
   // its called directly (e.g. Slave::finalize()) or
   // its a message from the currently registered master.
-  if (from && from != master) {
+  if (from && (!master.isSome() || from != master.get())) {
     LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
-                 << " from " << from << " because it is not from the registered"
-                 << " master (" << master << ")";
+                 << " from " << from << " because it is not from the registered"
+                 << " master (" << (master.isSome() ? string(master.get())
+                                                    : "NONE/ERROR") << ")";
     return;
   }
 
@@ -1944,7 +1939,7 @@ void Slave::exited(const UPID& pid)
 {
   LOG(INFO) << pid << " exited";
 
-  if (master == pid) {
+  if (master.isSome() && master.get() == pid) {
     LOG(WARNING) << "Master disconnected!"
                  << " Waiting for a new master to be elected";
     // TODO(benh): After so long waiting for a master, commit suicide.
@@ -2236,7 +2231,7 @@ void Slave::executorTerminated(
         message.mutable_executor_id()->MergeFrom(executorId);
         message.set_status(status);
 
-        send(master, message);
+        if (master.isSome()) { send(master.get(), message); }
       }
 
       // Remove the executor if either the framework is terminating or
@@ -2747,6 +2742,9 @@ void Slave::__recover(const Future<Nothing>& future)
 
   LOG(INFO) << "Finished recovery";
 
+  CHECK_EQ(RECOVERING, state);
+  state = DISCONNECTED;
+
   // Schedule all old slave directories for garbage collection.
   // TODO(vinod): Do this as part of recovery. This needs a fix
   // in the recovery code, to recover all slaves instead of only
@@ -2790,18 +2788,12 @@ void Slave::__recover(const Future<Nothing>& future)
   // in 'cleanup' mode.
   if (frameworks.empty() && flags.recover == "cleanup") {
     terminate(self());
-  } else if (flags.recover == "reconnect") {
-    // Re-register if reconnecting.
-    // NOTE: Since the slave in cleanup mode never re-registers, if
-    // the master fails over it will not forward the updates from
-    // the "unknown" slave to the scheduler. This could lead to the
-    // slave waiting indefinitely for acknowledgements. The master's
-    // registrar could help in handling this correctly.
-    state = DISCONNECTED;
-    if (master) {
-      doReliableRegistration();
-    }
+    return;
   }
+
+  // Start detecting masters.
+  detector->detect(master)
+    .onAny(defer(self(), &Slave::detected, lambda::_1));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b39eaf4..6d7c3e8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -30,6 +30,7 @@
 #include <mesos/resources.hpp>
 
 #include <process/http.hpp>
+#include <process/future.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
@@ -38,11 +39,14 @@
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/multihashmap.hpp>
+#include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/owned.hpp>
 #include <stout/path.hpp>
 #include <stout/uuid.hpp>
 
+#include "master/detector.hpp"
+
 #include "slave/constants.hpp"
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
@@ -59,9 +63,11 @@
 
 #include "messages/messages.hpp"
 
-
 namespace mesos {
 namespace internal {
+
+class MasterDetector; // Forward declaration.
+
 namespace slave {
 
 using namespace process;
@@ -71,22 +77,19 @@ class StatusUpdateManager;
 struct Executor;
 struct Framework;
 
-
 class Slave : public ProtobufProcess<Slave>
 {
 public:
   Slave(const Flags& flags,
         bool local,
-        Isolator *isolator,
+        MasterDetector* detector,
+        Isolator* isolator,
         Files* files);
 
   virtual ~Slave();
 
   void shutdown(const process::UPID& from);
 
-  void newMasterDetected(const UPID& pid);
-  void noMasterDetected();
-  void masterDetectionFailure();
   void registered(const process::UPID& from, const SlaveID& slaveId);
   void reregistered(const process::UPID& from, const SlaveID& slaveId);
   void doReliableRegistration();
@@ -215,6 +218,9 @@ protected:
 
   Nothing detachFile(const std::string& path);
 
+  // Invoked whenever the detector detects a change in masters.
+  void detected(const Future<Result<UPID> >& pid);
+
   // Helper routine to lookup a framework.
   Framework* getFramework(const FrameworkID& frameworkId);
 
@@ -308,7 +314,7 @@ private:
 
   SlaveInfo info;
 
-  UPID master;
+  Result<UPID> master;
 
   Resources resources;
   Attributes attributes;
@@ -317,6 +323,8 @@ private:
 
   boost::circular_buffer<Owned<Framework> > completedFrameworks;
 
+  MasterDetector* detector;
+
   Isolator* isolator;
   Files* files;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp
index b0beb72..61ab235 100644
--- a/src/tests/allocator_tests.cpp
+++ b/src/tests/allocator_tests.cpp
@@ -30,9 +30,8 @@
 #include <process/gmock.hpp>
 #include <process/pid.hpp>
 
-#include "detector/detector.hpp"
-
 #include "master/allocator.hpp"
+#include "master/detector.hpp"
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/authentication_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/authentication_tests.cpp b/src/tests/authentication_tests.cpp
index 48a9323..cc8b7a9 100644
--- a/src/tests/authentication_tests.cpp
+++ b/src/tests/authentication_tests.cpp
@@ -277,8 +277,9 @@ TEST_F(AuthenticationTest, MasterFailover)
   ASSERT_SOME(master);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+  Owned<StandaloneMasterDetector> detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, detector.get());
 
   // Drop the authenticate message from the scheduler.
   Future<AuthenticateMessage> authenticateMessage =
@@ -287,7 +288,6 @@ TEST_F(AuthenticationTest, MasterFailover)
   driver.start();
 
   AWAIT_READY(authenticateMessage);
-  UPID frameworkPid = authenticateMessage.get().pid();
 
   // While the authentication is in progress simulate a failed over
   // master by restarting the master.
@@ -299,12 +299,8 @@ TEST_F(AuthenticationTest, MasterFailover)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .WillOnce(FutureSatisfy(&registered));
 
-  // Send a new master detected message to inform the scheduler
-  // about the new master.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkPid, newMasterDetectedMsg);
+  // Appoint a new master and inform the scheduler about it.
+  detector->appoint(master.get());
 
   // Scheduler should successfully register with the new master.
   AWAIT_READY(registered);
@@ -325,11 +321,9 @@ TEST_F(AuthenticationTest, LeaderElection)
   ASSERT_SOME(master);
 
   MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  Future<AuthenticateMessage> authenticateMessage =
-    FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
+  Owned<StandaloneMasterDetector> detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, detector.get());
 
   // Drop the AuthenticationStepMessage from authenticator.
   Future<AuthenticationStepMessage> authenticationStepMessage =
@@ -337,10 +331,6 @@ TEST_F(AuthenticationTest, LeaderElection)
 
   driver.start();
 
-  // Grab the framework pid.
-  AWAIT_READY(authenticateMessage);
-  UPID frameworkPid = authenticateMessage.get().pid();
-
   // Drop the intermediate SASL message so that authentication fails.
   AWAIT_READY(authenticationStepMessage);
 
@@ -348,12 +338,8 @@ TEST_F(AuthenticationTest, LeaderElection)
   EXPECT_CALL(sched, registered(&driver, _, _))
     .WillOnce(FutureSatisfy(&registered));
 
-  // Send a new master detected message to inform the scheduler
-  // about the new master after a leader election.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkPid, newMasterDetectedMsg);
+  // Appoint a new master and inform the scheduler about it.
+  detector->appoint(master.get());
 
   // Scheduler should successfully register with the new master.
   AWAIT_READY(registered);
@@ -375,11 +361,9 @@ TEST_F(AuthenticationTest, SchedulerFailover)
 
   // Launch the first (i.e., failing) scheduler.
   MockScheduler sched1;
-  MesosSchedulerDriver driver1(
-      &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  Future<AuthenticateMessage> authenticateMessage =
-    FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
+  Owned<StandaloneMasterDetector> detector =
+    new StandaloneMasterDetector(master.get());
+  TestingMesosSchedulerDriver driver1(&sched1, detector.get());
 
   Future<FrameworkID> frameworkId;
   EXPECT_CALL(sched1, registered(&driver1, _, _))
@@ -387,10 +371,6 @@ TEST_F(AuthenticationTest, SchedulerFailover)
 
   driver1.start();
 
-  // Grab the framework pid.
-  AWAIT_READY(authenticateMessage);
-  UPID frameworkPid = authenticateMessage.get().pid();
-
   AWAIT_READY(frameworkId);
 
   // Drop the AuthenticationStepMessage from authenticator
@@ -400,11 +380,8 @@ TEST_F(AuthenticationTest, SchedulerFailover)
 
   EXPECT_CALL(sched1, disconnected(&driver1));
 
-  // Send a NewMasterDetected message to elicit authentication.
-  NewMasterDetectedMessage newMasterDetectedMsg;
-  newMasterDetectedMsg.set_pid(master.get());
-
-  process::post(frameworkPid, newMasterDetectedMsg);
+  // Appoint a new master and inform the scheduler about it.
+  detector->appoint(master.get());
 
   AWAIT_READY(authenticationStepMessage);