You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/19 19:57:47 UTC
[6/8] git commit: Added Master contender and detector abstractions.
Added Master contender and detector abstractions.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/13087
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bcd1dc4e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bcd1dc4e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bcd1dc4e
Branch: refs/heads/master
Commit: bcd1dc4e10a1cff4fdc4e92daff108b6fa0475d3
Parents: 9fa1d47
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:39:15 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:19 2013 -0800
----------------------------------------------------------------------
src/Makefile.am | 71 +--
src/master/contender.cpp | 206 +++++++
src/master/contender.hpp | 127 ++++
src/master/detector.cpp | 361 ++++++++++++
src/master/detector.hpp | 129 ++++
src/tests/master_contender_detector_tests.cpp | 647 +++++++++++++++++++++
src/tests/master_detector_tests.cpp | 628 --------------------
7 files changed, 1508 insertions(+), 661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3c48aee..969aead 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -161,7 +161,9 @@ libmesos_no_3rdparty_la_SOURCES = \
sasl/auxprop.cpp \
sched/sched.cpp \
local/local.cpp \
+ master/contender.cpp \
master/constants.cpp \
+ master/detector.cpp \
master/drf_sorter.cpp \
master/http.cpp \
master/master.cpp \
@@ -224,7 +226,10 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp \
linux/fs.hpp local/flags.hpp local/local.hpp \
logging/flags.hpp logging/logging.hpp \
master/allocator.hpp \
- master/constants.hpp master/drf_sorter.hpp master/flags.hpp \
+ master/contender.hpp \
+ master/constants.hpp \
+ master/detector.hpp \
+ master/drf_sorter.hpp master/flags.hpp \
master/hierarchical_allocator_process.hpp \
master/registrar.hpp \
master/master.hpp master/sorter.hpp \
@@ -779,38 +784,38 @@ balloon_executor_LDADD = libmesos.la
check_PROGRAMS += mesos-tests
-mesos_tests_SOURCES = \
- tests/allocator_tests.cpp \
- tests/attributes_tests.cpp \
- tests/authentication_tests.cpp \
- tests/environment.cpp \
- tests/examples_tests.cpp \
- tests/exception_tests.cpp \
- tests/fault_tolerance_tests.cpp \
- tests/files_tests.cpp \
- tests/flags.cpp \
- tests/gc_tests.cpp \
- tests/isolator_tests.cpp \
- tests/log_tests.cpp \
- tests/logging_tests.cpp \
- tests/main.cpp \
- tests/master_detector_tests.cpp \
- tests/master_tests.cpp \
- tests/mesos.cpp \
- tests/monitor_tests.cpp \
- tests/paths_tests.cpp \
- tests/protobuf_io_tests.cpp \
- tests/reaper_tests.cpp \
- tests/registrar_tests.cpp \
- tests/resource_offers_tests.cpp \
- tests/resources_tests.cpp \
- tests/sasl_tests.cpp \
- tests/script.cpp \
- tests/slave_recovery_tests.cpp \
- tests/sorter_tests.cpp \
- tests/state_tests.cpp \
- tests/status_update_manager_tests.cpp \
- tests/utils.cpp \
+mesos_tests_SOURCES = \
+ tests/allocator_tests.cpp \
+ tests/attributes_tests.cpp \
+ tests/authentication_tests.cpp \
+ tests/environment.cpp \
+ tests/examples_tests.cpp \
+ tests/exception_tests.cpp \
+ tests/fault_tolerance_tests.cpp \
+ tests/files_tests.cpp \
+ tests/flags.cpp \
+ tests/gc_tests.cpp \
+ tests/isolator_tests.cpp \
+ tests/log_tests.cpp \
+ tests/logging_tests.cpp \
+ tests/main.cpp \
+ tests/master_contender_detector_tests.cpp \
+ tests/master_tests.cpp \
+ tests/mesos.cpp \
+ tests/monitor_tests.cpp \
+ tests/paths_tests.cpp \
+ tests/protobuf_io_tests.cpp \
+ tests/reaper_tests.cpp \
+ tests/registrar_tests.cpp \
+ tests/resource_offers_tests.cpp \
+ tests/resources_tests.cpp \
+ tests/sasl_tests.cpp \
+ tests/script.cpp \
+ tests/slave_recovery_tests.cpp \
+ tests/sorter_tests.cpp \
+ tests/state_tests.cpp \
+ tests/status_update_manager_tests.cpp \
+ tests/utils.cpp \
tests/zookeeper_url_tests.cpp
mesos_tests_CPPFLAGS = $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/contender.cpp
----------------------------------------------------------------------
diff --git a/src/master/contender.cpp b/src/master/contender.cpp
new file mode 100644
index 0000000..84b0552
--- /dev/null
+++ b/src/master/contender.cpp
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <process/defer.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/lambda.hpp>
+
+#include "master/contender.hpp"
+#include "master/master.hpp"
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using std::string;
+
+using namespace process;
+using namespace zookeeper;
+
+namespace mesos {
+namespace internal {
+
+using namespace master;
+
+const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+class ZooKeeperMasterContenderProcess
+ : public Process<ZooKeeperMasterContenderProcess>
+{
+public:
+ ZooKeeperMasterContenderProcess(const zookeeper::URL& url);
+ ZooKeeperMasterContenderProcess(Owned<zookeeper::Group> group);
+ ~ZooKeeperMasterContenderProcess();
+
+ void initialize(const PID<Master>& master);
+
+ // Dispatched to by ZooKeeperMasterContender::contend().
+ virtual Future<Future<Nothing> > contend();
+
+private:
+ Owned<zookeeper::Group> group;
+ LeaderContender* contender;
+ PID<Master> master;
+};
+
+
+Try<MasterContender*> MasterContender::create(const string& zk)
+{
+ if (zk == "") {
+ return new StandaloneMasterContender();
+ } else if (strings::startsWith(zk, "zk://")) {
+ Try<zookeeper::URL> url = URL::parse(zk);
+ if (url.isError()) {
+ return Try<MasterContender*>::error(url.error());
+ }
+ if (url.get().path == "/") {
+ return Try<MasterContender*>::error(
+ "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+ }
+ return new ZooKeeperMasterContender(url.get());
+ } else if (strings::startsWith(zk, "file://")) {
+ const string& path = zk.substr(7);
+ const Try<string> read = os::read(path);
+ if (read.isError()) {
+ return Error("Failed to read from file at '" + path + "'");
+ }
+
+ return create(strings::trim(read.get()));
+ }
+
+ return Try<MasterContender*>::error("Failed to parse '" + zk + "'");
+}
+
+
+MasterContender::~MasterContender() {}
+
+
+StandaloneMasterContender::~StandaloneMasterContender()
+{
+ if (promise != NULL) {
+ promise->set(Nothing()); // Leadership lost.
+ delete promise;
+ }
+}
+
+
+void StandaloneMasterContender::initialize(
+ const PID<master::Master>& master)
+{
+ // We don't really need to store the master in this basic
+ // implementation so we just set an 'initialized' flag that
+ // 'contend()' checks to make sure this method was called.
+ initialized = true;
+}
+
+Future<Future<Nothing> > StandaloneMasterContender::contend()
+{
+ CHECK(initialized) << "Initialize the contender first";
+
+ if (promise != NULL) {
+ LOG(INFO) << "Withdrawing the previous membership before recontending";
+ promise->set(Nothing());
+ delete promise;
+ }
+
+ // Directly return a future that is always pending because it
+ // represents a membership/leadership that is not going to be lost
+ // until we 'withdraw'.
+ promise = new Promise<Nothing>();
+ return promise->future();
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(const URL& url)
+{
+ process = new ZooKeeperMasterContenderProcess(url);
+ spawn(process);
+}
+
+
+ZooKeeperMasterContender::ZooKeeperMasterContender(Owned<Group> group)
+{
+ process = new ZooKeeperMasterContenderProcess(group);
+ spawn(process);
+}
+
+
+ZooKeeperMasterContender::~ZooKeeperMasterContender()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+void ZooKeeperMasterContender::initialize(
+ const PID<master::Master>& master)
+{
+ process->initialize(master);
+}
+
+
+Future<Future<Nothing> > ZooKeeperMasterContender::contend()
+{
+ return dispatch(process, &ZooKeeperMasterContenderProcess::contend);
+}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+ const URL& url)
+ : group(new Group(url, MASTER_CONTENDER_ZK_SESSION_TIMEOUT)),
+ contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::ZooKeeperMasterContenderProcess(
+ Owned<Group> _group)
+ : group(_group),
+ contender(NULL) {}
+
+
+ZooKeeperMasterContenderProcess::~ZooKeeperMasterContenderProcess()
+{
+ delete contender;
+}
+
+void ZooKeeperMasterContenderProcess::initialize(
+ const PID<Master>& _master)
+{
+ master = _master;
+}
+
+
+Future<Future<Nothing> > ZooKeeperMasterContenderProcess::contend()
+{
+ CHECK(master) << "Initialize the contender first";
+
+ if (contender != NULL) {
+ LOG(INFO) << "Withdrawing the previous membership before recontending";
+ delete contender;
+ }
+
+ contender = new LeaderContender(group.get(), master);
+ return contender->contend();
+}
+
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/contender.hpp
----------------------------------------------------------------------
diff --git a/src/master/contender.hpp b/src/master/contender.hpp
new file mode 100644
index 0000000..50fd4f3
--- /dev/null
+++ b/src/master/contender.hpp
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_CONTENDER_HPP__
+#define __MASTER_CONTENDER_HPP__
+
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/owned.hpp>
+
+#include "zookeeper/contender.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
+
+
+// Forward declarations.
+namespace master {
+class Master;
+}
+
+
+class ZooKeeperMasterContenderProcess;
+
+
+// An abstraction for contending to be a leading master.
+class MasterContender
+{
+public:
+ // Attempts to create a master contender using the specified
+ // ZooKeeper address (an empty string yields a standalone contender).
+ // The ZooKeeper address should be one of:
+ // - zk://host1:port1,host2:port2,.../path
+ // - zk://username:password@host1:port1,host2:port2,.../path
+ // - file:///path/to/file (where file contains one of the above)
+ // Note that the returned contender still needs to be 'initialize()'d.
+ static Try<MasterContender*> create(const std::string& zk);
+
+ // Note that the contender's membership, if obtained, is scheduled
+ // to be cancelled during destruction.
+ virtual ~MasterContender() = 0;
+
+ // Initializes the contender with the PID of the master it contends
+ // on behalf of.
+ virtual void initialize(const process::PID<master::Master>& master) = 0;
+
+ // Returns an outer Future that becomes ready once the contender has
+ // entered the contest (by obtaining a membership), or an error otherwise.
+ // The inner Future returns Nothing when the contender is out of
+ // the contest (i.e. its membership is lost).
+ //
+ // This method can be used to contend again. Each call to this
+ // method causes the previous candidacy to be withdrawn before
+ // re-contending.
+ virtual process::Future<process::Future<Nothing> > contend() = 0;
+};
+
+
+// A basic implementation which assumes only one master is
+// contending.
+class StandaloneMasterContender : public MasterContender
+{
+public:
+ StandaloneMasterContender()
+ : initialized(false),
+ promise(NULL) {}
+ virtual ~StandaloneMasterContender();
+
+ // MasterContender implementation.
+ virtual void initialize(const process::PID<master::Master>& master);
+
+ // In this basic implementation the outer Future is returned
+ // directly and the inner Future stays pending until the contender
+ // recontends or is destructed, as it is the only contender.
+ virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+ bool initialized;
+ process::Promise<Nothing>* promise;
+};
+
+
+class ZooKeeperMasterContender : public MasterContender
+{
+public:
+ // Creates a contender that uses ZooKeeper to determine (i.e.,
+ // elect) a leading master.
+ ZooKeeperMasterContender(const zookeeper::URL& url);
+ ZooKeeperMasterContender(Owned<zookeeper::Group> group);
+
+ virtual ~ZooKeeperMasterContender();
+
+ // MasterContender implementation.
+ virtual void initialize(const process::PID<master::Master>& master);
+ virtual process::Future<process::Future<Nothing> > contend();
+
+private:
+ ZooKeeperMasterContenderProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_CONTENDER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
new file mode 100644
index 0000000..2f73f66
--- /dev/null
+++ b/src/master/detector.cpp
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <set>
+#include <string>
+
+#include <tr1/functional>
+#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/logging.hpp>
+#include <process/process.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+
+#include "master/detector.hpp"
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+using namespace process;
+using namespace zookeeper;
+
+using std::set;
+using std::string;
+
+namespace mesos {
+namespace internal {
+
+const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT = Seconds(10);
+
+
+class StandaloneMasterDetectorProcess
+ : public Process<StandaloneMasterDetectorProcess>
+{
+public:
+ StandaloneMasterDetectorProcess() : leader(None()) {}
+ StandaloneMasterDetectorProcess(const UPID& _leader)
+ : leader(_leader) {}
+ ~StandaloneMasterDetectorProcess();
+
+ void appoint(const Result<UPID>& leader);
+ Future<Result<UPID> > detect(const Result<UPID>& previous = None());
+
+private:
+ // The leading master that's directly 'appoint()'ed.
+ Result<UPID> leader;
+
+ // Promises for the detection result.
+ set<Promise<Result<UPID> >*> promises;
+};
+
+
+class ZooKeeperMasterDetectorProcess
+ : public Process<ZooKeeperMasterDetectorProcess>
+{
+public:
+ ZooKeeperMasterDetectorProcess(const URL& url);
+ ZooKeeperMasterDetectorProcess(Owned<Group> group);
+ ~ZooKeeperMasterDetectorProcess();
+
+ virtual void initialize();
+
+ // ZooKeeperMasterDetector implementation.
+ Future<Result<UPID> > detect(const Result<UPID>& previous);
+
+private:
+ // Invoked when the group leadership has changed.
+ void detected(Future<Result<Group::Membership> > leader);
+
+ // Invoked when we have fetched the data associated with the leader.
+ void fetched(const Future<string>& data);
+
+ Owned<Group> group;
+ LeaderDetector detector;
+
+ // The leading Master.
+ Result<UPID> leader;
+ set<Promise<Result<UPID> >*> promises;
+};
+
+
+Try<MasterDetector*> MasterDetector::create(const string& master)
+{
+ if (master == "") {
+ return new StandaloneMasterDetector();
+ } else if (master.find("zk://") == 0) {
+ Try<URL> url = URL::parse(master);
+ if (url.isError()) {
+ return Try<MasterDetector*>::error(url.error());
+ }
+ if (url.get().path == "/") {
+ return Try<MasterDetector*>::error(
+ "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
+ }
+ return new ZooKeeperMasterDetector(url.get());
+ } else if (master.find("file://") == 0) {
+ const string& path = master.substr(7);
+ const Try<string> read = os::read(path);
+ if (read.isError()) {
+ return Error("Failed to read from file at '" + path + "'");
+ }
+
+ return create(strings::trim(read.get()));
+ }
+
+ // Okay, try and parse what we got as a PID.
+ UPID pid = master.find("master@") == 0
+ ? UPID(master)
+ : UPID("master@" + master);
+
+ if (!pid) {
+ return Try<MasterDetector*>::error(
+ "Failed to parse '" + master + "'");
+ }
+
+ return new StandaloneMasterDetector(pid);
+}
+
+
+MasterDetector::~MasterDetector() {}
+
+
+StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
+{
+ foreach (Promise<Result<UPID> >* promise, promises) {
+ promise->set(Result<UPID>::error("MasterDetector is being destructed"));
+ delete promise;
+ }
+ promises.clear();
+}
+
+
+void StandaloneMasterDetectorProcess::appoint(
+ const Result<process::UPID>& _leader)
+{
+ leader = _leader;
+
+ foreach (Promise<Result<UPID> >* promise, promises) {
+ promise->set(leader);
+ delete promise;
+ }
+ promises.clear();
+}
+
+
+Future<Result<UPID> > StandaloneMasterDetectorProcess::detect(
+ const Result<UPID>& previous)
+{
+ // Directly return the current leader if it is not the
+ // same as the previous one.
+ if (leader.isError() != previous.isError() ||
+ leader.isNone() != previous.isNone() ||
+ leader.isSome() != previous.isSome()) {
+ return leader; // State change.
+ } else if (leader.isSome() && previous.isSome() &&
+ leader.get() != previous.get()) {
+ return leader; // Leadership change.
+ }
+
+ Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+ promises.insert(promise);
+ return promise->future();
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector()
+{
+ process = new StandaloneMasterDetectorProcess();
+ spawn(process);
+}
+
+
+StandaloneMasterDetector::StandaloneMasterDetector(const UPID& leader)
+{
+ process = new StandaloneMasterDetectorProcess(leader);
+ spawn(process);
+}
+
+
+StandaloneMasterDetector::~StandaloneMasterDetector()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+void StandaloneMasterDetector::appoint(const Result<process::UPID>& leader)
+{
+ return dispatch(process, &StandaloneMasterDetectorProcess::appoint, leader);
+}
+
+
+Future<Result<UPID> > StandaloneMasterDetector::detect(
+ const Result<UPID>& previous)
+{
+ return dispatch(process, &StandaloneMasterDetectorProcess::detect, previous);
+}
+
+
+// TODO(benh): Get ZooKeeper timeout from configuration.
+// TODO(xujyan): Use a delegating constructor after switching to C++11.
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+ const URL& url)
+ : group(new Group(url.servers,
+ MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+ url.path,
+ url.authentication)),
+ detector(group.get()),
+ leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
+ Owned<Group> _group)
+ : group(_group),
+ detector(group.get()),
+ leader(None()) {}
+
+
+ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
+{
+ foreach (Promise<Result<UPID> >* promise, promises) {
+ promise->set(Result<UPID>::error("No longer detecting a master"));
+ delete promise;
+ }
+ promises.clear();
+}
+
+
+void ZooKeeperMasterDetectorProcess::initialize()
+{
+ detector.detect(None())
+ .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+Future<Result<UPID> > ZooKeeperMasterDetectorProcess::detect(
+ const Result<UPID>& previous)
+{
+ // Directly return when the current leader and previous are not the
+ // same.
+ if (leader.isError() != previous.isError() ||
+ leader.isNone() != previous.isNone() ||
+ leader.isSome() != previous.isSome()) {
+ return leader; // State change.
+ } else if (leader.isSome() && previous.isSome() &&
+ leader.get() != previous.get()) {
+ return leader; // Leadership change.
+ }
+
+ Promise<Result<UPID> >* promise = new Promise<Result<UPID> >();
+ promises.insert(promise);
+ return promise->future();
+}
+
+
+void ZooKeeperMasterDetectorProcess::detected(
+ Future<Result<Group::Membership> > _leader)
+{
+ CHECK(_leader.isReady())
+ << "Not expecting LeaderDetector to fail or discard futures";
+
+ if (!_leader.get().isSome()) {
+ leader = _leader.get().isError()
+ ? Result<UPID>::error(_leader.get().error())
+ : Result<UPID>::none();
+
+ foreach (Promise<Result<UPID> >* promise, promises) {
+ promise->set(leader);
+ delete promise;
+ }
+ promises.clear();
+ } else {
+ // Fetch the data associated with the leader.
+ group->data(_leader.get().get())
+ .onAny(defer(self(), &Self::fetched, lambda::_1));
+ }
+
+ // Keep trying to detect leadership changes.
+ detector.detect(_leader.get())
+ .onAny(defer(self(), &Self::detected, lambda::_1));
+}
+
+
+void ZooKeeperMasterDetectorProcess::fetched(const Future<string>& data)
+{
+ if (data.isFailed()) {
+ leader = Error(data.failure());
+ foreach (Promise<Result<UPID> >* promise, promises) {
+ promise->set(leader);
+ delete promise;
+ }
+ promises.clear();
+ return;
+ }
+
+ CHECK(data.isReady()); // Not expecting Group to discard futures.
+
+ // Cache the master for subsequent requests.
+ leader = UPID(data.get());
+ LOG(INFO) << "A new leading master (UPID=" << leader.get() << ") is detected";
+
+ foreach (Promise<Result<UPID> >* promise, promises) {
+ promise->set(leader);
+ delete promise;
+ }
+ promises.clear();
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const URL& url)
+{
+ process = new ZooKeeperMasterDetectorProcess(url);
+ spawn(process);
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(Owned<Group> group)
+{
+ process = new ZooKeeperMasterDetectorProcess(group);
+ spawn(process);
+}
+
+
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+Future<Result<UPID> > ZooKeeperMasterDetector::detect(
+ const Result<UPID>& previous)
+{
+ return dispatch(process, &ZooKeeperMasterDetectorProcess::detect, previous);
+}
+
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/master/detector.hpp
----------------------------------------------------------------------
diff --git a/src/master/detector.hpp b/src/master/detector.hpp
new file mode 100644
index 0000000..ceb3a3f
--- /dev/null
+++ b/src/master/detector.hpp
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_DETECTOR_HPP__
+#define __MASTER_DETECTOR_HPP__
+
+#include <process/future.hpp>
+#include <process/pid.hpp>
+
+#include <stout/owned.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "zookeeper/detector.hpp"
+#include "zookeeper/group.hpp"
+#include "zookeeper/url.hpp"
+
+namespace mesos {
+namespace internal {
+
+extern const Duration MASTER_DETECTOR_ZK_SESSION_TIMEOUT;
+
+
+// Forward declarations.
+class StandaloneMasterDetectorProcess;
+class ZooKeeperMasterDetectorProcess;
+
+
+// An abstraction of a Master detector which can be used to
+// detect the leading master from a group.
+class MasterDetector
+{
+public:
+ // Attempts to create a master detector for the specified master.
+ // The master should be one of:
+ // - host:port
+ // - zk://host1:port1,host2:port2,.../path
+ // - zk://username:password@host1:port1,host2:port2,.../path
+ // - file:///path/to/file (where file contains one of the above)
+ static Try<MasterDetector*> create(const std::string& master);
+ virtual ~MasterDetector() = 0;
+
+ // Returns some PID after an election has occurred and the elected
+ // PID is different than that specified (if any), or NONE if an
+ // election occurs and no PID is elected (e.g., all PIDs are lost).
+ // The result is an error if the detector is not able to detect the
+ // leading master, possibly due to network disconnection.
+ //
+ // The future is set to an ERROR result when the detector is
+ // destructed; it is never discarded.
+ //
+ // The 'previous' result (if any) should be passed back if this
+ // method is called repeatedly so the detector only returns when it
+ // gets a different result, either because an error is recovered or
+ // the elected membership is different from the 'previous'.
+ virtual process::Future<Result<process::UPID> > detect(
+ const Result<process::UPID>& previous = None()) = 0;
+};
+
+
+// A standalone implementation of the MasterDetector with no external
+// discovery mechanism so the user has to manually appoint a leader
+// to the detector for it to be detected.
+class StandaloneMasterDetector : public MasterDetector
+{
+public:
+ StandaloneMasterDetector();
+ // Use this constructor if the leader is known beforehand so it is
+ // unnecessary to call 'appoint()' separately.
+ StandaloneMasterDetector(const process::UPID& leader);
+ virtual ~StandaloneMasterDetector();
+
+ // Appoint the leading master so it can be *detected* by default.
+ // The leader can be NONE or ERROR.
+ // This method is used only by this basic implementation and not
+ // needed by child classes with other detection mechanisms such as
+ // Zookeeper.
+ //
+ // When used by Master, this method is called during its
+ // initialization process; when used by Slave and SchedulerDriver,
+ // the MasterDetector needs to have the leader installed prior to
+ // injection.
+ void appoint(const Result<process::UPID>& leader);
+
+ virtual process::Future<Result<process::UPID> > detect(
+ const Result<process::UPID>& previous = None());
+
+private:
+ StandaloneMasterDetectorProcess* process;
+};
+
+
+class ZooKeeperMasterDetector : public MasterDetector
+{
+public:
+ // Creates a detector which uses ZooKeeper to determine (i.e.,
+ // elect) a leading master.
+ ZooKeeperMasterDetector(const zookeeper::URL& url);
+ // A constructor overload for testing purposes.
+ ZooKeeperMasterDetector(Owned<zookeeper::Group> group);
+ virtual ~ZooKeeperMasterDetector();
+
+ // MasterDetector implementation.
+ virtual process::Future<Result<process::UPID> > detect(
+ const Result<process::UPID>& previous = None());
+
+private:
+ ZooKeeperMasterDetectorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_DETECTOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/tests/master_contender_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_contender_detector_tests.cpp b/src/tests/master_contender_detector_tests.cpp
new file mode 100644
index 0000000..5e42374
--- /dev/null
+++ b/src/tests/master_contender_detector_tests.cpp
@@ -0,0 +1,647 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <zookeeper.h>
+
+#include <gmock/gmock.h>
+
+#include <fstream>
+#include <map>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/pid.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/gtest.hpp>
+#include <stout/nothing.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/try.hpp>
+
+#include "master/contender.hpp"
+#include "master/detector.hpp"
+#include "master/master.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/isolator.hpp"
+#include "tests/mesos.hpp"
+#ifdef MESOS_HAS_JAVA
+#include "tests/zookeeper.hpp"
+#endif
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::tests;
+
+using namespace zookeeper;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+using process::UPID;
+
+using std::map;
+using std::string;
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+
+
+// Fixture for contender/detector tests that rely on the MesosTest
+// helpers (StartMaster, StartSlave, CreateSlaveFlags, Shutdown).
+class MasterContenderDetectorTest : public MesosTest {};
+
+
+// Exercises the "file://" detector mechanism: the master's PID is
+// written to a file under the slave's work directory and the slave
+// is started with a detector created from that path.
+TEST_F(MasterContenderDetectorTest, File)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Write "master" to a file and use the "file://" mechanism to
+ // create a master detector for the slave.
+ // NOTE(review): this comment used to say that a master detector
+ // for the master is still required first; none is created in this
+ // test body, so presumably StartMaster() takes care of it --
+ // confirm.
+ slave::Flags flags = CreateSlaveFlags();
+
+ const string& path = path::join(flags.work_dir, "master");
+ ASSERT_SOME(os::write(path, stringify(master.get())));
+
+ Try<MasterDetector*> detector =
+ MasterDetector::create("file://" + path);
+
+ ASSERT_SOME(detector);
+
+ // NOTE(review): the result of StartSlave() is not checked, and the
+ // raw detector pointer is handed over here; presumably the test
+ // harness takes ownership of it -- confirm.
+ StartSlave(detector.get(), flags);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .Times(1);
+
+ // Capture the first batch of offers so the test can wait on it.
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ // Presumably an offer can only show up once the slave has found
+ // and registered with the master through the file-based detector.
+ AWAIT_READY(offers);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// A standalone contender wins its candidacy right away and only
+// gives it up when the contender object itself is destroyed.
+TEST(BasicMasterContenderDetectorTest, Contender)
+{
+ // A made-up master PID; no master process is started in this test.
+ PID<Master> master;
+ master.ip = 10000000;
+ master.port = 10000;
+
+ // Heap-allocated on purpose: the test deletes the contender halfway
+ // through to observe the effect on the candidacy.
+ MasterContender* contender = new StandaloneMasterContender();
+
+ contender->initialize(master);
+
+ // The outer future completes once the candidacy is obtained; the
+ // inner one completes when that candidacy is lost.
+ Future<Future<Nothing> > contended = contender->contend();
+ AWAIT_READY(contended);
+
+ Future<Nothing> lostCandidacy = contended.get();
+
+ // The candidacy is never lost while the contender is alive.
+ EXPECT_TRUE(lostCandidacy.isPending());
+
+ delete contender;
+
+ // Deleting the contender also withdraws the previous candidacy.
+ AWAIT_READY(lostCandidacy);
+}
+
+
+// A standalone detector stays pending until a leader is appointed
+// and then reports exactly that leader.
+TEST(BasicMasterContenderDetectorTest, Detector)
+{
+ // A made-up master PID; no master process is started in this test.
+ PID<Master> master;
+ master.ip = 10000000;
+ master.port = 10000;
+
+ StandaloneMasterDetector detector;
+
+ Future<Result<UPID> > detected = detector.detect();
+
+ // No one has appointed the leader so we are pending.
+ EXPECT_TRUE(detected.isPending());
+
+ detector.appoint(master);
+
+ AWAIT_READY(detected);
+
+ // Readiness alone does not prove the right leader was reported, so
+ // also check the detected value (as the ZooKeeper tests do).
+ EXPECT_SOME_EQ(master, detected.get());
+}
+
+
+#ifdef MESOS_HAS_JAVA
+// Fixture for the ZooKeeper-backed contender/detector tests; the
+// tests reach the test ZooKeeper server through the 'server' member
+// they use from ZooKeeperTest.
+class ZooKeeperMasterContenderDetectorTest : public ZooKeeperTest {};
+
+
+// A single contender gets elected automatically. Once its ZooKeeper
+// session expires it loses the candidacy and the detector no longer
+// sees a leader.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContender)
+{
+ Try<zookeeper::URL> url = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/mesos");
+
+ ASSERT_SOME(url);
+
+ // Keep a handle on the group so its session can be expired below.
+ Owned<zookeeper::Group> group(
+ new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+ // Stack-allocated (like the contenders in the other tests) so that
+ // it is destroyed on every exit path, including early returns from
+ // a failed assertion; the heap-allocated version was never deleted.
+ ZooKeeperMasterContender contender(group);
+
+ // A made-up master PID; no master process is started in this test.
+ PID<Master> master;
+ master.ip = 10000000;
+ master.port = 10000;
+
+ contender.initialize(master);
+ Future<Future<Nothing> > contended = contender.contend();
+ AWAIT_READY(contended);
+
+ ZooKeeperMasterDetector detector(url.get());
+
+ Future<Result<UPID> > leader = detector.detect();
+
+ // Wait with a bounded timeout before reading the value, so that a
+ // stuck or failed detection is reported as a test failure.
+ AWAIT_READY(leader);
+ EXPECT_SOME_EQ(master, leader.get());
+ Future<Nothing> lostCandidacy = contended.get();
+
+ // Keep detecting: this future completes on the next leader change.
+ leader = detector.detect(leader.get());
+
+ Future<Option<int64_t> > sessionId = group.get()->session();
+ AWAIT_READY(sessionId);
+
+ // Make sure there is a session before dereferencing the Option.
+ ASSERT_SOME(sessionId.get());
+ server->expireSession(sessionId.get().get());
+
+ // Session expiration causes candidacy to be lost and the
+ // Future<Nothing> to be fulfilled.
+ AWAIT_READY(lostCandidacy);
+ AWAIT_READY(leader);
+ EXPECT_NONE(leader.get());
+}
+
+
+// Two contenders, the first wins. Kill the first, then the second
+// is elected.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterContenders)
+{
+ Try<zookeeper::URL> url = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/mesos");
+
+ ASSERT_SOME(url);
+
+ // Heap-allocated on purpose: deleting it further down is how the
+ // test simulates the death of the leading master.
+ // NOTE(review): it leaks if one of the assertions in between
+ // returns early.
+ ZooKeeperMasterContender* contender1 =
+ new ZooKeeperMasterContender(url.get());
+
+ PID<Master> master1;
+ master1.ip = 10000000;
+ master1.port = 10000;
+
+ contender1->initialize(master1);
+
+ Future<Future<Nothing> > contended1 = contender1->contend();
+ AWAIT_READY(contended1);
+
+ ZooKeeperMasterDetector detector1(url.get());
+
+ Future<Result<UPID> > leader1 = detector1.detect();
+ AWAIT_READY(leader1);
+ EXPECT_SOME_EQ(master1, leader1.get());
+
+ ZooKeeperMasterContender contender2(url.get());
+
+ PID<Master> master2;
+ master2.ip = 10000001;
+ master2.port = 10001;
+
+ contender2.initialize(master2);
+
+ Future<Future<Nothing> > contended2 = contender2.contend();
+ AWAIT_READY(contended2);
+
+ // The second contender joined later, so master1 is still the one
+ // being detected.
+ ZooKeeperMasterDetector detector2(url.get());
+ Future<Result<UPID> > leader2 = detector2.detect();
+ AWAIT_READY(leader2);
+ EXPECT_SOME_EQ(master1, leader2.get());
+
+ LOG(INFO) << "Killing the leading master";
+
+ // Destroying contender1 (below) causes leadership change.
+ delete contender1;
+
+ // Passing master1 as 'previous' asks for a leader other than the
+ // one that was just killed.
+ Future<Result<UPID> > leader3 = detector2.detect(master1);
+ AWAIT_READY(leader3);
+ EXPECT_SOME_EQ(master2, leader3.get());
+}
+
+
+// Master contention and detection fail when the network is down, it
+// recovers when the network is back up.
+TEST_F(ZooKeeperMasterContenderDetectorTest, ContenderDetectorShutdownNetwork)
+{
+ // The clock is paused so the session timeouts can be triggered by
+ // advancing it manually; it is resumed at the end of the test.
+ Clock::pause();
+
+ Try<zookeeper::URL> url = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/mesos");
+
+ ASSERT_SOME(url);
+
+ ZooKeeperMasterContender contender(url.get());
+
+ // A made-up master PID; no master process is started in this test.
+ PID<Master> master;
+ master.ip = 10000000;
+ master.port = 10000;
+
+ contender.initialize(master);
+
+ Future<Future<Nothing> > contended = contender.contend();
+ AWAIT_READY(contended);
+ Future<Nothing> lostCandidacy = contended.get();
+
+ ZooKeeperMasterDetector detector(url.get());
+
+ Future<Result<UPID> > leader = detector.detect();
+
+ // Wait with a bounded timeout before reading the value, so that a
+ // stuck or failed detection is reported as a test failure.
+ AWAIT_READY(leader);
+ EXPECT_SOME_EQ(master, leader.get());
+
+ // Keep detecting: this future completes on the next change.
+ leader = detector.detect(leader.get());
+
+ // Shut down ZooKeeper and expect things to fail after the timeout.
+ server->shutdownNetwork();
+
+ // Advance past the longer of the two session timeouts so that both
+ // the contender and the detector time out.
+ Clock::advance(std::max(
+ MASTER_DETECTOR_ZK_SESSION_TIMEOUT,
+ MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+ Clock::settle();
+
+ AWAIT_EXPECT_FAILED(lostCandidacy);
+ AWAIT_READY(leader);
+ EXPECT_ERROR(leader.get());
+
+ // Retry.
+ contended = contender.contend();
+ leader = detector.detect(leader.get());
+
+ // Things will not change until the contender reconnects.
+ Clock::advance(Minutes(1));
+ Clock::settle();
+ EXPECT_TRUE(contended.isPending());
+ EXPECT_TRUE(leader.isPending());
+
+ server->startNetwork();
+ AWAIT_READY(contended);
+ AWAIT_READY(leader);
+
+ // Undo the Clock::pause() from the top so a paused clock does not
+ // leak out of this test.
+ Clock::resume();
+}
+
+
+// Tests that detectors and contenders fail when we reach our
+// ZooKeeper session timeout. This is to enforce that we manually
+// expire the session when we do not get reconnected within the
+// timeout.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorTimedoutSession)
+{
+ // Use an arbitrary timeout value.
+ Duration sessionTimeout(Seconds(5));
+
+ Try<zookeeper::URL> url = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/mesos");
+
+ ASSERT_SOME(url);
+
+ Owned<zookeeper::Group> leaderGroup(new Group(url.get(), sessionTimeout));
+
+ // First we bring up three master contender/detector:
+ // 1. A leading contender.
+ // 2. A non-leading contender.
+ // 3. A non-contender (detector).
+
+ // 1. Simulate a leading contender.
+ ZooKeeperMasterContender leaderContender(leaderGroup);
+
+ PID<Master> leader;
+ leader.ip = 10000000;
+ leader.port = 10000;
+
+ leaderContender.initialize(leader);
+
+ Future<Future<Nothing> > contended = leaderContender.contend();
+ AWAIT_READY(contended);
+
+ ZooKeeperMasterDetector leaderDetector(leaderGroup);
+
+ Future<Result<UPID> > detected = leaderDetector.detect();
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // 2. Simulate a non-leading contender.
+ Owned<zookeeper::Group> followerGroup(new Group(url.get(), sessionTimeout));
+ ZooKeeperMasterContender followerContender(followerGroup);
+
+ PID<Master> follower;
+ follower.ip = 10000001;
+ follower.port = 10001;
+
+ followerContender.initialize(follower);
+
+ contended = followerContender.contend();
+ AWAIT_READY(contended);
+
+ ZooKeeperMasterDetector followerDetector(followerGroup);
+
+ detected = followerDetector.detect();
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // 3. Simulate a non-contender.
+ Owned<zookeeper::Group> nonContenderGroup(
+ new Group(url.get(), sessionTimeout));
+ ZooKeeperMasterDetector nonContenderDetector(nonContenderGroup);
+
+ detected = nonContenderDetector.detect();
+
+ // Wait with a bounded timeout before reading the value, exactly as
+ // done for the leader and the follower above.
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // Expecting the reconnecting event after we shut down the ZK.
+ Future<Nothing> leaderReconnecting = FUTURE_DISPATCH(
+ leaderGroup->process->self(),
+ &GroupProcess::reconnecting);
+
+ Future<Nothing> followerReconnecting = FUTURE_DISPATCH(
+ followerGroup->process->self(),
+ &GroupProcess::reconnecting);
+
+ Future<Nothing> nonContenderReconnecting = FUTURE_DISPATCH(
+ nonContenderGroup->process->self(),
+ &GroupProcess::reconnecting);
+
+ server->shutdownNetwork();
+
+ AWAIT_READY(leaderReconnecting);
+ AWAIT_READY(followerReconnecting);
+ AWAIT_READY(nonContenderReconnecting);
+
+ // Now the detectors re-detect.
+ Future<Result<UPID> > leaderNoMasterDetected = leaderDetector.detect(leader);
+ Future<Result<UPID> > followerNoMasterDetected =
+ followerDetector.detect(leader);
+ Future<Result<UPID> > nonContenderNoMasterDetected =
+ nonContenderDetector.detect(leader);
+
+ Clock::pause();
+
+ // We may need to advance multiple times because we could have
+ // advanced the clock before the timer in Group starts.
+ // NOTE(review): this loop has no upper bound; if one of the futures
+ // never leaves the pending state the test spins forever.
+ while (leaderNoMasterDetected.isPending() ||
+ followerNoMasterDetected.isPending() ||
+ nonContenderNoMasterDetected.isPending()) {
+ Clock::advance(sessionTimeout);
+ Clock::settle();
+ }
+
+ // All three detections are expected to end in an error.
+ AWAIT_READY(leaderNoMasterDetected);
+ EXPECT_ERROR(leaderNoMasterDetected.get());
+ AWAIT_READY(followerNoMasterDetected);
+ EXPECT_ERROR(followerNoMasterDetected.get());
+ AWAIT_READY(nonContenderNoMasterDetected);
+ EXPECT_ERROR(nonContenderNoMasterDetected.get());
+
+ Clock::resume();
+}
+
+
+// Tests whether a leading master correctly detects a new master when
+// its ZooKeeper session is expired (the follower becomes the new
+// leader).
+TEST_F(ZooKeeperMasterContenderDetectorTest,
+ MasterDetectorExpireMasterZKSession)
+{
+ // Simulate a leading master.
+ Try<zookeeper::URL> url = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/mesos");
+
+ ASSERT_SOME(url);
+
+ PID<Master> leader;
+ leader.ip = 10000000;
+ leader.port = 10000;
+
+ // Create the group instance so we can expire its session.
+ Owned<zookeeper::Group> group(
+ new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+ ZooKeeperMasterContender leaderContender(group);
+
+ leaderContender.initialize(leader);
+
+ Future<Future<Nothing> > leaderContended = leaderContender.contend();
+ AWAIT_READY(leaderContended);
+
+ Future<Nothing> leaderLostLeadership = leaderContended.get();
+
+ ZooKeeperMasterDetector leaderDetector(url.get());
+
+ Future<Result<UPID> > detected = leaderDetector.detect();
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // Keep detecting.
+ Future<Result<UPID> > newLeaderDetected =
+ leaderDetector.detect(detected.get());
+
+ // Simulate a following master.
+ PID<Master> follower;
+ follower.ip = 10000001;
+ follower.port = 10001;
+
+ ZooKeeperMasterDetector followerDetector(url.get());
+ ZooKeeperMasterContender followerContender(url.get());
+ followerContender.initialize(follower);
+
+ Future<Future<Nothing> > followerContended = followerContender.contend();
+ AWAIT_READY(followerContended);
+
+ LOG(INFO) << "The follower now is detecting the leader";
+ detected = followerDetector.detect(None());
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // Now expire the leader's zk session.
+ Future<Option<int64_t> > session = group->session();
+ AWAIT_READY(session);
+
+ // Fatal on purpose: the Option is dereferenced right below, so the
+ // test must stop here when there is no session.
+ ASSERT_SOME(session.get());
+
+ LOG(INFO) << "Now expire the ZK session: " << std::hex << session.get().get();
+
+ server->expireSession(session.get().get());
+
+ AWAIT_READY(leaderLostLeadership);
+
+ // Wait for session expiration and ensure the former leader detects
+ // a new leader.
+ AWAIT_READY(newLeaderDetected);
+
+ // A single check covers both "is some" and "equals the follower"
+ // without dereferencing a result that might not hold a value.
+ EXPECT_SOME_EQ(follower, newLeaderDetected.get());
+}
+
+
+// Tests whether a slave correctly DOES NOT disconnect from the
+// master when its ZooKeeper session is expired, but the master still
+// stays the leader when the slave re-connects with the ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest, MasterDetectorExpireSlaveZKSession)
+{
+ // Simulate a leading master.
+ Try<zookeeper::URL> url = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/mesos");
+
+ ASSERT_SOME(url);
+
+ PID<Master> master;
+ master.ip = 10000000;
+ master.port = 10000;
+
+ ZooKeeperMasterContender masterContender(url.get());
+ masterContender.initialize(master);
+
+ Future<Future<Nothing> > leaderContended = masterContender.contend();
+ AWAIT_READY(leaderContended);
+
+ // Simulate a slave. The group is created here so that its session
+ // can be looked up and expired below.
+ Owned<zookeeper::Group> group(
+ new Group(url.get(), MASTER_DETECTOR_ZK_SESSION_TIMEOUT));
+
+ ZooKeeperMasterDetector slaveDetector(group);
+
+ Future<Result<UPID> > detected = slaveDetector.detect();
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(master, detected.get());
+
+ // Keep detecting: this future should only complete on a change.
+ detected = slaveDetector.detect(master);
+
+ // Now expire the slave's zk session.
+ Future<Option<int64_t> > session = group->session();
+ AWAIT_READY(session);
+
+ // Make sure there is a session before dereferencing the Option.
+ ASSERT_SOME(session.get());
+
+ Future<Nothing> connected = FUTURE_DISPATCH(
+ group->process->self(),
+ &GroupProcess::connected);
+
+ server->expireSession(session.get().get());
+
+ // When connected is called, the slave's session has already
+ // expired and its group has reconnected.
+ AWAIT_READY(connected);
+
+ // Still pending because there is no leader change.
+ EXPECT_TRUE(detected.isPending());
+}
+
+
+// Tests whether a slave correctly detects the new master when its
+// ZooKeeper session is expired and a new master is elected before the
+// slave reconnects with ZooKeeper.
+TEST_F(ZooKeeperMasterContenderDetectorTest,
+ MasterDetectorExpireSlaveZKSessionNewMaster)
+{
+ Try<zookeeper::URL> url = zookeeper::URL::parse(
+ "zk://" + server->connectString() + "/mesos");
+
+ ASSERT_SOME(url);
+
+ // Simulate a leading master.
+ Owned<zookeeper::Group> leaderGroup(
+ new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+
+ // 1. Simulate a leading contender.
+ ZooKeeperMasterContender leaderContender(leaderGroup);
+ ZooKeeperMasterDetector leaderDetector(leaderGroup);
+
+ PID<Master> leader;
+ leader.ip = 10000000;
+ leader.port = 10000;
+
+ leaderContender.initialize(leader);
+
+ Future<Future<Nothing> > contended = leaderContender.contend();
+ AWAIT_READY(contended);
+
+ Future<Result<UPID> > detected = leaderDetector.detect(None());
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // 2. Simulate a non-leading contender.
+ Owned<zookeeper::Group> followerGroup(
+ new Group(url.get(), MASTER_CONTENDER_ZK_SESSION_TIMEOUT));
+ ZooKeeperMasterContender followerContender(followerGroup);
+ ZooKeeperMasterDetector followerDetector(followerGroup);
+
+ PID<Master> follower;
+ follower.ip = 10000001;
+ follower.port = 10001;
+
+ followerContender.initialize(follower);
+
+ contended = followerContender.contend();
+ AWAIT_READY(contended);
+
+ detected = followerDetector.detect(None());
+
+ // Wait with a bounded timeout before reading the value, as done
+ // for the leader above.
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // 3. Simulate a non-contender.
+ Owned<zookeeper::Group> nonContenderGroup(
+ new Group(url.get(), MASTER_DETECTOR_ZK_SESSION_TIMEOUT));
+ ZooKeeperMasterDetector nonContenderDetector(nonContenderGroup);
+
+ detected = nonContenderDetector.detect();
+
+ AWAIT_READY(detected);
+ EXPECT_SOME_EQ(leader, detected.get());
+
+ // Keep detecting: this future completes on the next leader change.
+ detected = nonContenderDetector.detect(leader);
+
+ // Now expire the slave's and leading master's zk sessions.
+ // NOTE: Here we assume that slave stays disconnected from the ZK
+ // when the leading master loses its session.
+ Future<Option<int64_t> > slaveSession = nonContenderGroup->session();
+ AWAIT_READY(slaveSession);
+
+ // Make sure there is a session before dereferencing the Option.
+ ASSERT_SOME(slaveSession.get());
+
+ Future<Option<int64_t> > masterSession = leaderGroup->session();
+ AWAIT_READY(masterSession);
+ ASSERT_SOME(masterSession.get());
+
+ server->expireSession(slaveSession.get().get());
+ server->expireSession(masterSession.get().get());
+
+ // Wait for session expiration and ensure a new master is detected.
+ AWAIT_READY(detected);
+
+ EXPECT_SOME_EQ(follower, detected.get());
+}
+
+#endif // MESOS_HAS_JAVA
http://git-wip-us.apache.org/repos/asf/mesos/blob/bcd1dc4e/src/tests/master_detector_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_detector_tests.cpp b/src/tests/master_detector_tests.cpp
deleted file mode 100644
index 06c586d..0000000
--- a/src/tests/master_detector_tests.cpp
+++ /dev/null
@@ -1,628 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <zookeeper.h>
-
-#include <gmock/gmock.h>
-
-#include <fstream>
-#include <map>
-#include <string>
-#include <vector>
-
-#include <mesos/executor.hpp>
-#include <mesos/scheduler.hpp>
-
-#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/protobuf.hpp>
-
-#include <stout/duration.hpp>
-#include <stout/gtest.hpp>
-#include <stout/nothing.hpp>
-#include <stout/os.hpp>
-#include <stout/path.hpp>
-#include <stout/try.hpp>
-
-#include "detector/detector.hpp"
-
-#include "master/master.hpp"
-
-#include "messages/messages.hpp"
-
-#include "slave/slave.hpp"
-
-#include "tests/isolator.hpp"
-#include "tests/mesos.hpp"
-#ifdef MESOS_HAS_JAVA
-#include "tests/zookeeper.hpp"
-#endif
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::tests;
-
-using mesos::internal::master::Master;
-
-using mesos::internal::slave::Slave;
-
-using process::Clock;
-using process::Future;
-using process::PID;
-using process::UPID;
-
-using std::map;
-using std::string;
-using std::vector;
-
-using testing::_;
-using testing::AtMost;
-using testing::Return;
-
-
-class MasterDetectorTest : public MesosTest {};
-
-
-TEST_F(MasterDetectorTest, File)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- Files files;
- TestingIsolator isolator;
-
- slave::Flags flags = CreateSlaveFlags();
-
- Slave s(flags, true, &isolator, &files);
- PID<Slave> slave = process::spawn(&s);
-
- // Write "master" to a file and use the "file://" mechanism to
- // create a master detector for the slave. Still requires a master
- // detector for the master first.
- BasicMasterDetector detector1(master.get(), vector<UPID>(), true);
-
- const string& path = path::join(flags.work_dir, "master");
- ASSERT_SOME(os::write(path, stringify(master.get())));
-
- Try<MasterDetector*> detector =
- MasterDetector::create("file://" + path, slave, false, true);
-
- EXPECT_SOME(os::rm(path));
-
- ASSERT_SOME(detector);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _))
- .Times(1);
-
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers));
-
- driver.start();
-
- AWAIT_READY(offers);
-
- driver.stop();
- driver.join();
-
- Shutdown();
-
- process::terminate(slave);
- process::wait(slave);
-}
-
-
-class MockMasterDetectorListenerProcess
- : public ProtobufProcess<MockMasterDetectorListenerProcess>
-{
-public:
- MockMasterDetectorListenerProcess() {}
- virtual ~MockMasterDetectorListenerProcess() {}
-
- MOCK_METHOD1(newMasterDetected, void(const process::UPID&));
- MOCK_METHOD0(noMasterDetected, void(void));
-
-protected:
- virtual void initialize()
- {
- install<NewMasterDetectedMessage>(
- &MockMasterDetectorListenerProcess::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install<NoMasterDetectedMessage>(
- &MockMasterDetectorListenerProcess::noMasterDetected);
- }
-};
-
-
-#ifdef MESOS_HAS_JAVA
-class ZooKeeperMasterDetectorTest : public ZooKeeperTest {};
-
-
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetector)
-{
- MockMasterDetectorListenerProcess mock;
- process::spawn(mock);
-
- Future<Nothing> newMasterDetected;
- EXPECT_CALL(mock, newMasterDetected(mock.self()))
- .WillOnce(FutureSatisfy(&newMasterDetected));
-
- std::string master = "zk://" + server->connectString() + "/mesos";
-
- Try<MasterDetector*> detector =
- MasterDetector::create(master, mock.self(), true, true);
-
- ASSERT_SOME(detector);
-
- AWAIT_READY(newMasterDetected);
-
- MasterDetector::destroy(detector.get());
-
- process::terminate(mock);
- process::wait(mock);
-}
-
-
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectors)
-{
- MockMasterDetectorListenerProcess mock1;
- process::spawn(mock1);
-
- Future<Nothing> newMasterDetected1;
- EXPECT_CALL(mock1, newMasterDetected(mock1.self()))
- .WillOnce(FutureSatisfy(&newMasterDetected1));
-
- std::string master = "zk://" + server->connectString() + "/mesos";
-
- Try<MasterDetector*> detector1 =
- MasterDetector::create(master, mock1.self(), true, true);
-
- ASSERT_SOME(detector1);
-
- AWAIT_READY(newMasterDetected1);
-
- MockMasterDetectorListenerProcess mock2;
- process::spawn(mock2);
-
- Future<Nothing> newMasterDetected2;
- EXPECT_CALL(mock2, newMasterDetected(mock1.self())) // N.B. mock1
- .WillOnce(FutureSatisfy(&newMasterDetected2));
-
- Try<MasterDetector*> detector2 =
- MasterDetector::create(master, mock2.self(), true, true);
-
- ASSERT_SOME(detector2);
-
- AWAIT_READY(newMasterDetected2);
-
- // Destroying detector1 (below) might cause another election so we
- // need to set up expectations appropriately.
- EXPECT_CALL(mock2, newMasterDetected(_))
- .WillRepeatedly(Return());
-
- MasterDetector::destroy(detector1.get());
-
- process::terminate(mock1);
- process::wait(mock1);
-
- MasterDetector::destroy(detector2.get());
-
- process::terminate(mock2);
- process::wait(mock2);
-}
-
-
-// Disabled due to MESOS-455.
-TEST_F(ZooKeeperMasterDetectorTest, DISABLED_MasterDetectorShutdownNetwork)
-{
- Clock::pause();
-
- MockMasterDetectorListenerProcess mock;
- process::spawn(mock);
-
- Future<Nothing> newMasterDetected1;
- EXPECT_CALL(mock, newMasterDetected(mock.self()))
- .WillOnce(FutureSatisfy(&newMasterDetected1));
-
- std::string master = "zk://" + server->connectString() + "/mesos";
-
- Try<MasterDetector*> detector =
- MasterDetector::create(master, mock.self(), true, true);
-
- ASSERT_SOME(detector);
-
- AWAIT_READY(newMasterDetected1);
-
- Future<Nothing> noMasterDetected;
- EXPECT_CALL(mock, noMasterDetected())
- .WillOnce(FutureSatisfy(&noMasterDetected));
-
- server->shutdownNetwork();
-
- Clock::advance(Seconds(10)); // TODO(benh): Get session timeout from detector.
-
- AWAIT_READY(noMasterDetected);
-
- Future<Nothing> newMasterDetected2;
- EXPECT_CALL(mock, newMasterDetected(mock.self()))
- .WillOnce(FutureSatisfy(&newMasterDetected2));
-
- server->startNetwork();
-
- AWAIT_READY(newMasterDetected2);
-
- MasterDetector::destroy(detector.get());
-
- process::terminate(mock);
- process::wait(mock);
-
- Clock::resume();
-}
-
-
-// Tests that a detector sends a NoMasterDetectedMessage when we
-// reach our ZooKeeper session timeout. This is to enforce that we
-// manually expire the session when we don't get reconnected within
-// the ZOOKEEPER_SESSION_TIMEOUT.
-TEST_F(ZooKeeperTest, MasterDetectorTimedoutSession)
-{
- Try<zookeeper::URL> url =
- zookeeper::URL::parse("zk://" + server->connectString() + "/mesos");
- ASSERT_SOME(url);
-
- // First we bring up three master detector listeners:
- // 1. A leading contender.
- // 2. A non-leading contender.
- // 3. A non-contender.
-
- // 1. Simulate a leading contender.
- MockMasterDetectorListenerProcess leader;
-
- Future<Nothing> newMasterDetected;
- EXPECT_CALL(leader, newMasterDetected(_))
- .WillOnce(FutureSatisfy(&newMasterDetected));
-
- process::spawn(leader);
-
- ZooKeeperMasterDetector leaderDetector(
- url.get(), leader.self(), true, true);
-
- AWAIT_READY(newMasterDetected);
-
- // 2. Simulate a non-leading contender.
- MockMasterDetectorListenerProcess follower;
-
- EXPECT_CALL(follower, newMasterDetected(_))
- .WillOnce(FutureSatisfy(&newMasterDetected));
-
- process::spawn(follower);
-
- ZooKeeperMasterDetector followerDetector(
- url.get(), follower.self(), true, true);
-
- AWAIT_READY(newMasterDetected);
-
- // 3. Simulate a non-contender.
- MockMasterDetectorListenerProcess nonContender;
-
- EXPECT_CALL(nonContender, newMasterDetected(_))
- .WillOnce(FutureSatisfy(&newMasterDetected));
-
- process::spawn(nonContender);
-
- ZooKeeperMasterDetector nonContenderDetector(
- url.get(), nonContender.self(), false, true);
-
- AWAIT_READY(newMasterDetected);
-
- // Now we want to induce lost connections on each of the
- // master detectors.
- // Induce a reconnection on the leader.
- Future<Nothing> leaderReconnecting = FUTURE_DISPATCH(
- leaderDetector.process->self(),
- &ZooKeeperMasterDetectorProcess::reconnecting);
-
- dispatch(leaderDetector.process,
- &ZooKeeperMasterDetectorProcess::reconnecting);
-
- AWAIT_READY(leaderReconnecting);
-
- // Induce a reconnection on the follower.
- Future<Nothing> followerReconnecting = FUTURE_DISPATCH(
- followerDetector.process->self(),
- &ZooKeeperMasterDetectorProcess::reconnecting);
-
- dispatch(followerDetector.process,
- &ZooKeeperMasterDetectorProcess::reconnecting);
-
- AWAIT_READY(followerReconnecting);
-
- // Induce a reconnection on the non-contender.
- Future<Nothing> nonContenderReconnecting = FUTURE_DISPATCH(
- nonContenderDetector.process->self(),
- &ZooKeeperMasterDetectorProcess::reconnecting);
-
- dispatch(nonContenderDetector.process,
- &ZooKeeperMasterDetectorProcess::reconnecting);
-
- AWAIT_READY(nonContenderReconnecting);
-
- // Now induce the reconnection timeout.
- Future<Nothing> leaderNoMasterDetected;
- EXPECT_CALL(leader, noMasterDetected())
- .WillOnce(FutureSatisfy(&leaderNoMasterDetected));
-
- Future<Nothing> followerNoMasterDetected;
- EXPECT_CALL(follower, noMasterDetected())
- .WillOnce(FutureSatisfy(&followerNoMasterDetected));
-
- Future<Nothing> nonContenderNoMasterDetected;
- EXPECT_CALL(nonContender, noMasterDetected())
- .WillOnce(FutureSatisfy(&nonContenderNoMasterDetected));
-
- Clock::pause();
- Clock::advance(ZOOKEEPER_SESSION_TIMEOUT);
- Clock::settle();
-
- AWAIT_READY(leaderNoMasterDetected);
- AWAIT_READY(followerNoMasterDetected);
- AWAIT_READY(nonContenderNoMasterDetected);
-
- process::terminate(leader);
- process::wait(leader);
-
- process::terminate(follower);
- process::wait(follower);
-
- process::terminate(nonContender);
- process::wait(nonContender);
-}
-
-
-// Tests whether a leading master correctly detects a new master when
-// its ZooKeeper session is expired.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireMasterZKSession)
-{
- // Simulate a leading master.
- MockMasterDetectorListenerProcess leader;
-
- Future<Nothing> newMasterDetected1, newMasterDetected2;
- EXPECT_CALL(leader, newMasterDetected(_))
- .WillOnce(FutureSatisfy(&newMasterDetected1))
- .WillOnce(FutureSatisfy(&newMasterDetected2));
-
- EXPECT_CALL(leader, noMasterDetected())
- .Times(0);
-
- process::spawn(leader);
-
- std::string znode = "zk://" + server->connectString() + "/mesos";
-
- Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
- ASSERT_SOME(url);
-
- // Leader's detector.
- ZooKeeperMasterDetector leaderDetector(
- url.get(), leader.self(), true, true);
-
- AWAIT_READY(newMasterDetected1);
-
- // Simulate a following master.
- MockMasterDetectorListenerProcess follower;
-
- Future<Nothing> newMasterDetected3;
- EXPECT_CALL(follower, newMasterDetected(_))
- .WillOnce(FutureSatisfy(&newMasterDetected3))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(follower, noMasterDetected())
- .Times(0);
-
- process::spawn(follower);
-
- // Follower's detector.
- ZooKeeperMasterDetector followerDetector(
- url.get(),
- follower.self(),
- true,
- true);
-
- AWAIT_READY(newMasterDetected3);
-
- // Now expire the leader's zk session.
- Future<int64_t> session = leaderDetector.session();
- AWAIT_READY(session);
-
- server->expireSession(session.get());
-
- // Wait for session expiration and ensure we receive a
- // NewMasterDetected message.
- AWAIT_READY(newMasterDetected2);
-
- process::terminate(follower);
- process::wait(follower);
-
- process::terminate(leader);
- process::wait(leader);
-}
-
-
-// Tests that a slave does NOT see a second NewMasterDetected
-// notification when its own ZooKeeper session expires while the
-// leading master stays the leader across the slave's reconnect.
-//
-// The negative check is carried by the expectations on the slave
-// mock: newMasterDetected() is allowed exactly once and
-// noMasterDetected() never, so any extra notification delivered during
-// the wait at the end of the test fails it.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireSlaveZKSession)
-{
-  // Simulate a leading master.
-  MockMasterDetectorListenerProcess master;
-
-  Future<Nothing> newMasterDetected1;
-  EXPECT_CALL(master, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&newMasterDetected1));
-
-  EXPECT_CALL(master, noMasterDetected())
-    .Times(0);
-
-  process::spawn(master);
-
-  std::string znode = "zk://" + server->connectString() + "/mesos";
-
-  Try<zookeeper::URL> url = zookeeper::URL::parse(znode);
-  ASSERT_SOME(url);
-
-  // Leading master's detector.
-  // NOTE(review): the two trailing boolean arguments are not explained
-  // here; the third is 'true' for the master and 'false' for the slave
-  // below, so it presumably selects whether the detector contends for
-  // leadership -- confirm against the ZooKeeperMasterDetector
-  // constructor.
-  ZooKeeperMasterDetector masterDetector(
-      url.get(), master.self(), true, true);
-
-  AWAIT_READY(newMasterDetected1);
-
-  // Simulate a slave.
-  MockMasterDetectorListenerProcess slave;
-
-  // The slave must be told about the master exactly once; a second
-  // call after the session expiration would violate Times(1).
-  Future<Nothing> newMasterDetected2;
-  EXPECT_CALL(slave, newMasterDetected(_))
-    .Times(1)
-    .WillOnce(FutureSatisfy(&newMasterDetected2));
-
-  EXPECT_CALL(slave, noMasterDetected())
-    .Times(0);
-
-  process::spawn(slave);
-
-  // Slave's master detector.
-  ZooKeeperMasterDetector slaveDetector(
-      url.get(), slave.self(), false, true);
-
-  AWAIT_READY(newMasterDetected2);
-
-  // Now expire the slave's zk session.
-  Future<int64_t> session = slaveDetector.session();
-  AWAIT_READY(session);
-
-  server->expireSession(session.get());
-
-  // Wait for enough time to ensure no NewMasterDetected message is sent.
-  os::sleep(Seconds(4)); // ZooKeeper needs extra time for session expiration.
-
-  process::terminate(slave);
-  process::wait(slave);
-
-  process::terminate(master);
-  process::wait(master);
-}
-
-
-// Checks that a slave learns about the newly elected master when the
-// slave's ZooKeeper session expires and leadership changes hands
-// before the slave has reconnected to ZooKeeper.
-TEST_F(ZooKeeperMasterDetectorTest, MasterDetectorExpireSlaveZKSessionNewMaster)
-{
-  // First master: the one that initially leads.
-  MockMasterDetectorListenerProcess leader;
-
-  Future<Nothing> leaderNotified;
-  EXPECT_CALL(leader, noMasterDetected())
-    .Times(0);
-
-  EXPECT_CALL(leader, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&leaderNotified))
-    .WillRepeatedly(Return());
-
-  process::spawn(leader);
-
-  Try<zookeeper::URL> url =
-    zookeeper::URL::parse("zk://" + server->connectString() + "/mesos");
-  ASSERT_SOME(url);
-
-  // Detector owned by the initially leading master.
-  // NOTE(review): the trailing boolean arguments are not explained
-  // here; the third is 'true' for both masters and 'false' for the
-  // slave, so it presumably selects whether the detector contends for
-  // leadership -- confirm against the ZooKeeperMasterDetector
-  // constructor.
-  ZooKeeperMasterDetector leaderDetector(
-      url.get(),
-      leader.self(),
-      true,
-      true);
-
-  AWAIT_READY(leaderNotified);
-
-  // Second master: starts out as a non-leader.
-  MockMasterDetectorListenerProcess standby;
-
-  Future<Nothing> standbyNotified;
-  EXPECT_CALL(standby, noMasterDetected())
-    .Times(0);
-
-  EXPECT_CALL(standby, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&standbyNotified))
-    .WillRepeatedly(Return());
-
-  process::spawn(standby);
-
-  // Detector owned by the non-leading master.
-  ZooKeeperMasterDetector standbyDetector(
-      url.get(),
-      standby.self(),
-      true,
-      true);
-
-  AWAIT_READY(standbyNotified);
-
-  // The slave under test.
-  MockMasterDetectorListenerProcess slave;
-
-  // Two notifications are expected on the slave: one for the initial
-  // master and one for the master detected after the expirations.
-  Future<Nothing> slaveSawInitialMaster;
-  Future<Nothing> slaveSawNextMaster;
-  EXPECT_CALL(slave, newMasterDetected(_))
-    .WillOnce(FutureSatisfy(&slaveSawInitialMaster))
-    .WillOnce(FutureSatisfy(&slaveSawNextMaster));
-
-  // A NoMasterDetected in between is tolerated, but at most once.
-  EXPECT_CALL(slave, noMasterDetected())
-    .Times(AtMost(1));
-
-  process::spawn(slave);
-
-  // Detector owned by the slave.
-  ZooKeeperMasterDetector slaveDetector(
-      url.get(),
-      slave.self(),
-      false,
-      true);
-
-  AWAIT_READY(slaveSawInitialMaster);
-
-  // Expire the slave's session first, then the leading master's.
-  // NOTE: This assumes the slave is still disconnected from ZooKeeper
-  // at the moment the leading master loses its session.
-  Future<int64_t> slaveZkSession = slaveDetector.session();
-  AWAIT_READY(slaveZkSession);
-
-  server->expireSession(slaveZkSession.get());
-
-  Future<int64_t> leaderZkSession = leaderDetector.session();
-  AWAIT_READY(leaderZkSession);
-
-  server->expireSession(leaderZkSession.get());
-
-  // Once the sessions have expired the slave must receive a second
-  // NewMasterDetected message; allow up to 10 seconds for it.
-  AWAIT_READY_FOR(slaveSawNextMaster, Seconds(10));
-
-  process::terminate(slave);
-  process::wait(slave);
-
-  process::terminate(standby);
-  process::wait(standby);
-
-  process::terminate(leader);
-  process::wait(leader);
-}
-#endif // MESOS_HAS_JAVA