You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ka...@apache.org on 2016/04/07 00:48:33 UTC

[03/11] mesos git commit: Moved contender and detector definitions into separate directories.

Moved contender and detector definitions into separate directories.

Updated Makefile.am.

Review: https://reviews.apache.org/r/44544/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cfbca013
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cfbca013
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cfbca013

Branch: refs/heads/master
Commit: cfbca0136af5fc07f87651bb0080f85a767a9925
Parents: a1d3d6b
Author: Anurag Singh <an...@gmail.com>
Authored: Wed Apr 6 15:08:18 2016 -0400
Committer: Kapil Arya <ka...@mesosphere.io>
Committed: Wed Apr 6 18:36:18 2016 -0400

----------------------------------------------------------------------
 src/Makefile.am                    |   6 +-
 src/master/contender/contender.cpp | 255 ++++++++++++++++
 src/master/contender/contender.hpp |  89 ++++++
 src/master/detector/detector.cpp   | 522 ++++++++++++++++++++++++++++++++
 src/master/detector/detector.hpp   |  98 ++++++
 5 files changed, 966 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index ba9cc8b..d095b98 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -624,8 +624,6 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   local/local.cpp							\
   logging/flags.cpp							\
   logging/logging.cpp							\
-  master/contender.cpp							\
-  master/detector.cpp							\
   master/flags.cpp							\
   master/http.cpp							\
   master/maintenance.cpp						\
@@ -642,6 +640,8 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   master/allocator/mesos/hierarchical.cpp				\
   master/allocator/mesos/metrics.cpp					\
   master/allocator/sorter/drf/sorter.cpp				\
+  master/contender/contender.cpp					\
+  master/detector/detector.cpp						\
   messages/messages.cpp							\
   module/manager.cpp							\
   sched/sched.cpp							\
@@ -739,8 +739,6 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   logging/flags.hpp							\
   logging/logging.hpp							\
   master/constants.hpp							\
-  master/contender.hpp							\
-  master/detector.hpp							\
   master/flags.hpp							\
   master/machine.hpp							\
   master/maintenance.hpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/contender/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender/contender.cpp b/src/master/contender/contender.cpp
