You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ka...@apache.org on 2016/04/07 00:48:33 UTC
[03/11] mesos git commit: Moved contender and detector definitions
into separate directories.
Moved contender and detector definitions into separate directories.
Updated Makefile.am.
Review: https://reviews.apache.org/r/44544/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cfbca013
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cfbca013
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cfbca013
Branch: refs/heads/master
Commit: cfbca0136af5fc07f87651bb0080f85a767a9925
Parents: a1d3d6b
Author: Anurag Singh <an...@gmail.com>
Authored: Wed Apr 6 15:08:18 2016 -0400
Committer: Kapil Arya <ka...@mesosphere.io>
Committed: Wed Apr 6 18:36:18 2016 -0400
----------------------------------------------------------------------
src/Makefile.am | 6 +-
src/master/contender/contender.cpp | 255 ++++++++++++++++
src/master/contender/contender.hpp | 89 ++++++
src/master/detector/detector.cpp | 522 ++++++++++++++++++++++++++++++++
src/master/detector/detector.hpp | 98 ++++++
5 files changed, 966 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index ba9cc8b..d095b98 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -624,8 +624,6 @@ libmesos_no_3rdparty_la_SOURCES += \
local/local.cpp \
logging/flags.cpp \
logging/logging.cpp \
- master/contender.cpp \
- master/detector.cpp \
master/flags.cpp \
master/http.cpp \
master/maintenance.cpp \
@@ -642,6 +640,8 @@ libmesos_no_3rdparty_la_SOURCES += \
master/allocator/mesos/hierarchical.cpp \
master/allocator/mesos/metrics.cpp \
master/allocator/sorter/drf/sorter.cpp \
+ master/contender/contender.cpp \
+ master/detector/detector.cpp \
messages/messages.cpp \
module/manager.cpp \
sched/sched.cpp \
@@ -739,8 +739,6 @@ libmesos_no_3rdparty_la_SOURCES += \
logging/flags.hpp \
logging/logging.hpp \
master/constants.hpp \
- master/contender.hpp \
- master/detector.hpp \
master/flags.hpp \
master/machine.hpp \
master/maintenance.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/contender/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender/contender.cpp b/src/master/contender/contender.cpp
new file mode 100644
index 0000000..95cec3e
--- /dev/null
+++ b/src/master/contender/contender.cpp
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <mesos/master/contender.hpp>
+
+#include <process/defer.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/lambda.hpp>
+#include <stout/protobuf.hpp>
+
+#include "master/constants.hpp"
+#include "master/contender.hpp"
+#include "master/master.hpp"
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using std::string;
+
+using namespace process;
+using namespace zookeeper;
+
+namespace mesos {
+namespace master {
+namespace contender {
+
+using namespace internal;
+
+// ZooKeeper session timeout for the Group that a URL-constructed
+// ZooKeeperMasterContenderProcess creates.
+const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+// Actor backing ZooKeeperMasterContender: it holds the MasterInfo to
+// advertise and runs a LeaderContender candidacy on a ZooKeeper Group.
+class ZooKeeperMasterContenderProcess
+ : public Process<ZooKeeperMasterContenderProcess>
+{
+public:
+ explicit ZooKeeperMasterContenderProcess(const zookeeper::URL& url);
+ explicit ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group);
+ virtual ~ZooKeeperMasterContenderProcess();
+
+ // Explicitly use 'initialize' since we're overloading below; without
+ // this using-declaration the overload would hide
+ // ProcessBase::initialize().
+ using process::ProcessBase::initialize;
+
+ // Records the MasterInfo to contend with; contend() fails until this
+ // has been called.
+ void initialize(const MasterInfo& masterInfo);
+
+ // MasterContender implementation.
+ virtual Future<Future<Nothing>> contend();
+
+private:
+ Owned<zookeeper::Group> group;
+
+ // Current candidacy, owned by this process: deleted in the destructor
+ // and before re-contending. NULL until the first contend().
+ LeaderContender* contender;
+
+ // The master this contender contends on behalf of.
+ Option<MasterInfo> masterInfo;
+
+ // Result of the latest LeaderContender::contend(); handed back again
+ // while it is still pending.
+ Option<Future<Future<Nothing>>> candidacy;
+};
+
+
+// Creates a MasterContender for the given election mechanism:
+//   - None: a StandaloneMasterContender (single master, no election).
+//   - "zk://...": a ZooKeeperMasterContender; the URL must carry a
+//     (chroot) path other than "/".
+//   - "file://<path>": (deprecated) the mechanism is read from the file,
+//     whitespace-trimmed, and resolved recursively.
+// Any other string yields an Error.
+Try<MasterContender*> MasterContender::create(const Option<string>& _mechanism)
+{
+  if (_mechanism.isNone()) {
+    return new StandaloneMasterContender();
+  }
+
+  const string mechanism = _mechanism.get();
+
+  if (strings::startsWith(mechanism, "zk://")) {
+    Try<zookeeper::URL> url = zookeeper::URL::parse(mechanism);
+    if (url.isError()) {
+      return Error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterContender(url.get());
+  }
+
+  if (strings::startsWith(mechanism, "file://")) {
+    // Load the configuration out of a file. While Mesos and related
+    // programs always use <stout/flags> to process the command line
+    // arguments (and therefore file://) this entrypoint is exposed by
+    // libmesos, with frameworks currently calling it and expecting it
+    // to do the argument parsing for them which roughly matches the
+    // argument parsing Mesos will do.
+    // TODO(cmaloney): Rework the libmesos exposed APIs to expose
+    // A "flags" endpoint where the framework can pass the command
+    // line arguments and they will be parsed by <stout/flags> and the
+    // needed flags extracted, and then change this interface to
+    // require final values from the flags. This means that a
+    // framework doesn't need to know how the flags are passed to
+    // match mesos' command line arguments if it wants, but if it
+    // needs to inspect/manipulate arguments, it can.
+    LOG(WARNING) << "Specifying master election mechanism / ZooKeeper URL to "
+                    "be read out of a file via 'file://' is deprecated inside "
+                    "Mesos and will be removed in a future release.";
+
+    // Strip the "file://" prefix (7 characters). Held by value rather
+    // than as a reference bound to the 'substr' temporary.
+    const string path = mechanism.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      // Surface the underlying cause (missing file, permissions, ...).
+      return Error(
+          "Failed to read from file at '" + path + "': " + read.error());
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  return Error("Failed to parse '" + mechanism + "'");
+}
+
+
+// Out-of-line definition of the MasterContender destructor.
+MasterContender::~MasterContender()
+{
+  // Intentionally empty.
+}
+
+
+StandaloneMasterContender::~StandaloneMasterContender()
+{
+  if (promise == NULL) {
+    return;
+  }
+
+  // Resolve the inner future handed out by contend(): leadership lost.
+  promise->set(Nothing());
+  delete promise;
+}
+
+
+void StandaloneMasterContender::initialize(const MasterInfo& masterInfo)
+{
+  // With a single contender there is nothing to advertise, so the
+  // MasterInfo is not kept; we only set a flag so that contend() can
+  // insist that initialization happened first.
+  this->initialized = true;
+}
+
+
+Future<Future<Nothing>> StandaloneMasterContender::contend()
+{
+  if (!initialized) {
+    return Failure("Initialize the contender first");
+  }
+
+  // Recontending replaces the previous membership: resolve its inner
+  // future first so whoever holds it observes the loss.
+  if (promise != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    promise->set(Nothing());
+    delete promise;
+  }
+
+  // The outer future is ready right away; the inner one stays pending
+  // until this contender is destroyed or contends again.
+  promise = new Promise<Nothing>();
+  return promise->future();
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(const zookeeper::URL& url)
+  : process(new ZooKeeperMasterContenderProcess(url))
+{
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group)
+  : process(new ZooKeeperMasterContenderProcess(group))
+{
+  spawn(process);
+}
+
+
+ZooKeeperMasterContender::~ZooKeeperMasterContender()
+{
+  // Stop the actor and block until it has exited before freeing it.
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void ZooKeeperMasterContender::initialize(const MasterInfo& masterInfo)
+{
+  // NOTE: called directly on the process object rather than through
+  // 'dispatch'; callers are expected to do this before contend().
+  process->initialize(masterInfo);
+}
+
+
+Future<Future<Nothing>> ZooKeeperMasterContender::contend()
+{
+  // Run the candidacy logic on the actor's own context.
+  return dispatch(process, &ZooKeeperMasterContenderProcess::contend);
+}
+
+
+// Builds a ZooKeeper Group from the URL and delegates to the
+// Group-taking constructor below.
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    const zookeeper::URL& url)
+  : ZooKeeperMasterContenderProcess(
+        Owned<Group>(new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT))) {}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+    Owned<Group> _group)
+  : ProcessBase(ID::generate("zookeeper-master-contender")),
+    group(_group),
+    contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
+{
+  // Release the current LeaderContender, if any (deleting NULL is a no-op).
+  delete contender;
+}
+
+
+void ZooKeeperMasterContenderProcess::initialize(const MasterInfo& _masterInfo)
+{
+  // Remember what to advertise; contend() refuses to run without it.
+  masterInfo = _masterInfo;
+}
+
+
+Future<Future<Nothing>> ZooKeeperMasterContenderProcess::contend()
+{
+  if (masterInfo.isNone()) {
+    return Failure("Initialize the contender first");
+  }
+
+  // An election that is still in progress is shared, not restarted:
+  // hand back the very same pending future.
+  const bool electionOngoing =
+    candidacy.isSome() && candidacy.get().isPending();
+  if (electionOngoing) {
+    return candidacy.get();
+  }
+
+  if (contender != NULL) {
+    LOG(INFO) << "Withdrawing the previous membership before recontending";
+    delete contender;
+  }
+
+  // The MasterInfo is advertised as JSON under the JSON label.
+  JSON::Object json = JSON::protobuf(masterInfo.get());
+
+  contender = new LeaderContender(
+      group.get(),
+      stringify(json),
+      mesos::internal::master::MASTER_INFO_JSON_LABEL);
+
+  candidacy = contender->contend();
+  return candidacy.get();
+}
+
+} // namespace contender {
+} // namespace master {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/contender/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender/contender.hpp b/src/master/contender/contender.hpp
new file mode 100644
index 0000000..ba05551
--- /dev/null
+++ b/src/master/contender/contender.hpp
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __MASTER_CONTENDER_CONTENDER_HPP__
+#define __MASTER_CONTENDER_CONTENDER_HPP__
+
+#include <mesos/master/contender.hpp>
+
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+
+#include "messages/messages.hpp"
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+// NOTE: These declarations live in 'mesos::master::contender', the
+// namespace in which src/master/contender/contender.cpp defines them and
+// in which the public 'MasterContender' interface they derive from is
+// declared.
+namespace mesos {
+namespace master {
+namespace contender {
+
+// Session timeout of the ZooKeeper Group created by a URL-constructed
+// ZooKeeperMasterContender.
+extern const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
+
+class ZooKeeperMasterContenderProcess;
+
+// A basic implementation which assumes only one master is
+// contending.
+class StandaloneMasterContender : public MasterContender
+{
+public:
+  StandaloneMasterContender()
+    : initialized(false),
+      promise(NULL) {}
+
+  virtual ~StandaloneMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const MasterInfo& masterInfo);
+
+  // In this basic implementation the outer Future directly returns
+  // and inner Future stays pending because there is only one
+  // contender in the contest.
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  bool initialized;
+  process::Promise<Nothing>* promise;
+};
+
+
+class ZooKeeperMasterContender : public MasterContender
+{
+public:
+  // Creates a contender that uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  explicit ZooKeeperMasterContender(const zookeeper::URL& url);
+  explicit ZooKeeperMasterContender(process::Owned<zookeeper::Group> group);
+
+  virtual ~ZooKeeperMasterContender();
+
+  // MasterContender implementation.
+  virtual void initialize(const MasterInfo& masterInfo);
+  virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+  ZooKeeperMasterContenderProcess* process;
+};
+
+} // namespace contender {
+} // namespace master {
+} // namespace mesos {
+
+#endif // __MASTER_CONTENDER_CONTENDER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/detector/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector/detector.cpp b/src/master/detector/detector.cpp
new file mode 100644
index 0000000..ad9c209
--- /dev/null
+++ b/src/master/detector/detector.cpp
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <set>
+#include <string>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/logging.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/protobuf.hpp>
+
+#include <mesos/master/detector.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "master/constants.hpp"
+#include "master/detector.hpp"
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using namespace process;
+using namespace zookeeper;
+
+using std::set;
+using std::string;
+
+namespace mesos {
+namespace master {
+namespace detector {
+
+// Session timeout of the ZooKeeper Group created by a URL-constructed
+// ZooKeeperMasterDetectorProcess.
+const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
+
+// TODO(bmahler): Consider moving these kinds of helpers into
+// libprocess or a common header within mesos.
+namespace promises {
+
+// Helper for setting a set of Promises: every promise is set to 't' and
+// deleted, and the set is emptied.
+template <typename T>
+void set(std::set<Promise<T>* >* promises, const T& t)
+{
+  foreach (Promise<T>* promise, *promises) {
+    promise->set(t);
+    delete promise;
+  }
+  promises->clear();
+}
+
+
+// Helper for failing a set of Promises: every promise is failed with
+// 'failure' and deleted, and the set is emptied.
+template <typename T>
+void fail(std::set<Promise<T>* >* promises, const string& failure)
+{
+  // Iterate with the template's own element type so this helper is
+  // generic like its siblings instead of only compiling for
+  // T = Option<MasterInfo>.
+  foreach (Promise<T>* promise, *promises) {
+    promise->fail(failure);
+    delete promise;
+  }
+  promises->clear();
+}
+
+
+// Helper for discarding a set of Promises: every promise is discarded
+// and deleted, and the set is emptied.
+template <typename T>
+void discard(std::set<Promise<T>* >* promises)
+{
+  foreach (Promise<T>* promise, *promises) {
+    promise->discard();
+    delete promise;
+  }
+  promises->clear();
+}
+
+
+// Helper for discarding the individual promise in the set whose future
+// is 'future'. Does nothing if no such promise is present.
+template <typename T>
+void discard(std::set<Promise<T>* >* promises, const Future<T>& future)
+{
+  foreach (Promise<T>* promise, *promises) {
+    if (promise->future() == future) {
+      promise->discard();
+      // Safe despite erasing inside the loop: we return right away and
+      // never touch the invalidated iterator again.
+      promises->erase(promise);
+      delete promise;
+      return;
+    }
+  }
+}
+
+} // namespace promises {
+
+
+// Actor backing StandaloneMasterDetector: the leader is whatever was
+// last appointed, and detect() calls wait until it changes.
+class StandaloneMasterDetectorProcess
+  : public Process<StandaloneMasterDetectorProcess>
+{
+public:
+  StandaloneMasterDetectorProcess()
+    : ProcessBase(ID::generate("standalone-master-detector")) {}
+
+  explicit StandaloneMasterDetectorProcess(const MasterInfo& _leader)
+    : ProcessBase(ID::generate("standalone-master-detector")),
+      leader(_leader) {}
+
+  ~StandaloneMasterDetectorProcess()
+  {
+    // Outstanding detections can never be answered now.
+    promises::discard(&promises);
+  }
+
+  void appoint(const Option<MasterInfo>& leader_)
+  {
+    leader = leader_;
+
+    // Wake every waiting detect() with the newly appointed leader.
+    promises::set(&promises, leader);
+  }
+
+  Future<Option<MasterInfo> > detect(
+      const Option<MasterInfo>& previous = None())
+  {
+    // The caller's view is already stale: answer immediately.
+    if (leader != previous) {
+      return leader;
+    }
+
+    // Otherwise park a promise until the next appointment (or discard).
+    Promise<Option<MasterInfo> >* pending =
+      new Promise<Option<MasterInfo> >();
+    Future<Option<MasterInfo> > result = pending->future();
+
+    result.onDiscard(defer(self(), &Self::discard, result));
+
+    promises.insert(pending);
+    return result;
+  }
+
+private:
+  void discard(const Future<Option<MasterInfo> >& future)
+  {
+    // Drop (and discard) the promise that produced this future.
+    promises::discard(&promises, future);
+  }
+
+  Option<MasterInfo> leader; // The appointed master.
+  set<Promise<Option<MasterInfo> >*> promises;
+};
+
+
+// Actor backing ZooKeeperMasterDetector: follows the ZooKeeper group
+// leader through a LeaderDetector and resolves pending detect() calls
+// with the leader's MasterInfo.
+class ZooKeeperMasterDetectorProcess
+ : public Process<ZooKeeperMasterDetectorProcess>
+{
+public:
+ explicit ZooKeeperMasterDetectorProcess(const zookeeper::URL& url);
+ explicit ZooKeeperMasterDetectorProcess(Owned<Group> group);
+ ~ZooKeeperMasterDetectorProcess();
+
+ // Starts the leader-detection loop.
+ virtual void initialize();
+
+ // Returns the cached leader immediately if it differs from 'previous';
+ // otherwise waits for the next change. Fails once 'error' is set.
+ Future<Option<MasterInfo> > detect(const Option<MasterInfo>& previous);
+
+private:
+ // Invoked (via defer) when a future returned by detect() is discarded.
+ void discard(const Future<Option<MasterInfo> >& future);
+
+ // Invoked when the group leadership has changed.
+ void detected(const Future<Option<Group::Membership> >& leader);
+
+ // Invoked when we have fetched the data associated with the leader.
+ void fetched(
+ const Group::Membership& membership,
+ const Future<Option<string> >& data);
+
+ // NOTE: 'group' must stay declared before 'detector': the constructor
+ // initializes 'detector' from 'group.get()', and members are
+ // initialized in declaration order.
+ Owned<Group> group;
+ LeaderDetector detector;
+
+ // The leading Master.
+ Option<MasterInfo> leader;
+
+ // Promises backing detect() calls that are waiting for a change.
+ set<Promise<Option<MasterInfo> >*> promises;
+
+ // Potential non-retryable error.
+ Option<Error> error;
+};
+
+
+// Creates a MasterDetector for the given detection mechanism:
+//   - None: a StandaloneMasterDetector with no leader appointed yet.
+//   - "zk://...": a ZooKeeperMasterDetector; the URL must carry a
+//     (chroot) path other than "/".
+//   - "file://<path>": (deprecated) the mechanism is read from the file,
+//     whitespace-trimmed, and resolved recursively.
+//   - anything else: parsed as a master PID ("master@" is prepended when
+//     missing) that becomes the fixed leader of a StandaloneMasterDetector.
+// Returns an Error if the mechanism cannot be read or parsed.
+Try<MasterDetector*> MasterDetector::create(const Option<string>& _mechanism)
+{
+  if (_mechanism.isNone()) {
+    return new StandaloneMasterDetector();
+  }
+
+  const string mechanism = _mechanism.get();
+
+  if (strings::startsWith(mechanism, "zk://")) {
+    Try<zookeeper::URL> url = zookeeper::URL::parse(mechanism);
+    if (url.isError()) {
+      return Error(url.error());
+    }
+    if (url.get().path == "/") {
+      return Error(
+          "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+    }
+    return new ZooKeeperMasterDetector(url.get());
+  }
+
+  if (strings::startsWith(mechanism, "file://")) {
+    // Load the configuration out of a file. While Mesos and related
+    // programs always use <stout/flags> to process the command line
+    // arguments (and therefore file://) this entrypoint is exposed by
+    // libmesos, with frameworks currently calling it and expecting it
+    // to do the argument parsing for them which roughly matches the
+    // argument parsing Mesos will do.
+    // TODO(cmaloney): Rework the libmesos exposed APIs to expose
+    // A "flags" endpoint where the framework can pass the command
+    // line arguments and they will be parsed by <stout/flags> and the
+    // needed flags extracted, and then change this interface to
+    // require final values from the flags. This means that a
+    // framework doesn't need to know how the flags are passed to
+    // match mesos' command line arguments if it wants, but if it
+    // needs to inspect/manipulate arguments, it can.
+    LOG(WARNING) << "Specifying master detection mechanism / ZooKeeper URL to "
+                    "be read out of a file via 'file://' is deprecated inside "
+                    "Mesos and will be removed in a future release.";
+
+    // Strip the "file://" prefix (7 characters). Held by value rather
+    // than as a reference bound to the 'substr' temporary.
+    const string path = mechanism.substr(7);
+    const Try<string> read = os::read(path);
+    if (read.isError()) {
+      // Surface the underlying cause (missing file, permissions, ...).
+      return Error(
+          "Failed to read from file at '" + path + "': " + read.error());
+    }
+
+    return create(strings::trim(read.get()));
+  }
+
+  // Okay, try and parse what we got as a PID.
+  UPID pid = strings::startsWith(mechanism, "master@")
+    ? UPID(mechanism)
+    : UPID("master@" + mechanism);
+
+  if (!pid) {
+    return Error("Failed to parse '" + mechanism + "'");
+  }
+
+  return new StandaloneMasterDetector(
+      internal::protobuf::createMasterInfo(pid));
+}
+
+
+// Out-of-line definition of the MasterDetector destructor.
+MasterDetector::~MasterDetector()
+{
+  // Intentionally empty.
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector()
+  : process(new StandaloneMasterDetectorProcess())
+{
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector(const MasterInfo& leader)
+  : process(new StandaloneMasterDetectorProcess(leader))
+{
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
+  : process(new StandaloneMasterDetectorProcess(
+        mesos::internal::protobuf::createMasterInfo(leader)))
+{
+  spawn(process);
+}
+
+
+StandaloneMasterDetector::~StandaloneMasterDetector()
+{
+  // Stop the actor and block until it has exited before freeing it.
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+void StandaloneMasterDetector::appoint(const Option<MasterInfo>& leader)
+{
+  dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
+}
+
+
+void StandaloneMasterDetector::appoint(const UPID& leader)
+{
+  // Turn the PID into a MasterInfo before handing it to the actor.
+  const MasterInfo info = mesos::internal::protobuf::createMasterInfo(leader);
+  dispatch(process, &StandaloneMasterDetectorProcess::appoint, info);
+}
+
+
+Future<Option<MasterInfo> > StandaloneMasterDetector::detect(
+    const Option<MasterInfo>& previous)
+{
+  return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
+}
+
+
+// Builds a ZooKeeper Group from the URL and delegates to the
+// Group-taking constructor below.
+// TODO(benh): Get ZooKeeper timeout from configuration.
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    const zookeeper::URL& url)
+  : ZooKeeperMasterDetectorProcess(Owned<Group>(new Group(
+        url.servers,
+        MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+        url.path,
+        url.authentication))) {}
+
+
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+    Owned<Group> _group)
+  : ProcessBase(ID::generate("zookeeper-master-detector")),
+    group(_group),
+    detector(group.get()),
+    leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
+{
+  // Any detection still waiting can no longer be answered.
+  promises::discard(&promises);
+}
+
+
+void ZooKeeperMasterDetectorProcess::initialize()
+{
+  // Kick off the detection loop; 'detected' re-arms it on every change.
+  Future<Option<Group::Membership> > membership = detector.detect();
+  membership.onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+void ZooKeeperMasterDetectorProcess::discard(
+    const Future<Option<MasterInfo> >& future)
+{
+  // Drop (and discard) the promise that produced this future.
+  promises::discard(&promises, future);
+}
+
+
+Future<Option<MasterInfo> > ZooKeeperMasterDetectorProcess::detect(
+    const Option<MasterInfo>& previous)
+{
+  // After a non-retryable error the detector is no longer operational,
+  // so every request fails immediately.
+  if (error.isSome()) {
+    return Failure(error.get().message);
+  }
+
+  // The caller is behind: report the current leader without waiting.
+  if (leader != previous) {
+    return leader;
+  }
+
+  // Otherwise park a promise until the leader changes (or it is
+  // discarded by the caller).
+  Promise<Option<MasterInfo> >* pending =
+    new Promise<Option<MasterInfo> >();
+  Future<Option<MasterInfo> > result = pending->future();
+
+  result.onDiscard(defer(self(), &Self::discard, result));
+
+  promises.insert(pending);
+  return result;
+}
+
+
+void ZooKeeperMasterDetectorProcess::detected(
+    const Future<Option<Group::Membership> >& _leader)
+{
+  CHECK(!_leader.isDiscarded());
+
+  if (_leader.isFailed()) {
+    LOG(ERROR) << "Failed to detect the leader: " << _leader.failure();
+
+    // Setting this error stops the detection loop and the detector
+    // transitions to an erroneous state. Further calls to detect()
+    // will directly fail as a result.
+    error = Error(_leader.failure());
+    leader = None();
+
+    promises::fail(&promises, _leader.failure());
+
+    return;
+  }
+
+  const Option<Group::Membership>& membership = _leader.get();
+
+  if (membership.isSome()) {
+    // Fetch the data associated with the leader; 'fetched' decodes it
+    // and notifies the waiters.
+    group->data(membership.get())
+      .onAny(defer(self(), &Self::fetched, membership.get(), lambda::_1));
+  } else {
+    // There is no leader right now: tell every waiter.
+    leader = None();
+    promises::set(&promises, leader);
+  }
+
+  // Keep trying to detect leadership changes.
+  detector.detect(membership)
+    .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+// Decodes the znode data of the leading membership into a MasterInfo,
+// caches it as 'leader' and resolves every waiting detect() call. On a
+// read or decode failure, 'leader' is cleared and the waiters fail; the
+// detection loop itself keeps running.
+void ZooKeeperMasterDetectorProcess::fetched(
+    const Group::Membership& membership,
+    const Future<Option<string> >& data)
+{
+  CHECK(!data.isDiscarded());
+
+  if (data.isFailed()) {
+    leader = None();
+    promises::fail(&promises, data.failure());
+    return;
+  } else if (data.get().isNone()) {
+    // Membership is gone before we can read its data.
+    leader = None();
+    promises::set(&promises, leader);
+    return;
+  }
+
+  // Parse the data based on the membership label and cache the
+  // leader for subsequent requests.
+  Option<string> label = membership.label();
+  if (label.isNone()) {
+    // If we are here it means some masters are still creating znodes
+    // with the old format (the bare PID string).
+    UPID pid = UPID(data.get().get());
+    if (!pid) {
+      // Don't cache or broadcast a MasterInfo built from a PID that
+      // could not be parsed.
+      leader = None();
+      promises::fail(
+          &promises,
+          "Failed to parse data in old format into a valid PID");
+      return;
+    }
+    LOG(WARNING) << "Leading master " << pid << " has data in old format";
+    leader = mesos::internal::protobuf::createMasterInfo(pid);
+  } else if (label.get() == mesos::internal::master::MASTER_INFO_LABEL) {
+    MasterInfo info;
+    if (!info.ParseFromString(data.get().get())) {
+      leader = None();
+      promises::fail(&promises, "Failed to parse data into MasterInfo");
+      return;
+    }
+    LOG(WARNING) << "Leading master " << info.pid()
+                 << " is using a Protobuf binary format when registering with "
+                 << "ZooKeeper (" << label.get() << "): this will be deprecated"
+                 << " as of Mesos 0.24 (see MESOS-2340)";
+    leader = info;
+  } else if (label.get() ==
+             mesos::internal::master::MASTER_INFO_JSON_LABEL) {
+    Try<JSON::Object> object = JSON::parse<JSON::Object>(data.get().get());
+
+    if (object.isError()) {
+      leader = None();
+      promises::fail(
+          &promises,
+          "Failed to parse data into valid JSON: " + object.error());
+      return;
+    }
+
+    Try<mesos::MasterInfo> info =
+      ::protobuf::parse<mesos::MasterInfo>(object.get());
+
+    if (info.isError()) {
+      leader = None();
+      promises::fail(
+          &promises,
+          "Failed to parse JSON into a valid MasterInfo protocol buffer: " +
+          info.error());
+      return;
+    }
+
+    leader = info.get();
+  } else {
+    leader = None();
+    promises::fail(
+        &promises,
+        "Failed to parse data of unknown label '" + label.get() + "'");
+    return;
+  }
+
+  LOG(INFO) << "A new leading master (UPID="
+            << UPID(leader.get().pid()) << ") is detected";
+
+  promises::set(&promises, leader);
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const zookeeper::URL& url)
+  : process(new ZooKeeperMasterDetectorProcess(url))
+{
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
+  : process(new ZooKeeperMasterDetectorProcess(group))
+{
+  spawn(process);
+}
+
+
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+{
+  // Stop the actor and block until it has exited before freeing it.
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Future<Option<MasterInfo> > ZooKeeperMasterDetector::detect(
+    const Option<MasterInfo>& previous)
+{
+  // Answered on the actor's own context.
+  return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
+}
+
+} // namespace detector {
+} // namespace master {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cfbca013/src/master/detector/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector/detector.hpp b/src/master/detector/detector.hpp
new file mode 100644
index 0000000..8400265
--- /dev/null
+++ b/src/master/detector/detector.hpp
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __MASTER_DETECTOR_DETECTOR_HPP__
+#define __MASTER_DETECTOR_DETECTOR_HPP__
+
+#include <string>
+
+#include <mesos/master/detector.hpp>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/pid.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/option.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+
+#include "messages/messages.hpp"
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+// NOTE: These declarations live in 'mesos::master::detector', the
+// namespace in which src/master/detector/detector.cpp defines them and
+// in which the public 'MasterDetector' interface they derive from is
+// declared.
+namespace mesos {
+namespace master {
+namespace detector {
+
+// Session timeout of the ZooKeeper Group created by a URL-constructed
+// ZooKeeperMasterDetector.
+extern const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT;
+
+// Forward declarations.
+class StandaloneMasterDetectorProcess;
+class ZooKeeperMasterDetectorProcess;
+
+// A standalone implementation of the MasterDetector with no external
+// discovery mechanism so the user has to manually appoint a leader
+// to the detector for it to be detected.
+class StandaloneMasterDetector : public MasterDetector
+{
+public:
+  StandaloneMasterDetector();
+  // Use this constructor if the leader is known beforehand so it is
+  // unnecessary to call 'appoint()' separately.
+  explicit StandaloneMasterDetector(const MasterInfo& leader);
+
+  // Same as above but takes UPID as the parameter.
+  explicit StandaloneMasterDetector(const process::UPID& leader);
+
+  virtual ~StandaloneMasterDetector();
+
+  // Appoint the leading master so it can be *detected*.
+  void appoint(const Option<MasterInfo>& leader);
+
+  // Same as above but takes 'UPID' as the parameter.
+  void appoint(const process::UPID& leader);
+
+  virtual process::Future<Option<MasterInfo> > detect(
+      const Option<MasterInfo>& previous = None());
+
+private:
+  StandaloneMasterDetectorProcess* process;
+};
+
+
+class ZooKeeperMasterDetector : public MasterDetector
+{
+public:
+  // Creates a detector which uses ZooKeeper to determine (i.e.,
+  // elect) a leading master.
+  explicit ZooKeeperMasterDetector(const zookeeper::URL& url);
+  // Used for testing purposes.
+  explicit ZooKeeperMasterDetector(process::Owned<zookeeper::Group> group);
+  virtual ~ZooKeeperMasterDetector();
+
+  // MasterDetector implementation.
+  // The detector transparently tries to recover from retryable
+  // errors until the group session expires, in which case the Future
+  // returns None. If leader detection itself fails (a non-retryable
+  // error), the returned Future fails and so does every later call.
+  virtual process::Future<Option<MasterInfo> > detect(
+      const Option<MasterInfo>& previous = None());
+
+private:
+  ZooKeeperMasterDetectorProcess* process;
+};
+
+} // namespace detector {
+} // namespace master {
+} // namespace mesos {
+
+#endif // __MASTER_DETECTOR_DETECTOR_HPP__