new file mode 100644
index 0000000..95cec3e
--- /dev/null
+++ b/src/master/contender/contender.cpp
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <mesos/master/contender.hpp>
+
+#include <process/defer.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/lambda.hpp>
+#include <stout/protobuf.hpp>
+
+#include "master/constants.hpp"
+#include "master/contender.hpp"
+#include "master/master.hpp"
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using std::string;
+
+using namespace process;
+using namespace zookeeper;
+
+namespace mesos {
+namespace master {
+namespace contender {
+
+using namespace internal;
+
+const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+class ZooKeeperMasterContenderProcess
+  : public Process<ZooKeeperMasterContenderProcess>
+{
+public:
+  explicit ZooKeeperMasterContenderProcess(const zookeeper::URL& url);
+  explicit ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group);
+  virtual ~ZooKeeperMasterContenderProcess();
+
+  // Explicitly use 'initialize' since we're overloading below.
+  using process::ProcessBase::initialize;
+
+  void initialize(const MasterInfo& masterInfo);
+
+  // MasterContender implementation.
+  virtual Future<Future<Nothing>> contend();
+
+private:
+  Owned<zookeeper::Group> group;
+  LeaderContender* contender;
+
+  // The master this contender contends on behalf of.
+  Option<MasterInfo> masterInfo;
+  Option<Future<Future<Nothing>>> candidacy;
+};
+
+
+Try<MasterContender*> MasterContender::create(const Option<string>& _mechanism)
+{
+  if (_mechanism.isNone()) {
+    return new StandaloneMasterContender();
+  }
+
+  string mechanism = _mechanism.get();
+
+  if (strings::startsWith(mechanism, "zk://")) {
+    Try<zookeeper::URL> url = zookeeper::URL::parse(mechanism);
+    if (url.isError()) {
+      return Error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterContender(url.get());
+  } else if (strings::startsWith(mechanism, "file://")) {
+    // Load the configuration out of a file. While Mesos and related
+    // programs always use <stout/flags> to process the command line
+    // arguments (and therefore file://) this entrypoint is exposed by
+    // libmesos, with frameworks currently calling it and expecting it
+    // to do the argument parsing for them which roughly matches the
+    // argument parsing Mesos will do.
+    // TODO(cmaloney): Rework the libmesos exposed APIs to expose
+    // A "flags" endpoint where the framework can pass the command
+    // line arguments and they will be parsed by <stout/flags> and the
+    // needed flags extracted, and then change this interface to
+    // require final values from the flags. This means that a
+    // framework doesn't need to know how the flags are passed to
+    // match mesos' command line arguments if it wants, but if it
+    // needs to inspect/manipulate arguments, it can.
+    LOG(WARNING) << "Specifying master election mechanism / ZooKeeper URL to "
+                    "be read out of a file via 'file://' is deprecated inside "
+                    "Mesos and will be removed in a future release.";
+    const string& path = mechanism.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      return Error("Failed to read from file at '" + path + "'");
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  CHECK(!strings::startsWith(mechanism, "file://"));
+
+  return Error("Failed to parse '" + mechanism + "'");
+}
+
+
+MasterContender::~MasterContender() {}
+
+
+StandaloneMasterContender::~StandaloneMasterContender()
+{
+  if (promise != NULL) {
+    promise->set(Nothing()); // Leadership lost.
+    delete promise;
+  }
+}
+
+
+void StandaloneMasterContender::initialize(const MasterInfo& masterInfo)
+{
+  // We don't really need to store the master in this basic
+  // implementation so we just set an 'initialized' flag to record
+  // that this was called; 'contend()' fails until it has been.
+  initialized = true;
+}
+
+
+Future<Future<Nothing>> StandaloneMasterContender::contend()
+{
+  if (!initialized) {
+    return Failure("Initialize the contender first");
+  }
+
+  if (promise != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    promise->set(Nothing());
+    delete promise;
+  }
+
+  // Directly return a future that is always pending because it
+  // represents a membership/leadership that is not going to be lost
+  // until we 'withdraw'.
+  promise = new Promise<Nothing>();
+  return promise->future();
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(const zookeeper::URL& url)
+{
+  process = new ZooKeeperMasterContenderProcess(url);
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group)
+{
+  process = new ZooKeeperMasterContenderProcess(group);
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::~ZooKeeperMasterContender()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo)
+{
+  process->initialize(masterInfo);
+}
+
+
+Future<Future<Nothing>> ZooKeeperMasterContender::contend()
+{
+  return dispatch(process, &ZooKeeperMasterContenderProcess::contend);
+}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    const zookeeper::URL& url)
+  : ZooKeeperMasterContenderProcess(Owned<Group>(
+    new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT))) {}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    Owned<Group> _group)
+  : ProcessBase(ID::generate("zookeeper-master-contender")),
+    group(_group),
+    contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
+{
+  delete contender;
+}
+
+void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo)
+{
+  masterInfo = _masterInfo;
+}
+
+
+Future<Future<Nothing>> ZooKeeperMasterContenderProcess::contend()
+{
+  if (masterInfo.isNone()) {
+    return Failure("Initialize the contender first");
+  }
+
+  // Should not recontend if the last election is still ongoing.
+  if (candidacy.isSome() && candidacy.get().isPending()) {
+    return candidacy.get();
+  }
+
+  if (contender != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    delete contender;
+  }
+
+  // Serialize the MasterInfo to JSON.
+  JSON::Object json = JSON::protobuf(masterInfo.get());
+
+  contender = new LeaderContender(
+      group.get(),
+      stringify(json),
+      mesos::internal::master::MASTER_INFO_JSON_LABEL);
+  candidacy = contender->contend();
+  return candidacy.get();
+}
+
+} // namespace contender  {
+} // namespace master {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/contender/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender/contender.hpp b/src/master/contender/contender.hpp
new file mode 100644
index 0000000..ba05551
--- /dev/null
+++ b/src/master/contender/contender.hpp
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __MASTER_CONTENDER_HPP__
+#define __MASTER_CONTENDER_HPP__
+
+#include <mesos/master/contender.hpp>
+
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+
+#include "messages/messages.hpp"
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
+
+class ZooKeeperMasterContenderProcess;
+
+// A basic implementation which assumes only one master is
+// contending.
+class StandaloneMasterContender : public MasterContender
+{
+public:
+  StandaloneMasterContender()
+    : initialized(false),
+      promise(NULL) {}
+
+  virtual ~StandaloneMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const MasterInfo& masterInfo);
+
+  // In this basic implementation the outer Future directly returns
+  // and inner Future stays pending because there is only one
+  // contender in the contest.
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  bool initialized;
+  process::Promise<Nothing>* promise;
+};
+
+
+class ZooKeeperMasterContender : public MasterContender
+{
+public:
+  // Creates a contender that uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  explicit ZooKeeperMasterContender(const zookeeper::URL& url);
+  explicit ZooKeeperMasterContender(process::Owned<zookeeper::Group> group);
+
+  virtual ~ZooKeeperMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const MasterInfo& masterInfo);
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  ZooKeeperMasterContenderProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_CONTENDER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/detector/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector/detector.cpp b/src/master/detector/detector.cpp
new file mode 100644
index 0000000..ad9c209
--- /dev/null
+++ b/src/master/detector/detector.cpp
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <set>
+#include <string>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/logging.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/protobuf.hpp>
+
+#include <mesos/master/detector.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "master/constants.hpp"
+#include "master/detector.hpp"
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using namespace process;
+using namespace zookeeper;
+
+using std::set;
+using std::string;
+
+namespace mesos {
+namespace master {
+namespace detector {
+
+const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
+
+// TODO(bmahler): Consider moving these kinds of helpers into
+// libprocess or a common header within mesos.
+namespace promises {
+
+// Helper for setting a set of Promises.
+template <typename T>
+void set(std::set<Promise<T>* >* promises, const T& t)
+{
+  foreach (Promise<T>* promise, *promises) {
+    promise->set(t);
+    delete promise;
+  }
+  promises->clear();
+}
+
+
+// Helper for failing a set of Promises; each one is failed, then freed.
+template <typename T>
+void fail(std::set<Promise<T>* >* promises, const string& failure)
+{
+  foreach (Promise<T>* promise, *promises) {
+    promise->fail(failure);
+    delete promise;
+  }
+  promises->clear();
+}
+
+
+// Helper for discarding a set of Promises.
+template <typename T>
+void discard(std::set<Promise<T>* >* promises)
+{
+  foreach (Promise<T>* promise, *promises) {
+    promise->discard();
+    delete promise;
+  }
+  promises->clear();
+}
+
+
+// Helper for discarding an individual promise in the set.
+template <typename T>
+void discard(std::set<Promise<T>* >* promises, const Future<T>& future)
+{
+  foreach (Promise<T>* promise, *promises) {
+    if (promise->future() == future) {
+      promise->discard();
+      promises->erase(promise);
+      delete promise;
+      return;
+    }
+  }
+}
+
+} // namespace promises {
+
+
+class StandaloneMasterDetectorProcess
+  : public Process<StandaloneMasterDetectorProcess>
+{
+public:
+  StandaloneMasterDetectorProcess()
+    : ProcessBase(ID::generate("standalone-master-detector")) {}
+  explicit StandaloneMasterDetectorProcess(const MasterInfo& _leader)
+    : ProcessBase(ID::generate("standalone-master-detector")),
+      leader(_leader) {}
+
+  ~StandaloneMasterDetectorProcess()
+  {
+    promises::discard(&promises);
+  }
+
+  void appoint(const Option<MasterInfo>& leader_)
+  {
+    leader = leader_;
+
+    promises::set(&promises, leader);
+  }
+
+  Future<Option<MasterInfo> > detect(
+      const Option<MasterInfo>& previous = None())
+  {
+    if (leader != previous) {
+      return leader;
+    }
+
+    Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >();
+
+    promise->future()
+      .onDiscard(defer(self(), &Self::discard, promise->future()));
+
+    promises.insert(promise);
+    return promise->future();
+  }
+
+private:
+  void discard(const Future<Option<MasterInfo> >& future)
+  {
+    // Discard the promise holding this future.
+    promises::discard(&promises, future);
+  }
+
+  Option<MasterInfo> leader; // The appointed master.
+  set<Promise<Option<MasterInfo> >*> promises;
+};
+
+
+class ZooKeeperMasterDetectorProcess
+  : public Process<ZooKeeperMasterDetectorProcess>
+{
+public:
+  explicit ZooKeeperMasterDetectorProcess(const zookeeper::URL& url);
+  explicit ZooKeeperMasterDetectorProcess(Owned<Group> group);
+  ~ZooKeeperMasterDetectorProcess();
+
+  virtual void initialize();
+  Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous);
+
+private:
+  void discard(const Future<Option<MasterInfo> >& future);
+
+  // Invoked when the group leadership has changed.
+  void detected(const Future<Option<Group::Membership> >& leader);
+
+  // Invoked when we have fetched the data associated with the leader.
+  void fetched(
+      const Group::Membership& membership,
+      const Future<Option<string> >& data);
+
+  Owned<Group> group;
+  LeaderDetector detector;
+
+  // The leading Master.
+  Option<MasterInfo> leader;
+  set<Promise<Option<MasterInfo> >*> promises;
+
+  // Potential non-retryable error.
+  Option<Error> error;
+};
+
+
+Try<MasterDetector*> MasterDetector::create(const Option<string>& _mechanism)
+{
+  if (_mechanism.isNone()) {
+    return new StandaloneMasterDetector();
+  }
+
+  string mechanism = _mechanism.get();
+
+  if (strings::startsWith(mechanism, "zk://")) {
+    Try<zookeeper::URL> url = zookeeper::URL::parse(mechanism);
+    if (url.isError()) {
+      return Error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterDetector(url.get());
+  } else if (strings::startsWith(mechanism, "file://")) {
+    // Load the configuration out of a file. While Mesos and related
+    // programs always use <stout/flags> to process the command line
+    // arguments (and therefore file://) this entrypoint is exposed by
+    // libmesos, with frameworks currently calling it and expecting it
+    // to do the argument parsing for them which roughly matches the
+    // argument parsing Mesos will do.
+    // TODO(cmaloney): Rework the libmesos exposed APIs to expose
+    // A "flags" endpoint where the framework can pass the command
+    // line arguments and they will be parsed by <stout/flags> and the
+    // needed flags extracted, and then change this interface to
+    // require final values from the flags. This means that a
+    // framework doesn't need to know how the flags are passed to
+    // match mesos' command line arguments if it wants, but if it
+    // needs to inspect/manipulate arguments, it can.
+    LOG(WARNING) << "Specifying master detection mechanism / ZooKeeper URL to "
+                    "be read out of a file via 'file://' is deprecated inside "
+                    "Mesos and will be removed in a future release.";
+    const string& path = mechanism.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      return Error("Failed to read from file at '" + path + "'");
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  CHECK(!strings::startsWith(mechanism, "file://"));
+
+  // Okay, try and parse what we got as a PID.
+  UPID pid = mechanism.find("master@") == 0
+             ? UPID(mechanism)
+             : UPID("master@" + mechanism);
+
+  if (!pid) {
+    return Error("Failed to parse '" + mechanism + "'");
+  }
+
+  return new StandaloneMasterDetector(
+      internal::protobuf::createMasterInfo(pid));
+}
+
+
+MasterDetector::~MasterDetector() {}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector()
+{
+  process = new StandaloneMasterDetectorProcess();
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector(const MasterInfo& leader)
+{
+  process = new StandaloneMasterDetectorProcess(leader);
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
+{
+  process = new StandaloneMasterDetectorProcess(
+      mesos::internal::protobuf::createMasterInfo(leader));
+
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::~StandaloneMasterDetector()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void StandaloneMasterDetector::appoint(const Option<MasterInfo>& leader)
+{
+  dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
+}
+
+
+void StandaloneMasterDetector::appoint(const UPID& leader)
+{
+  dispatch(process,
+           &StandaloneMasterDetectorProcess::appoint,
+           mesos::internal::protobuf::createMasterInfo(leader));
+}
+
+
+Future<Option<MasterInfo> > StandaloneMasterDetector::detect(
+    const Option<MasterInfo>& previous)
+{
+  return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
+}
+
+
+// TODO(benh): Get ZooKeeper timeout from configuration.
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    const zookeeper::URL& url)
+  : ZooKeeperMasterDetectorProcess(Owned<Group>(
+    new Group(url.servers,
+              MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+              url.path,
+              url.authentication))) {}
+
+
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    Owned<Group> _group)
+  : ProcessBase(ID::generate("zookeeper-master-detector")),
+    group(_group),
+    detector(group.get()),
+    leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
+{
+  promises::discard(&promises);
+}
+
+
+void ZooKeeperMasterDetectorProcess::initialize()
+{
+  detector.detect()
+    .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+void ZooKeeperMasterDetectorProcess::discard(
+    const Future<Option<MasterInfo> >& future)
+{
+  // Discard the promise holding this future.
+  promises::discard(&promises, future);
+}
+
+
+Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect(
+    const Option<MasterInfo>& previous)
+{
+  // Return immediately if the detector is no longer operational due
+  // to a non-retryable error.
+  if (error.isSome()) {
+    return Failure(error.get().message);
+  }
+
+  if (leader != previous) {
+    return leader;
+  }
+
+  Promise<Option<MasterInfo> >* promise = new Promise<Option<MasterInfo> >();
+
+  promise->future()
+    .onDiscard(defer(self(), &Self::discard, promise->future()));
+
+  promises.insert(promise);
+  return promise->future();
+}
+
+
+void ZooKeeperMasterDetectorProcess::detected(
+    const Future<Option<Group::Membership> >& _leader)
+{
+  CHECK(!_leader.isDiscarded());
+
+  if (_leader.isFailed()) {
+    LOG(ERROR) << "Failed to detect the leader: " << _leader.failure();
+
+    // Setting this error stops the detection loop and the detector
+    // transitions to an erroneous state. Further calls to detect()
+    // will directly fail as a result.
+    error = Error(_leader.failure());
+    leader = None();
+
+    promises::fail(&promises, _leader.failure());
+
+    return;
+  }
+
+  if (_leader.get().isNone()) {
+    leader = None();
+
+    promises::set(&promises, leader);
+  } else {
+    // Fetch the data associated with the leader.
+    group->data(_leader.get().get())
+      .onAny(defer(self(), &Self::fetched, _leader.get().get(), lambda::_1));
+  }
+
+  // Keep trying to detect leadership changes.
+  detector.detect(_leader.get())
+    .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+void ZooKeeperMasterDetectorProcess::fetched(
+    const Group::Membership& membership,
+    const Future<Option<string> >& data)
+{
+  CHECK(!data.isDiscarded());
+
+  if (data.isFailed()) {
+    leader = None();
+    promises::fail(&promises, data.failure());
+    return;
+  } else if (data.get().isNone()) {
+    // Membership is gone before we can read its data.
+    leader = None();
+    promises::set(&promises, leader);
+    return;
+  }
+
+  // Parse the data based on the membership label and cache the
+  // leader for subsequent requests.
+  Option<string> label = membership.label();
+  if (label.isNone()) {
+    // If we are here it means some masters are still creating znodes
+    // with the old format.
+    UPID pid = UPID(data.get().get());
+    LOG(WARNING) << "Leading master " << pid << " has data in old format";
+    leader = mesos::internal::protobuf::createMasterInfo(pid);
+  } else if (label.isSome() &&
+             label.get() == mesos::internal::master::MASTER_INFO_LABEL) {
+    MasterInfo info;
+    if (!info.ParseFromString(data.get().get())) {
+      leader = None();
+      promises::fail(&promises, "Failed to parse data into MasterInfo");
+      return;
+    }
+    LOG(WARNING) << "Leading master " << info.pid()
+                 << " is using a Protobuf binary format when registering with "
+                 << "ZooKeeper (" << label.get() << "): this will be deprecated"
+                 << " as of Mesos 0.24 (see MESOS-2340)";
+    leader = info;
+  } else if (label.isSome() &&
+             label.get() == mesos::internal::master::MASTER_INFO_JSON_LABEL) {
+    Try<JSON::Object> object = JSON::parse<JSON::Object>(data.get().get());
+
+    if (object.isError()) {
+      leader = None();
+      promises::fail(
+          &promises,
+          "Failed to parse data into valid JSON: " + object.error());
+      return;
+    }
+
+    Try<mesos::MasterInfo> info =
+      ::protobuf::parse<mesos::MasterInfo>(object.get());
+
+    if (info.isError()) {
+      leader = None();
+      promises::fail(
+          &promises,
+          "Failed to parse JSON into a valid MasterInfo protocol buffer: " +
+          info.error());
+      return;
+    }
+
+    leader = info.get();
+  } else {
+    leader = None();
+    promises::fail(
+        &promises,
+        "Failed to parse data of unknown label '" + label.get() + "'");
+    return;
+  }
+
+  LOG(INFO) << "A new leading master (UPID="
+            << UPID(leader.get().pid()) << ") is detected";
+
+  promises::set(&promises, leader);
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const zookeeper::URL& url)
+{
+  process = new ZooKeeperMasterDetectorProcess(url);
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
+{
+  process = new ZooKeeperMasterDetectorProcess(group);
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Option<MasterInfo> > ZooKeeperMasterDetector::detect(
+    const Option<MasterInfo>& previous)
+{
+  return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
+}
+
+} // namespace detector {
+} // namespace master {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/detector/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector/detector.hpp b/src/master/detector/detector.hpp
new file mode 100644
index 0000000..8400265
--- /dev/null
+++ b/src/master/detector/detector.hpp
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __MASTER_DETECTOR_HPP__
+#define __MASTER_DETECTOR_HPP__
+
+#include <string>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/option.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+#include "messages/messages.hpp"
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT;
+
+// Forward declarations.
+class StandaloneMasterDetectorProcess;
+class ZooKeeperMasterDetectorProcess;
+
+// A standalone implementation of the MasterDetector with no external
+// discovery mechanism so the user has to manually appoint a leader
+// to the detector for it to be detected.
+class StandaloneMasterDetector : public MasterDetector
+{
+public:
+  StandaloneMasterDetector();
+  // Use this constructor if the leader is known beforehand so it is
+  // unnecessary to call 'appoint()' separately.
+  explicit StandaloneMasterDetector(const MasterInfo& leader);
+
+  // Same as above but takes UPID as the parameter.
+  explicit StandaloneMasterDetector(const process::UPID& leader);
+
+  virtual ~StandaloneMasterDetector();
+
+  // Appoint the leading master so it can be *detected*.
+  void appoint(const Option<MasterInfo>& leader);
+
+  // Same as above but takes 'UPID' as the parameter.
+  void appoint(const process::UPID& leader);
+
+  virtual process::Future<Option<MasterInfo> > detect(
+      const Option<MasterInfo>& previous = None());
+
+private:
+  StandaloneMasterDetectorProcess* process;
+};
+
+
+class ZooKeeperMasterDetector : public MasterDetector
+{
+public:
+  // Creates a detector which uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  explicit ZooKeeperMasterDetector(const zookeeper::URL& url);
+  // Used for testing purposes.
+  explicit ZooKeeperMasterDetector(process::Owned<zookeeper::Group> group);
+  virtual ~ZooKeeperMasterDetector();
+
+  // MasterDetector implementation.
+  // The detector transparently tries to recover from retryable
+  // errors until the group session expires, in which case the Future
+  // returns None.
+  virtual process::Future<Option<MasterInfo> > detect(
+      const Option<MasterInfo>& previous = None());
+
+private:
+  ZooKeeperMasterDetectorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_DETECTOR_HPP__