You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/19 19:57:49 UTC
[8/8] git commit: Replaced usage of old detector with new Master
contender and detector abstractions.
Replaced usage of old detector with new Master contender and detector
abstractions.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15510
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f9d1dd81
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f9d1dd81
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f9d1dd81
Branch: refs/heads/master
Commit: f9d1dd819b6cc3843e4d1287ac10276d62cbfed4
Parents: bcd1dc4
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Nov 19 10:39:27 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Nov 19 10:57:19 2013 -0800
----------------------------------------------------------------------
include/mesos/scheduler.hpp | 12 +-
src/Makefile.am | 3 +-
src/cli/resolve.cpp | 27 +-
src/detector/detector.cpp | 580 -------------------------------
src/detector/detector.hpp | 216 ------------
src/local/local.cpp | 22 +-
src/local/main.cpp | 2 -
src/master/http.cpp | 9 +-
src/master/main.cpp | 35 +-
src/master/master.cpp | 128 ++++---
src/master/master.hpp | 31 +-
src/messages/messages.proto | 11 -
src/sched/sched.cpp | 236 +++++++------
src/slave/http.cpp | 4 +-
src/slave/main.cpp | 15 +-
src/slave/slave.cpp | 158 ++++-----
src/slave/slave.hpp | 22 +-
src/tests/allocator_tests.cpp | 3 +-
src/tests/authentication_tests.cpp | 53 +--
src/tests/cluster.hpp | 113 ++++--
src/tests/fault_tolerance_tests.cpp | 123 ++++---
src/tests/gc_tests.cpp | 3 +-
src/tests/isolator_tests.cpp | 3 +-
src/tests/master_tests.cpp | 27 +-
src/tests/mesos.cpp | 41 +++
src/tests/mesos.hpp | 59 ++++
src/tests/slave_recovery_tests.cpp | 16 +-
27 files changed, 686 insertions(+), 1266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 380e087..161cc65 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -40,6 +40,7 @@ namespace mesos {
class SchedulerDriver;
namespace internal {
+class MasterDetector;
class SchedulerProcess;
}
@@ -386,9 +387,12 @@ private:
FrameworkInfo framework;
std::string master;
- // Libprocess process for communicating with master.
+ // Used for communicating with the master.
internal::SchedulerProcess* process;
+ // URL for the master (e.g., zk://, file://, etc).
+ std::string url;
+
// Mutex to enforce all non-callbacks are execute serially.
pthread_mutex_t mutex;
@@ -397,6 +401,12 @@ private:
// Current status of the driver.
Status status;
+
+ const Credential* credential;
+
+protected:
+ // Used to detect (i.e., choose) the master.
+ internal::MasterDetector* detector;
};
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 969aead..feda34b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -183,7 +183,6 @@ libmesos_no_3rdparty_la_SOURCES = \
launcher/launcher.cpp \
exec/exec.cpp \
common/lock.cpp \
- detector/detector.cpp \
common/date_utils.cpp \
common/resources.cpp \
common/attributes.cpp \
@@ -220,7 +219,7 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp \
common/protobuf_utils.hpp \
common/lock.hpp \
common/type_utils.hpp common/thread.hpp common/units.hpp \
- detector/detector.hpp examples/utils.hpp files/files.hpp \
+ examples/utils.hpp files/files.hpp \
hdfs/hdfs.hpp \
launcher/launcher.hpp linux/cgroups.hpp \
linux/fs.hpp local/flags.hpp local/local.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/cli/resolve.cpp
----------------------------------------------------------------------
diff --git a/src/cli/resolve.cpp b/src/cli/resolve.cpp
index b05f510..dddadfc 100644
--- a/src/cli/resolve.cpp
+++ b/src/cli/resolve.cpp
@@ -29,7 +29,7 @@
#include <stout/strings.hpp>
#include <stout/try.hpp>
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
using namespace mesos;
using namespace mesos::internal;
@@ -117,19 +117,32 @@ int main(int argc, char** argv)
return -1;
}
- Future<UPID> pid = mesos::internal::detect(master.get(), !verbose);
+ Try<MasterDetector*> detector = MasterDetector::create(master.get());
+
+ if (detector.isError()) {
+ cerr << "Failed to create a master detector: " << detector.error() << endl;
+ return -1;
+ }
+
+ Future<Result<UPID> > pid = detector.get()->detect();
if (!pid.await(timeout)) {
cerr << "Failed to detect master from '" << master.get()
<< "' within " << timeout << endl;
return -1;
- } else if (pid.isFailed()) {
- cerr << "Failed to detect master from '" << master.get()
- << "': " << pid.failure() << endl;
- return -1;
+ } else {
+ // Not expecting detect() to fail or discard the future.
+ CHECK(pid.isReady());
+ if (pid.get().isError()) {
+ cerr << "Failed to detect master from '" << master.get()
+ << "': " << pid.get().error() << endl;
+ return -1;
+ }
}
- cout << string(pid.get()).substr(7) << endl;
+ // The future is not satisfied unless the result is Some.
+ CHECK_SOME(pid.get());
+ cout << strings::remove(pid.get().get(), "master@") << endl;
return 0;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/detector/detector.cpp
----------------------------------------------------------------------
diff --git a/src/detector/detector.cpp b/src/detector/detector.cpp
deleted file mode 100644
index 8d9f118..0000000
--- a/src/detector/detector.cpp
+++ /dev/null
@@ -1,580 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <glog/logging.h>
-
-#include <fstream>
-#include <ios>
-#include <vector>
-
-#include <tr1/memory> // TODO(benh): Replace shared_ptr with unique_ptr.
-
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/future.hpp>
-#include <process/process.hpp>
-#include <process/protobuf.hpp>
-#include <process/timer.hpp>
-
-#include <stout/error.hpp>
-#include <stout/foreach.hpp>
-#include <stout/lambda.hpp>
-#include <stout/none.hpp>
-#include <stout/numify.hpp>
-#include <stout/option.hpp>
-#include <stout/try.hpp>
-
-#include "detector/detector.hpp"
-
-#include "logging/logging.hpp"
-
-#include "messages/messages.hpp"
-
-#include "zookeeper/authentication.hpp"
-#include "zookeeper/url.hpp"
-#include "zookeeper/watcher.hpp"
-#include "zookeeper/zookeeper.hpp"
-
-using process::Future;
-using process::Process;
-using process::Promise;
-using process::Timer;
-using process::UPID;
-using process::wait; // Necessary on some OS's to disambiguate.
-
-using std::pair;
-using std::string;
-using std::vector;
-
-namespace mesos {
-namespace internal {
-
-
-const Duration ZOOKEEPER_SESSION_TIMEOUT = Seconds(10);
-
-
-MasterDetector::~MasterDetector() {}
-
-
-Try<MasterDetector*> MasterDetector::create(
- const string& master,
- const UPID& pid,
- bool contend,
- bool quiet)
-{
- if (master == "") {
- if (contend) {
- return new BasicMasterDetector(pid);
- } else {
- return Error("Cannot detect master");
- }
- } else if (master.find("zk://") == 0) {
- Try<zookeeper::URL> url = zookeeper::URL::parse(master);
- if (url.isError()) {
- return Error(url.error());
- }
- if (url.get().path == "/") {
- return Error(
- "Expecting a (chroot) path for ZooKeeper ('/' is not supported)");
- }
- return new ZooKeeperMasterDetector(url.get(), pid, contend, quiet);
- } else if (master.find("file://") == 0) {
- const std::string& path = master.substr(7);
- std::ifstream file(path.c_str());
- if (!file.is_open()) {
- return Error("Failed to open file at '" + path + "'");
- }
-
- std::string line;
- getline(file, line);
-
- if (!file) {
- file.close();
- return Error("Failed to read from file at '" + path + "'");
- }
-
- file.close();
-
- return create(line, pid, contend, quiet);
- }
-
- // Okay, try and parse what we got as a PID.
- process::UPID masterPid = master.find("master@") == 0
- ? process::UPID(master)
- : process::UPID("master@" + master);
-
- if (!masterPid) {
- return Error("Cannot parse '" + std::string(masterPid) + "'");
- }
-
- return new BasicMasterDetector(masterPid, pid);
-}
-
-
-void MasterDetector::destroy(MasterDetector *detector)
-{
- if (detector != NULL)
- delete detector;
-}
-
-
-BasicMasterDetector::BasicMasterDetector(const UPID& _master)
- : master(_master)
-{
- // Elect the master.
- NewMasterDetectedMessage message;
- message.set_pid(master);
- process::post(master, message);
-}
-
-
-BasicMasterDetector::BasicMasterDetector(
- const UPID& _master,
- const UPID& pid,
- bool elect)
- : master(_master)
-{
- if (elect) {
- // Elect the master.
- NewMasterDetectedMessage message;
- message.set_pid(master);
- process::post(master, message);
- }
-
- // Tell the pid about the master.
- NewMasterDetectedMessage message;
- message.set_pid(master);
- process::post(pid, message);
-}
-
-
-BasicMasterDetector::BasicMasterDetector(
- const UPID& _master,
- const vector<UPID>& pids,
- bool elect)
- : master(_master)
-{
- if (elect) {
- // Elect the master.
- NewMasterDetectedMessage message;
- message.set_pid(master);
- process::post(master, message);
- }
-
- // Tell each pid about the master.
- foreach (const UPID& pid, pids) {
- NewMasterDetectedMessage message;
- message.set_pid(master);
- process::post(pid, message);
- }
-}
-
-
-BasicMasterDetector::~BasicMasterDetector() {}
-
-
-ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
- const zookeeper::URL& _url,
- const UPID& _pid,
- bool _contend,
- bool quiet)
- : url(_url),
- acl(url.authentication.isSome()
- ? zookeeper::EVERYONE_READ_CREATOR_ALL
- : ZOO_OPEN_ACL_UNSAFE),
- pid(_pid),
- contend(_contend),
- watcher(NULL),
- zk(NULL),
- expire(false),
- timer(),
- currentMasterSeq(),
- currentMasterPID()
-{
- // Set verbosity level for underlying ZooKeeper library logging.
- // TODO(benh): Put this in the C++ API.
- zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
-}
-
-
-ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
-{
- delete zk;
- delete watcher;
-}
-
-
-void ZooKeeperMasterDetectorProcess::initialize()
-{
- // Doing initialization here allows to avoid the race between
- // instantiating the ZooKeeper instance and being spawned ourself.
- watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
- zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-}
-
-
-int64_t ZooKeeperMasterDetectorProcess::session()
-{
- CHECK_NOTNULL(zk);
- return zk->getSessionId();
-}
-
-
-void ZooKeeperMasterDetectorProcess::connected(bool reconnect)
-{
- if (!reconnect) {
- LOG(INFO) << "Master detector (" << pid << ") connected to ZooKeeper ...";
-
- if (url.authentication.isSome()) {
- const std::string& scheme = url.authentication.get().scheme;
- const std::string& credentials = url.authentication.get().credentials;
- LOG(INFO) << "Authenticating to ZooKeeper using scheme '" << scheme << "'";
- int code = zk->authenticate(scheme, credentials);
- if (code != ZOK) {
- LOG(FATAL) << "Failed to authenticate with ZooKeeper: "
- << zk->message(code);
- }
- }
-
- // Assume the path (chroot) being used does not end with a "/".
- CHECK(url.path.at(url.path.length() - 1) != '/');
-
- // Create znode path (including intermediate znodes) as necessary.
- LOG(INFO) << "Trying to create path '" << url.path << "' in ZooKeeper";
-
- int code = zk->create(url.path, "", acl, 0, NULL, true);
-
- // We fail all non-OK return codes except ZNODEEXISTS (since that
- // means the path we were trying to create exists) and ZNOAUTH
- // (since it's possible that the ACLs on 'dirname(url.path)' don't
- // allow us to create a child znode but we are allowed to create
- // children of 'url.path' itself, which will be determined below
- // if we are contending). Note that it's also possible we got back
- // a ZNONODE because we could not create one of the intermediate
- // znodes (in which case we'll abort in the 'else' below since
- // ZNONODE is non-retryable). TODO(benh): Need to check that we
- // also can put a watch on the children of 'url.path'.
- if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
- LOG(FATAL) << "Failed to create '" << url.path
- << "' in ZooKeeper: " << zk->message(code);
- }
-
- if (contend) {
- // We contend with the pid given in constructor.
- string result;
- int code = zk->create(url.path + "/", pid, acl,
- ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
-
- if (code != ZOK) {
- LOG(FATAL) << "Unable to create ephemeral child of '" << url.path
- << "' in ZooKeeper: %s" << zk->message(code);
- }
-
- LOG(INFO) << "Created ephemeral/sequence znode at '" << result << "'";
- }
-
- // Now determine who the master is (it may be us).
- detectMaster();
- } else {
- LOG(INFO) << "Master detector (" << pid << ") reconnected ...";
-
- // Cancel and cleanup the reconnect timer (if necessary).
- if (timer.isSome()) {
- Timer::cancel(timer.get());
- timer = None();
- }
-
- // If we decided to expire the session, make sure we delete the
- // ZooKeeper instance so the session actually expires. We also
- // create a new ZooKeeper instance for clients that want to
- // continue detecting and/or contending (which is likely given
- // that this code is getting executed).
- if (expire) {
- LOG(WARNING) << "Cleaning up after expired ZooKeeper session";
-
- delete CHECK_NOTNULL(zk);
- delete CHECK_NOTNULL(watcher);
-
- watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
- zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-
- expire = false;
- return;
- }
-
- // We've reconnected and we didn't prematurely expire the session,
- // but the master might have changed, so we should run an
- // election. TODO(benh): Determine if this is really necessary or
- // if the watch set via 'ZooKeeper::getChildren' in 'detectMaster'
- // is sufficient (it should be).
- detectMaster();
- }
-}
-
-
-void ZooKeeperMasterDetectorProcess::reconnecting()
-{
- LOG(INFO) << "Master detector (" << pid << ") lost connection to ZooKeeper, "
- << "attempting to reconnect ...";
-
- // ZooKeeper won't tell us of a session expiration until we
- // reconnect, which could occur much much later than the session was
- // actually expired. This can lead to a prolonged split-brain
- // scenario when network partitions occur. Rather than wait for a
- // reconnection to occur (i.e., a network partition to be repaired)
- // we create a local timer and "expire" our session prematurely if
- // we haven't reconnected within the session expiration time
- // out. Later, when we eventually do reconnect we can force the
- // session to be expired if we decided locally to expire.
- timer = process::delay(
- ZOOKEEPER_SESSION_TIMEOUT, self(), &Self::timedout, zk->getSessionId());
-}
-
-
-void ZooKeeperMasterDetectorProcess::expired()
-{
- LOG(WARNING) << "Master detector (" << pid << ") ZooKeeper session expired!";
-
- // Cancel and cleanup the reconnect timer (if necessary).
- if (timer.isSome()) {
- Timer::cancel(timer.get());
- timer = None();
- }
-
- delete CHECK_NOTNULL(zk);
- delete CHECK_NOTNULL(watcher);
-
- watcher = new ProcessWatcher<ZooKeeperMasterDetectorProcess>(self());
- zk = new ZooKeeper(url.servers, ZOOKEEPER_SESSION_TIMEOUT, watcher);
-
- expire = false;
-}
-
-
-void ZooKeeperMasterDetectorProcess::updated(const string& path)
-{
- // A new master might have showed up and created a sequence
- // identifier or a master may have died, determine who the master is now!
- detectMaster();
-}
-
-
-void ZooKeeperMasterDetectorProcess::created(const string& path)
-{
- LOG(FATAL) << "Unexpected ZooKeeper event (created) for '" << path << "'";
-}
-
-
-void ZooKeeperMasterDetectorProcess::deleted(const string& path)
-{
- LOG(FATAL) << "Unexpected ZooKeeper event (deleted) for '" << path << "'";
-}
-
-
-void ZooKeeperMasterDetectorProcess::timedout(const int64_t& sessionId)
-{
- CHECK_NOTNULL(zk);
- if (timer.isSome() && zk->getSessionId() == sessionId) {
- LOG(WARNING) << "Timed out waiting to reconnect to ZooKeeper "
- << "(sessionId=" << std::hex << sessionId << ")";
- timer = None();
- expire = true;
-
- // TODO(bmahler): We always want to clear the sequence number
- // prior to sending NoMasterDetectedMessage. It might be prudent
- // to use a helper function to enforce this.
- currentMasterSeq = ""; // Clear the master sequence number.
- process::post(pid, NoMasterDetectedMessage());
- }
-}
-
-
-void ZooKeeperMasterDetectorProcess::detectMaster()
-{
- vector<string> results;
-
- int code = zk->getChildren(url.path, true, &results);
-
- if (code != ZOK) {
- if (zk->retryable(code)) {
- // NOTE: We don't expect a ZNONODE here because 'url.path' is always
- // created in the connected() call. Despite that, we don't do a
- // CHECK (code != ZNONODE) just to be safe in case the zk client library
- // does return the code unexpectedly.
- LOG(ERROR) << "Master detector (" << pid << ") failed to get masters: "
- << zk->message(code);
- return; // Try again when we reconnect.
- } else {
- LOG(FATAL) << "Non-retryable ZooKeeper error while getting masters: "
- << zk->message(code);
- }
- } else {
- LOG(INFO) << "Master detector (" << pid << ") found " << results.size()
- << " registered masters";
- }
-
- string masterSeq;
- long min = LONG_MAX;
- foreach (const string& result, results) {
- Try<int> i = numify<int>(result);
- if (i.isError()) {
- LOG(WARNING) << "Unexpected znode at '" << url.path
- << "': " << i.error();
- continue;
- }
- if (i.get() < min) {
- min = i.get();
- masterSeq = result;
- }
- }
-
- // No master present (lost or possibly hasn't come up yet).
- if (masterSeq.empty()) {
- LOG(INFO) << "Master detector (" << pid << ") couldn't find any masters";
- currentMasterSeq = ""; // Clear the master sequence number.
- process::post(pid, NoMasterDetectedMessage());
- } else if (masterSeq != currentMasterSeq) {
- // Okay, let's fetch the master pid from ZooKeeper.
- string result;
- code = zk->get(url.path + "/" + masterSeq, false, &result, NULL);
-
- if (code != ZOK) {
- // This is possible because the master might have failed since
- // the invocation of ZooKeeper::getChildren above.
- // It is fine to not send a NoMasterDetectedMessage here because,
- // 1) If this is due to a connection loss or session expiration,
- // connected() or expired() will be called and the leader detection
- // code (detectMaster()) will be re-tried.
- // 2) If this is due to no masters present (i.e., code == ZNONODE),
- // updated() will be called and the detectMaster() will be re-tried.
- if (zk->retryable(code) || code == ZNONODE) {
- LOG(ERROR) << "Master detector failed to fetch new master pid: "
- << zk->message(code);
- } else {
- LOG(FATAL) << "Non-retryable ZooKeeper error while fetching "
- << "new master pid: " << zk->message(code);
- }
- } else {
- // Now let's parse what we fetched from ZooKeeper.
- LOG(INFO) << "Master detector (" << pid << ") got new master pid: "
- << result;
-
- UPID masterPid = result;
-
- if (masterPid == UPID()) {
- // TODO(benh): Maybe we should try again then!?!? Parsing
- // might have failed because of DNS, and whoever is using the
- // detector might sit "unconnected" indefinitely!
- LOG(ERROR) << "Failed to parse new master pid!";
- currentMasterSeq = ""; // Clear the master sequence number.
- process::post(pid, NoMasterDetectedMessage());
- } else {
- currentMasterSeq = masterSeq;
- currentMasterPID = masterPid;
-
- NewMasterDetectedMessage message;
- message.set_pid(currentMasterPID);
- process::post(pid, message);
- }
- }
- }
-}
-
-
-ZooKeeperMasterDetector::ZooKeeperMasterDetector(
- const zookeeper::URL& url,
- const UPID& pid,
- bool contend,
- bool quiet)
-{
- process = new ZooKeeperMasterDetectorProcess(url, pid, contend, quiet);
- spawn(process);
-}
-
-
-ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
-{
- terminate(process);
- wait(process);
- delete process;
-}
-
-
-Future<int64_t> ZooKeeperMasterDetector::session()
-{
- return dispatch(process, &ZooKeeperMasterDetectorProcess::session);
-}
-
-
-// A simple "listener" for doing one-time detections to support the
-// 'detect' function (see below). Note that this can't be
-// declared/defined inside of the 'detect' function because until
-// C++11 we can't have template arguments be local types (which
-// 'Listener' would be).
-class Listener : public ProtobufProcess<Listener>
-{
-public:
- Future<UPID> future() { return promise.future(); }
-
-protected:
- virtual void initialize()
- {
- // Stop listening if no one cares.
- void(*terminate)(const UPID&, bool) = process::terminate;
- promise.future().onDiscarded(lambda::bind(terminate, self(), true));
-
- install<NewMasterDetectedMessage>(
- &Listener::newMasterDetected,
- &NewMasterDetectedMessage::pid);
- }
-
- void newMasterDetected(const UPID& pid)
- {
- promise.set(pid);
- process::terminate(self());
- }
-
-private:
- Promise<UPID> promise;
-};
-
-
-Future<UPID> detect(const string& master, bool quiet)
-{
- Listener* listener = new Listener();
-
- // Save the future before we spawn.
- Future<UPID> future = listener->future();
-
- process::spawn(listener, true); // Let the GC clean up the Listener.
-
- Try<MasterDetector*> detector =
- MasterDetector::create(master, listener->self(), false, quiet);
-
- if (detector.isError()) {
- process::terminate(listener);
- return Future<UPID>::failed(
- "Failed to create a master detector: " + detector.error());
- }
-
- return future;
-}
-
-
-} // namespace internal {
-} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/detector/detector.hpp
----------------------------------------------------------------------
diff --git a/src/detector/detector.hpp b/src/detector/detector.hpp
deleted file mode 100644
index 3aaebfe..0000000
--- a/src/detector/detector.hpp
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __MASTER_DETECTOR_HPP__
-#define __MASTER_DETECTOR_HPP__
-
-#include <string>
-#include <iostream>
-#include <unistd.h>
-#include <climits>
-#include <cstdlib>
-
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/process.hpp>
-#include <process/timer.hpp>
-
-#include <stout/try.hpp>
-
-#include "zookeeper/authentication.hpp"
-#include "zookeeper/url.hpp"
-#include "zookeeper/zookeeper.hpp"
-
-class Watcher;
-class ZooKeeper;
-
-namespace mesos {
-namespace internal {
-
-// Returns the current master PID (waiting until a master is elected).
-process::Future<process::UPID> detect(
- const std::string& master,
- bool quiet = true);
-
-
-// Forward declarations.
-class ZooKeeperMasterDetectorProcess;
-
-/**
- * Implements functionality for:
- * a) detecting masters
- * b) contending to be a master
- */
-class MasterDetector
-{
-public:
- virtual ~MasterDetector() = 0;
-
- /**
- * Creates a master detector that, given the specified master (which
- * may be a ZooKeeper URL), knows how to connect to the master, or
- * contend to be a master. The master detector sends messages to the
- * specified pid when a new master is elected, a master is lost,
- * etc.
- *
- * @param master string possibly containing zk:// or file://
- * @param pid libprocess pid to both receive our messages and be
- * used if we should contend
- * @param contend true if should contend to be master
- * @param quite true if should limit log output
- * @return instance of MasterDetector
- */
- static Try<MasterDetector*> create(const std::string& master,
- const process::UPID& pid,
- bool contend = false,
- bool quiet = true);
-
- /**
- * Cleans up and deallocates the detector.
- */
- static void destroy(MasterDetector* detector);
-};
-
-
-class BasicMasterDetector : public MasterDetector
-{
-public:
- /**
- * Create a new master detector where the specified pid contends to
- * be the master and gets elected by default.
- *
- * @param master libprocess pid to send messages/updates and be the
- * master
- */
- BasicMasterDetector(const process::UPID& master);
-
- /**
- * Create a new master detector where the 'master' pid is
- * the master (no contending).
- *
- * @param master libprocess pid to send messages/updates and be the
- * master
- * @param pid/pids libprocess pids to send messages/updates to regarding
- * the master
- * @param elect if true then contend and elect the specified master
- */
- BasicMasterDetector(const process::UPID& master,
- const process::UPID& pid,
- bool elect = false);
-
- BasicMasterDetector(const process::UPID& master,
- const std::vector<process::UPID>& pids,
- bool elect = false);
-
- virtual ~BasicMasterDetector();
-
-private:
- const process::UPID master;
-};
-
-
-class ZooKeeperMasterDetector : public MasterDetector
-{
-public:
- /**
- * Uses ZooKeeper for both detecting masters and contending to be a
- * master.
- *
- * @param url znode path of the master
- * @param pid libprocess pid to send messages/updates to (and to
- * use for contending to be a master)
- * @param contend true if should contend to be master and false otherwise (not
- * needed for slaves and frameworks)
- * @param quiet verbosity logging level for underlying ZooKeeper library
- */
- ZooKeeperMasterDetector(const zookeeper::URL& url,
- const process::UPID& pid,
- bool contend,
- bool quiet);
-
- virtual ~ZooKeeperMasterDetector();
-
- /**
- * Returns the ZooKeeper session ID associated with this detector.
- */
- process::Future<int64_t> session();
-
- // Visible for testing.
- ZooKeeperMasterDetectorProcess* process;
-};
-
-
-// TODO(benh): Make this value configurable via flags and verify that
-// it is always LESS THAN the slave heartbeat timeout.
-extern const Duration ZOOKEEPER_SESSION_TIMEOUT;
-
-
-class ZooKeeperMasterDetectorProcess
- : public process::Process<ZooKeeperMasterDetectorProcess>
-{
-public:
- ZooKeeperMasterDetectorProcess(
- const zookeeper::URL& url,
- const process::UPID& pid,
- bool contend,
- bool quiet);
-
- virtual ~ZooKeeperMasterDetectorProcess();
-
- virtual void initialize();
-
- // ZooKeeperMasterDetector implementation.
- int64_t session();
-
- // ZooKeeper events.
- void connected(bool reconnect);
- void reconnecting();
- void expired();
- void updated(const std::string& path);
- void created(const std::string& path);
- void deleted(const std::string& path);
-
-private:
- // Handles reconnecting "timeouts" by prematurely expiring a session
- // (only used for contending instances). TODO(benh): Remove 'const
- // &' after fixing libprocess.
- void timedout(const int64_t& sessionId);
-
- // Attempts to detect a master.
- void detectMaster();
-
- const zookeeper::URL url;
- const ACL_vector acl;
-
- const process::UPID pid;
- bool contend;
-
- Watcher* watcher;
- ZooKeeper* zk;
-
- bool expire;
- Option<process::Timer> timer;
-
- std::string currentMasterSeq;
- process::UPID currentMasterPID;
-};
-
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __MASTER_DETECTOR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/local/local.cpp
----------------------------------------------------------------------
diff --git a/src/local/local.cpp b/src/local/local.cpp
index 180756a..83a7f91 100644
--- a/src/local/local.cpp
+++ b/src/local/local.cpp
@@ -27,12 +27,12 @@
#include "local.hpp"
-#include "detector/detector.hpp"
-
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/allocator.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
#include "master/drf_sorter.hpp"
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
@@ -79,7 +79,8 @@ static state::protobuf::State* state = NULL;
static Registrar* registrar = NULL;
static Master* master = NULL;
static map<Isolator*, Slave*> slaves;
-static MasterDetector* detector = NULL;
+static StandaloneMasterDetector* detector = NULL;
+static MasterContender* contender = NULL;
static Files* files = NULL;
@@ -126,7 +127,11 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
state = new state::protobuf::State(storage);
registrar = new Registrar(state);
- master = new Master(_allocator, registrar, files, flags);
+ contender = new StandaloneMasterContender();
+ detector = new StandaloneMasterDetector();
+ master =
+ new Master(_allocator, registrar, files, contender, detector, flags);
+ detector->appoint(master->self());
}
PID<Master> pid = process::spawn(master);
@@ -147,13 +152,13 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator)
// Use a different work directory for each slave.
flags.work_dir = path::join(flags.work_dir, stringify(i));
- Slave* slave = new Slave(flags, true, isolator, files);
+ // NOTE: At this point detector is already initialized by the
+ // Master.
+ Slave* slave = new Slave(flags, true, detector, isolator, files);
slaves[isolator] = slave;
pids.push_back(process::spawn(slave));
}
- detector = new BasicMasterDetector(pid, pids, true);
-
return pid;
}
@@ -186,6 +191,9 @@ void shutdown()
delete detector;
detector = NULL;
+ delete contender;
+ contender = NULL;
+
delete files;
files = NULL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/local/main.cpp
----------------------------------------------------------------------
diff --git a/src/local/main.cpp b/src/local/main.cpp
index 5995c53..da431b7 100644
--- a/src/local/main.cpp
+++ b/src/local/main.cpp
@@ -22,8 +22,6 @@
#include <stout/os.hpp>
#include <stout/stringify.hpp>
-#include "detector/detector.hpp"
-
#include "local/flags.hpp"
#include "local/local.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index deb5c97..218906a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -298,7 +298,7 @@ Future<Response> Master::Http::redirect(const Request& request)
LOG(INFO) << "HTTP request for '" << request.path << "'";
// If there's no leader, redirect to this master's base url.
- UPID pid = master.leader != UPID() ? master.leader : master.self();
+ UPID pid = master.leader.isSome() ? master.leader.get() : master.self();
Try<string> hostname = net::getHostname(pid.ip);
if (hostname.isError()) {
@@ -316,7 +316,7 @@ Future<Response> Master::Http::stats(const Request& request)
JSON::Object object;
object.values["uptime"] = (Clock::now() - master.startTime).secs();
- object.values["elected"] = master.elected; // Note: using int not bool.
+ object.values["elected"] = master.elected(); // Note: using int not bool.
object.values["total_schedulers"] = master.frameworks.size();
object.values["active_schedulers"] = master.getActiveFrameworks().size();
object.values["activated_slaves"] = master.slaves.size();
@@ -393,9 +393,8 @@ Future<Response> Master::Http::state(const Request& request)
object.values["cluster"] = master.flags.cluster.get();
}
- // TODO(benh): Use an Option for the leader PID.
- if (master.leader != UPID()) {
- object.values["leader"] = string(master.leader);
+ if (master.leader.isSome()) {
+ object.values["leader"] = string(master.leader.get());
}
if (master.flags.log_dir.isSome()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/main.cpp
----------------------------------------------------------------------
diff --git a/src/master/main.cpp b/src/master/main.cpp
index 45caf9d..60c86b3 100644
--- a/src/master/main.cpp
+++ b/src/master/main.cpp
@@ -30,12 +30,12 @@
#include "common/build.hpp"
-#include "detector/detector.hpp"
-
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "master/allocator.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
#include "master/drf_sorter.hpp"
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
@@ -46,8 +46,11 @@
#include "state/storage.hpp"
+#include "zookeeper/detector.hpp"
+
using namespace mesos::internal;
using namespace mesos::internal::master;
+using namespace zookeeper;
using mesos::MasterInfo;
@@ -146,13 +149,28 @@ int main(int argc, char** argv)
Registrar* registrar = new Registrar(state);
Files files;
- Master* master = new Master(allocator, registrar, &files, flags);
- process::spawn(master);
- Try<MasterDetector*> detector =
- MasterDetector::create(zk, master->self(), true, flags.quiet);
+ MasterContender* contender;
+ MasterDetector* detector;
+
+ Try<MasterContender*> contender_ = MasterContender::create(zk);
+ CHECK_SOME(contender_) << "Failed to create a master contender";
+ contender = contender_.get();
+
+ Try<MasterDetector*> detector_ = MasterDetector::create(zk);
+ CHECK_SOME(detector_) << "Failed to create a master detector";
+ detector = detector_.get();
- CHECK_SOME(detector) << "Failed to create a master detector";
+ Master* master = new Master(
+ allocator, registrar, &files, contender, detector, flags);
+
+ if (zk == "") {
+ // It means we are using the standalone detector so we need to
+ // appoint this Master as the leader.
+ dynamic_cast<StandaloneMasterDetector*>(detector)->appoint(master->self());
+ }
+
+ process::spawn(master);
process::wait(master->self());
delete master;
@@ -163,7 +181,8 @@ int main(int argc, char** argv)
delete state;
delete storage;
- MasterDetector::destroy(detector.get());
+ delete contender;
+ delete detector;
return 0;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index abab6ce..f65b344 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -189,27 +189,19 @@ private:
+// The callers in this commit (local::shutdown() and master/main.cpp)
+// delete 'contender' and 'detector' themselves, so the Master
+// presumably does not own them.
Master::Master(
Allocator* _allocator,
Registrar* _registrar,
- Files* _files)
- : ProcessBase("master"),
- http(*this),
- flags(),
- allocator(_allocator),
- registrar(_registrar),
- files(_files),
- completedFrameworks(MAX_COMPLETED_FRAMEWORKS) {}
-
-
-Master::Master(
- Allocator* _allocator,
- Registrar* _registrar,
Files* _files,
+ MasterContender* _contender,
+ MasterDetector* _detector,
const Flags& _flags)
: ProcessBase("master"),
http(*this),
flags(_flags),
+ // No leading master is known until the detector reports one.
+ leader(None()),
allocator(_allocator),
registrar(_registrar),
files(_files),
+ contender(_contender),
+ detector(_detector),
completedFrameworks(MAX_COMPLETED_FRAMEWORKS) {}
@@ -294,7 +286,6 @@ void Master::initialize()
// The master ID is currently comprised of the current date, the IP
// address and port from self() and the OS PID.
-
Try<string> id =
strings::format("%s-%u-%u-%d", DateUtils::currentDate(),
self().ip, self().port, getpid());
@@ -410,8 +401,6 @@ void Master::initialize()
whitelistWatcher = new WhitelistWatcher(flags.whitelist, allocator);
spawn(whitelistWatcher);
- elected = false;
-
nextFrameworkId = 0;
nextSlaveId = 0;
nextOfferId = 0;
@@ -436,13 +425,6 @@ void Master::initialize()
&Master::submitScheduler,
&SubmitSchedulerRequest::name);
- install<NewMasterDetectedMessage>(
- &Master::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install<NoMasterDetectedMessage>(
- &Master::noMasterDetected);
-
install<RegisterFrameworkMessage>(
&Master::registerFramework,
&RegisterFrameworkMessage::framework);
@@ -558,6 +540,12 @@ void Master::initialize()
.onAny(defer(self(), &Self::fileAttached, lambda::_1, log.get()));
}
}
+
+ contender->initialize(self());
+
+ // Start contending to be a leading master.
+ contender->contend()
+ .onAny(defer(self(), &Master::contended, lambda::_1));
}
@@ -692,34 +680,80 @@ void Master::submitScheduler(const string& name)
}
-void Master::newMasterDetected(const UPID& pid)
+// Invoked when the contender has entered the contest (or failed to).
+// On success, starts detecting the leader and watches the candidacy;
+// on failure, contends again.
+void Master::contended(const Future<Future<Nothing> >& _contended)
{
- // Check and see if we are (1) still waiting to be the elected
- // master, (2) newly elected master, (3) no longer elected master,
- // or (4) still elected master.
-
- leader = pid;
-
- if (leader != self() && !elected) {
- LOG(INFO) << "Waiting to be master!";
- } else if (leader == self() && !elected) {
- LOG(INFO) << "Elected as master!";
- elected = true;
- } else if (leader != self() && elected) {
- LOG(FATAL) << "No longer elected master ... committing suicide!";
- } else if (leader == self() && elected) {
- LOG(INFO) << "Still acting as master!";
+ if (_contended.isFailed()) {
+ CHECK(!elected()) << "Failed to contend so we should not be elected";
+ LOG(ERROR) << "Failed to contend when not elected: "
+ << _contended.failure() << "; contend again...";
+ // NOTE(review): this retries immediately with no backoff; if
+ // contend() keeps failing quickly this becomes a tight retry loop.
+ contender->contend()
+ .onAny(defer(self(), &Master::contended, lambda::_1));
+ return;
+ }
+
+ CHECK(_contended.isReady()) <<
+ "Not expecting MasterContender to discard this future";
+
+ // Now that we know we have our candidacy registered, we start
+ // detecting who is the leader.
+ detector->detect()
+ .onAny(defer(self(), &Master::detected, lambda::_1));
+
+ // Watch for candidacy change.
+ _contended.get()
+ .onAny(defer(self(), &Master::lostCandidacy, lambda::_1));
+}
+
+
+// Invoked when the candidacy obtained via contend() is lost (or the
+// watch on it fails). A leading master exits; a follower contends again.
+void Master::lostCandidacy(const Future<Nothing>& lost)
+{
+ CHECK(!lost.isDiscarded())
+ << "Not expecting MasterContender to discard this future";
+
+ if (lost.isFailed()) {
+ LOG(ERROR) << "Failed to watch for candidacy: " << lost.failure();
+ }
+
+ if (elected()) {
+ EXIT(1) << "Lost leadership... committing suicide!";
+ } else {
+ LOG(INFO) << "Lost candidacy as a follower... Contend again";
+ contender->contend()
+ .onAny(defer(self(), &Master::contended, lambda::_1));
}
}
-void Master::noMasterDetected()
+// Invoked whenever the detector reports the leading master (Some), no
+// leader (None) or a detection error (Error). Exits if this master
+// loses leadership, then keeps detecting.
+void Master::detected(const Future<Result<UPID> >& _leader)
{
- if (elected) {
- LOG(FATAL) << "No longer elected master ... committing suicide!";
+ CHECK(_leader.isReady())
+ << "Not expecting MasterDetector to fail or discard this future";
+
+ bool wasElected = elected();
+ leader = _leader.get();
+
+ if (leader.isError()) {
+ if (wasElected) {
+ EXIT(1) << "Failed to detect the leading master while elected: "
+ << leader.error() << "; committing suicide!";
+ } else {
+ LOG(ERROR) << "Failed to detect the leading master when not elected: "
+ << leader.error();
+ }
} else {
- LOG(FATAL) << "No master detected (?) ... committing suicide!";
+ LOG(INFO) << "The newly elected leader is "
+ << (leader.isSome() ? leader.get() : "NONE");
+
+ if (!wasElected && elected()) {
+ LOG(INFO) << "Elected as the leading master!";
+ } else if (wasElected && !elected()) {
+ EXIT(1) << "Lost leadership... committing suicide!";
+ }
}
+
+ // Keep detecting.
+ detector->detect(leader)
+ .onAny(defer(self(), &Master::detected, lambda::_1));
}
@@ -736,7 +770,7 @@ void Master::registerFramework(
return;
}
- if (!elected) {
+ if (!elected()) {
LOG(WARNING) << "Ignoring register framework message since not elected yet";
return;
}
@@ -815,7 +849,7 @@ void Master::reregisterFramework(
return;
}
- if (!elected) {
+ if (!elected()) {
LOG(WARNING) << "Ignoring re-register framework message since "
<< "not elected yet";
return;
@@ -1197,7 +1231,7 @@ void Master::schedulerMessage(const SlaveID& slaveId,
void Master::registerSlave(const UPID& from, const SlaveInfo& slaveInfo)
{
- if (!elected) {
+ if (!elected()) {
LOG(WARNING) << "Ignoring register slave message from "
<< slaveInfo.hostname() << " since not elected yet";
return;
@@ -1243,7 +1277,7 @@ void Master::reregisterSlave(
const vector<ExecutorInfo>& executorInfos,
const vector<Task>& tasks)
{
- if (!elected) {
+ if (!elected()) {
LOG(WARNING) << "Ignoring re-register slave message from "
<< slaveInfo.hostname() << " since not elected yet";
return;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e377af8..c86c1f1 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -46,6 +46,8 @@
#include "files/files.hpp"
#include "master/constants.hpp"
+#include "master/contender.hpp"
+#include "master/detector.hpp"
#include "master/flags.hpp"
#include "master/registrar.hpp"
@@ -85,21 +87,15 @@ class Master : public ProtobufProcess<Master>
public:
Master(allocator::Allocator* allocator,
Registrar* registrar,
- Files* files);
-
- Master(allocator::Allocator* allocator,
- Registrar* registrar,
Files* files,
- const Flags& flags);
+ MasterContender* contender,
+ MasterDetector* detector,
+ const Flags& flags = Flags());
virtual ~Master();
void submitScheduler(
const std::string& name);
- void newMasterDetected(
- const UPID& pid);
- void noMasterDetected();
- void masterDetectionFailure();
void registerFramework(
const process::UPID& from,
const FrameworkInfo& frameworkInfo);
@@ -195,6 +191,15 @@ protected:
// Return connected frameworks that are not in the process of being removed
std::vector<Framework*> getActiveFrameworks() const;
+ // Invoked when the contender has entered the contest.
+ void contended(const Future<Future<Nothing> >& contended);
+
+ // Invoked when the contender has lost the candidacy.
+ void lostCandidacy(const Future<Nothing>& lost);
+
+ // Invoked when there is a newly elected leading master.
+ void detected(const Future<Result<UPID> >& pid);
+
// Process a launch tasks request (for a non-cancelled offer) by
// launching the desired tasks (if the offer contains a valid set of
// tasks) and reporting any unused resources to the allocator.
@@ -301,15 +306,19 @@ private:
const Flags flags;
- UPID leader; // Current leading master.
+ Result<UPID> leader; // Current leading master.
- bool elected;
+ // Whether we are the current leading master.
+ bool elected() const { return leader.isSome() && leader.get() == self(); }
allocator::Allocator* allocator;
WhitelistWatcher* whitelistWatcher;
Registrar* registrar;
Files* files;
+ MasterContender* contender;
+ MasterDetector* detector;
+
MasterInfo info;
hashmap<FrameworkID, Framework*> frameworks;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 71f68a0..1f264d5 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -332,17 +332,6 @@ message FrameworkExpiredMessage {
message ShutdownMessage {}
-// Master detector messages.
-
-
-message NoMasterDetectedMessage {}
-
-
-message NewMasterDetectedMessage {
- required string pid = 2;
-}
-
-
message AuthenticateMessage {
required string pid = 1; // PID that needs to be authenticated.
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 3abe72f..51f95bb 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -34,11 +34,14 @@
#include <process/defer.hpp>
#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/id.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
+#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/flags.hpp>
@@ -55,10 +58,12 @@
#include "common/lock.hpp"
#include "common/type_utils.hpp"
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
#include "local/local.hpp"
+#include "master/detector.hpp"
+
#include "logging/flags.hpp"
#include "logging/logging.hpp"
@@ -66,6 +71,7 @@
using namespace mesos;
using namespace mesos::internal;
+using namespace mesos::internal::master;
using namespace process;
@@ -92,22 +98,20 @@ public:
Scheduler* _scheduler,
const FrameworkInfo& _framework,
const Option<Credential>& _credential,
- const string& _url,
+ MasterDetector* _detector,
pthread_mutex_t* _mutex,
pthread_cond_t* _cond)
: ProcessBase(ID::generate("scheduler")),
driver(_driver),
scheduler(_scheduler),
framework(_framework),
- url(_url),
mutex(_mutex),
cond(_cond),
failover(_framework.has_id() && !framework.id().value().empty()),
- master(UPID()),
+ master(None()),
connected(false),
aborted(false),
- // TODO(benh): Add Try().
- detector(Error("uninitialized")),
+ detector(_detector),
credential(_credential),
authenticatee(NULL),
authenticating(None()),
@@ -123,24 +127,8 @@ public:
protected:
virtual void initialize()
{
- // The master detector needs to be created after this process is
- // running so that the "master detected" message is not dropped.
// TODO(benh): Get access to flags so that we can decide whether
// or not to make ZooKeeper verbose.
- detector = MasterDetector::create(url, self(), false);
- if (detector.isError()) {
- driver->abort();
- scheduler->error(driver, detector.error());
- return;
- }
-
- install<NewMasterDetectedMessage>(
- &SchedulerProcess::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install<NoMasterDetectedMessage>(
- &SchedulerProcess::noMasterDetected);
-
install<FrameworkRegisteredMessage>(
&SchedulerProcess::registered,
&FrameworkRegisteredMessage::framework_id,
@@ -179,31 +167,30 @@ protected:
install<FrameworkErrorMessage>(
&SchedulerProcess::error,
&FrameworkErrorMessage::message);
- }
- virtual void finalize()
- {
- if (detector.isSome()) {
- MasterDetector::destroy(detector.get());
- }
+ // Start detecting masters.
+ detector->detect(master)
+ .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
}
- void newMasterDetected(const UPID& pid)
+ void detected(const Future<Result<UPID> >& pid)
{
if (aborted) {
- VLOG(1) << "Ignoring new master detected message because "
- << "the driver is aborted!";
+ VLOG(1) << "Ignoring the master change because the driver is aborted!";
return;
}
- VLOG(1) << "New master at " << pid;
-
- master = pid;
- link(master);
+ // Not expecting MasterDetector to discard or fail the future.
+ CHECK(pid.isReady());
+ master = pid.get();
- // If master failed over, inform the scheduler about the
- // disconnection.
if (connected) {
+ // There are three cases here:
+ // 1. The master failed.
+ // 2. The master failed over to a new master.
+ // 3. The master failed over to the same master.
+ // In any case, we will reconnect (possibly immediately), so we
+ // must notify schedulers of the disconnection.
Stopwatch stopwatch;
if (FLAGS_v >= 1) {
stopwatch.start();
@@ -216,47 +203,35 @@ protected:
connected = false;
- if (credential.isSome()) {
- // Authenticate with the master.
- authenticate();
- } else {
- // Proceed with registration without authentication.
- LOG(INFO) << "No credentials provided."
- << " Attempting to register without authentication";
-
- doReliableRegistration();
- }
- }
+ if (master.isSome()) {
+ VLOG(1) << "New master detected at " << master.get();
+ link(master.get());
- void noMasterDetected()
- {
- if (aborted) {
- VLOG(1) << "Ignoring no master detected message because "
- << "the driver is aborted!";
- return;
- }
-
- VLOG(1) << "No master detected, waiting for another master";
+ if (credential.isSome()) {
+ // Authenticate with the master.
+ authenticate();
+ } else {
+ // Proceed with registration without authentication.
+ LOG(INFO) << "No credentials provided."
+ << " Attempting to register without authentication";
- // Inform the scheduler about the disconnection if the driver
- // was previously registered with the master.
- if (connected) {
- Stopwatch stopwatch;
- if (FLAGS_v >= 1) {
- stopwatch.start();
+ doReliableRegistration();
}
-
- scheduler->disconnected(driver);
-
- VLOG(1) << "Scheduler::disconnected took " << stopwatch.elapsed();
+ } else if (master.isNone()) {
+ // In this case, we don't actually invoke Scheduler::error
+ // since we might get reconnected to a master imminently.
+ VLOG(1) << "No master detected";
+ } else {
+ LOG(ERROR) << "Failed to detect master: " << master.error();
}
- // In this case, we don't actually invoke Scheduler::error
- // since we might get reconnected to a master imminently.
- connected = false;
- master = UPID();
+ // Keep detecting masters.
+ LOG(INFO) << "Detecting new master";
+ detector->detect(master)
+ .onAny(defer(self(), &SchedulerProcess::detected, lambda::_1));
}
+
void authenticate()
{
if (aborted) {
@@ -266,7 +241,7 @@ protected:
authenticated = false;
- if (!master) {
+ if (!master.isSome()) {
return;
}
@@ -282,7 +257,7 @@ protected:
return;
}
- LOG(INFO) << "Authenticating with master " << master;
+ LOG(INFO) << "Authenticating with master " << master.get();
CHECK_SOME(credential);
@@ -302,7 +277,7 @@ protected:
// 'Authenticatee'.
// --> '~Authenticatee()' is invoked by 'AuthenticateeProcess'.
// TODO(vinod): Consider using 'Shared' to 'Owned' upgrade.
- authenticating = authenticatee->authenticate(master)
+ authenticating = authenticatee->authenticate(master.get())
.onAny(defer(self(), &Self::_authenticate));
delay(Seconds(5),
@@ -324,9 +299,21 @@ protected:
CHECK_SOME(authenticating);
const Future<bool>& future = authenticating.get();
+ if (!master.isSome()) {
+ LOG(INFO) << "Ignoring _authenticate because the master is lost";
+ authenticating = None();
+ // Set it to false because we do not want further retries until
+ // a new master is detected.
+ // We obviously do not need to reauthenticate either even if
+ // 'reauthenticate' is currently true because the master is
+ // lost.
+ reauthenticate = false;
+ return;
+ }
+
if (reauthenticate || !future.isReady()) {
LOG(WARNING)
- << "Failed to authenticate with master " << master << ": "
+ << "Failed to authenticate with master " << master.get() << ": "
<< (reauthenticate ? "master changed" :
(future.isFailed() ? future.failure() : "future discarded"));
@@ -339,12 +326,12 @@ protected:
}
if (!future.get()) {
- LOG(ERROR) << "Master " << master << " refused authentication";
+ LOG(ERROR) << "Master " << master.get() << " refused authentication";
error("Master refused authentication");
return;
}
- LOG(INFO) << "Successfully authenticated with master " << master;
+ LOG(INFO) << "Successfully authenticated with master " << master.get();
authenticated = true;
authenticating = None();
@@ -433,7 +420,7 @@ protected:
void doReliableRegistration()
{
- if (connected || !master) {
+ if (connected || !master.isSome()) {
return;
}
@@ -445,13 +432,13 @@ protected:
// Touched for the very first time.
RegisterFrameworkMessage message;
message.mutable_framework()->MergeFrom(framework);
- send(master, message);
+ send(master.get(), message);
} else {
// Not the first time, or failing over.
ReregisterFrameworkMessage message;
message.mutable_framework()->MergeFrom(framework);
message.set_failover(failover);
- send(master, message);
+ send(master.get(), message);
}
delay(Seconds(1), self(), &Self::doReliableRegistration);
@@ -582,10 +569,10 @@ protected:
return;
}
- if (from != master) {
+ if (!master.isSome() || from != master.get()) {
LOG(WARNING) << "Ignoring lost slave message from " << from
<< " because it is not from the registered master ("
- << master << ")";
+ << (master.isSome() ? master.get() : "NONE/ERROR") << ")";
return;
}
@@ -657,7 +644,8 @@ protected:
if (connected && !failover) {
UnregisterFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(framework.id());
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
}
Lock lock(mutex);
@@ -683,7 +671,8 @@ protected:
DeactivateFrameworkMessage message;
message.mutable_framework_id()->MergeFrom(framework.id());
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
Lock lock(mutex);
pthread_cond_signal(cond);
@@ -699,7 +688,8 @@ protected:
KillTaskMessage message;
message.mutable_framework_id()->MergeFrom(framework.id());
message.mutable_task_id()->MergeFrom(taskId);
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
}
void requestResources(const vector<Request>& requests)
@@ -714,7 +704,8 @@ protected:
foreach (const Request& request, requests) {
message.add_requests()->MergeFrom(request);
}
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
}
void launchTasks(const OfferID& offerId,
@@ -822,7 +813,8 @@ protected:
// Remove the offer since we saved all the PIDs we might use.
savedOffers.erase(offerId);
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
}
void reviveOffers()
@@ -834,7 +826,8 @@ protected:
ReviveOffersMessage message;
message.mutable_framework_id()->MergeFrom(framework.id());
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
}
void sendFrameworkMessage(const ExecutorID& executorId,
@@ -874,7 +867,8 @@ protected:
message.mutable_framework_id()->MergeFrom(framework.id());
message.mutable_executor_id()->MergeFrom(executorId);
message.set_data(data);
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
}
}
@@ -892,7 +886,8 @@ protected:
message.add_statuses()->MergeFrom(status);
}
- send(master, message);
+ CHECK_SOME(master);
+ send(master.get(), message);
}
private:
@@ -901,16 +896,15 @@ private:
MesosSchedulerDriver* driver;
Scheduler* scheduler;
FrameworkInfo framework;
- string url; // URL for the master (e.g., zk://, file://, etc).
pthread_mutex_t* mutex;
pthread_cond_t* cond;
bool failover;
- UPID master;
+ Result<UPID> master;
volatile bool connected; // Flag to indicate if framework is registered.
volatile bool aborted; // Flag to indicate if the driver is aborted.
- Try<MasterDetector*> detector;
+ MasterDetector* detector;
hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
hashmap<SlaveID, UPID> savedSlavePids;
@@ -955,7 +949,9 @@ MesosSchedulerDriver::MesosSchedulerDriver(
framework(_framework),
master(_master),
process(NULL),
- status(DRIVER_NOT_STARTED)
+ status(DRIVER_NOT_STARTED),
+ credential(NULL),
+ detector(NULL)
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
@@ -1007,13 +1003,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
CHECK(process == NULL);
- if (pid.isSome()) {
- process = new SchedulerProcess(
- this, scheduler, framework, None(), pid.get(), &mutex, &cond);
- } else {
- process = new SchedulerProcess(
- this, scheduler, framework, None(), master, &mutex, &cond);
- }
+ url = pid.isSome() ? static_cast<string>(pid.get()) : master;
}
@@ -1023,12 +1013,14 @@ MesosSchedulerDriver::MesosSchedulerDriver(
Scheduler* _scheduler,
const FrameworkInfo& _framework,
const string& _master,
- const Credential& credential)
+ const Credential& _credential)
: scheduler(_scheduler),
framework(_framework),
master(_master),
process(NULL),
- status(DRIVER_NOT_STARTED)
+ status(DRIVER_NOT_STARTED),
+ credential(new Credential(_credential)),
+ detector(NULL)
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
@@ -1080,13 +1072,7 @@ MesosSchedulerDriver::MesosSchedulerDriver(
CHECK(process == NULL);
- if (pid.isSome()) {
- process = new SchedulerProcess(
- this, scheduler, framework, credential, pid.get(), &mutex, &cond);
- } else {
- process = new SchedulerProcess(
- this, scheduler, framework, credential, master, &mutex, &cond);
- }
+ url = pid.isSome() ? static_cast<string>(pid.get()) : master;
}
@@ -1116,6 +1102,10 @@ MesosSchedulerDriver::~MesosSchedulerDriver()
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
+ if (detector != NULL) {
+ delete detector;
+ }
+
// Check and see if we need to shutdown a local cluster.
if (master == "local" || master == "localquiet") {
local::shutdown();
@@ -1131,7 +1121,31 @@ Status MesosSchedulerDriver::start()
return status;
}
- CHECK(process != NULL);
+ if (detector == NULL) {
+ Try<MasterDetector*> detector_ = MasterDetector::create(url);
+
+ if (detector_.isError()) {
+ status = DRIVER_ABORTED;
+ string message = "Failed to create a master detector for '" +
+ master + "': " + detector_.error();
+ scheduler->error(this, message);
+ return status;
+ }
+
+ // Save the detector so we can delete it later.
+ detector = detector_.get();
+ }
+
+ CHECK(process == NULL);
+
+ if (credential == NULL) {
+ process = new SchedulerProcess(
+ this, scheduler, framework, None(), detector, &mutex, &cond);
+ } else {
+ const Credential& cred = *credential;
+ process = new SchedulerProcess(
+ this, scheduler, framework, cred, detector, &mutex, &cond);
+ }
spawn(process);
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 62fbb37..2f0bd8d 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -288,7 +288,7 @@ Future<Response> Slave::Http::stats(const Request& request)
object.values["lost_tasks"] = slave.stats.tasks[TASK_LOST];
object.values["valid_status_updates"] = slave.stats.validStatusUpdates;
object.values["invalid_status_updates"] = slave.stats.invalidStatusUpdates;
- object.values["registered"] = slave.master ? "1" : "0";
+ object.values["registered"] = slave.master.isSome() ? "1" : "0";
object.values["recovery_errors"] = slave.recoveryErrors;
return OK(object, request.query.get("jsonp"));
@@ -317,7 +317,7 @@ Future<Response> Slave::Http::state(const Request& request)
object.values["failed_tasks"] = slave.stats.tasks[TASK_FAILED];
object.values["lost_tasks"] = slave.stats.tasks[TASK_LOST];
- Try<string> masterHostname = net::getHostname(slave.master.ip);
- if (masterHostname.isSome()) {
- object.values["master_hostname"] = masterHostname.get();
- }
+ // 'slave.master' is a Result<UPID> that stays None (or Error) until a
+ // master is detected; Result::get() is only valid when isSome().
+ if (slave.master.isSome()) {
+ Try<string> masterHostname = net::getHostname(slave.master.get().ip);
+ if (masterHostname.isSome()) {
+ object.values["master_hostname"] = masterHostname.get();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/main.cpp b/src/slave/main.cpp
index 750a127..e83cd9e 100644
--- a/src/slave/main.cpp
+++ b/src/slave/main.cpp
@@ -27,7 +27,7 @@
#include "common/build.hpp"
-#include "detector/detector.hpp"
+#include "master/detector.hpp"
#include "logging/logging.hpp"
@@ -127,19 +127,18 @@ int main(int argc, char** argv)
LOG(INFO) << "Build: " << build::DATE << " by " << build::USER;
LOG(INFO) << "Starting Mesos slave";
- Files files;
- Slave* slave = new Slave(flags, false, isolator, &files);
- process::spawn(slave);
-
- Try<MasterDetector*> detector =
- MasterDetector::create(master.get(), slave->self(), false, flags.quiet);
+ Try<MasterDetector*> detector = MasterDetector::create(master.get());
CHECK_SOME(detector) << "Failed to create a master detector";
+ Files files;
+ Slave* slave = new Slave(flags, false, detector.get(), isolator, &files);
+ process::spawn(slave);
+
process::wait(slave->self());
delete slave;
- MasterDetector::destroy(detector.get());
+ delete detector.get();
Isolator::destroy(isolator);
return 0;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index bb98fce..a9be378 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -73,6 +73,7 @@ using namespace state;
Slave::Slave(const slave::Flags& _flags,
bool _local,
+ MasterDetector* _detector,
Isolator* _isolator,
Files* _files)
: ProcessBase(ID::generate("slave")),
@@ -80,7 +81,9 @@ Slave::Slave(const slave::Flags& _flags,
http(*this),
flags(_flags),
local(_local),
+ master(None()),
completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
+ detector(_detector),
isolator(_isolator),
files(_files),
monitor(_isolator),
@@ -281,13 +284,6 @@ void Slave::initialize()
startTime = Clock::now();
// Install protobuf handlers.
- install<NewMasterDetectedMessage>(
- &Slave::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install<NoMasterDetectedMessage>(
- &Slave::noMasterDetected);
-
install<SlaveRegisteredMessage>(
&Slave::registered,
&SlaveRegisteredMessage::slave_id);
@@ -433,10 +429,10 @@ void Slave::shutdown(const UPID& from)
// Allow shutdown message only if
// 1) Its a message received from the registered master or
// 2) If its called locally (e.g tests)
- if (from && from != master) {
+ if (from && (!master.isSome() || from != master.get())) {
LOG(WARNING) << "Ignoring shutdown message from " << from
- << " because it is not from the registered master ("
- << master << ")";
+ << " because it is not from the registered master: "
+ << (master.isSome() ? master.get() : "None/Error");
return;
}
@@ -483,75 +479,67 @@ Nothing Slave::detachFile(const string& path)
}
-void Slave::newMasterDetected(const UPID& pid)
+// Invoked by the master detector with the latest leading master
+// (Some), no master (None) or a detection error (Error). Detection is
+// always re-armed at the end so later master changes are still seen.
+void Slave::detected(const Future<Result<UPID> >& pid)
{
- LOG(INFO) << "New master detected at " << pid;
-
- master = pid;
- link(master);
-
- // Inform the status updates manager about the new master.
- statusUpdateManager->newMasterDetected(master);
+ CHECK(state == DISCONNECTED ||
+ state == RUNNING ||
+ state == TERMINATING) << state;
- if (flags.recover == "cleanup") {
- LOG(INFO) << "Skipping registration because slave is in 'cleanup' mode";
- return;
+ if (state != TERMINATING) {
+ state = DISCONNECTED;
}
- switch (state) {
- case RECOVERING:
- LOG(INFO) << "Postponing registration until recovery is complete";
- break;
- case DISCONNECTED:
- case RUNNING:
- state = DISCONNECTED;
- doReliableRegistration();
- break;
- case TERMINATING:
- LOG(INFO) << "Skipping registration because slave is terminating";
- break;
- default:
- LOG(FATAL) << "Unexpected slave state " << state;
- break;
- }
-}
+ // Not expecting MasterDetector to discard or fail futures.
+ CHECK(pid.isReady());
+ master = pid.get();
+ if (master.isSome()) {
+ LOG(INFO) << "New master detected at " << master.get();
+ link(master.get());
-void Slave::noMasterDetected()
-{
- LOG(INFO) << "Lost master(s) ... waiting";
- master = UPID();
+ // Inform the status updates manager about the new master.
+ statusUpdateManager->newMasterDetected(master.get());
- CHECK(state == RECOVERING || state == DISCONNECTED ||
- state == RUNNING || state == TERMINATING)
- << state;
+ // NOTE: Do not 'return' early from this branch: that would skip the
+ // re-detection at the end of this function, and the slave (and its
+ // status update manager) would never learn of later master changes.
+ if (state == TERMINATING) {
+ LOG(INFO) << "Skipping registration because slave is terminating";
+ } else if (flags.recover == "cleanup") {
+ // The slave does not (re-)register if it is in the cleanup mode
+ // because we do not want to accept new tasks.
+ LOG(INFO)
+ << "Skipping registration because slave was started in cleanup mode";
+ } else {
+ doReliableRegistration();
+ }
- // We only change state if the slave is in RUNNING state because
- // if the slave is in:
- // RECOVERY: Slave needs to finish recovery before changing states.
- // DISCONNECTED: Redundant.
- // TERMINATING: Slave is shutting down.
- // TODO(vinod): Subscribe to master detector after recovery.
- // Similarly, unsubscribe from master detector during termination.
- // Currently it is tricky because master detector is injected into
- // the slave from outside.
- if (state == RUNNING) {
- state = DISCONNECTED;
+ } else if (master.isNone()) {
+ LOG(INFO) << "Lost leading master";
+ } else {
+ LOG(ERROR) << "Failed to detect a master: " << master.error();
}
+
+ // Keep detecting masters.
+ LOG(INFO) << "Detecting new master";
+ detector->detect(master)
+ .onAny(defer(self(), &Slave::detected, lambda::_1));
}
void Slave::registered(const UPID& from, const SlaveID& slaveId)
{
- if (from != master) {
+ if (!master.isSome() || from != master.get()) {
LOG(WARNING) << "Ignoring registration message from " << from
- << " because it is not the expected master " << master;
+ << " because it is not the expected master: "
+ << (master.isSome() ? master.get() : "NONE/ERROR");
return;
}
switch(state) {
case DISCONNECTED: {
- LOG(INFO) << "Registered with master " << master
+ CHECK_SOME(master);
+ LOG(INFO) << "Registered with master " << master.get()
<< "; given slave ID " << slaveId;
state = RUNNING;
@@ -575,7 +563,8 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
EXIT(1) << "Registered but got wrong id: " << slaveId
<< "(expected: " << info.id() << "). Committing suicide";
}
- LOG(WARNING) << "Already registered with master " << master;
+ CHECK_SOME(master);
+ LOG(WARNING) << "Already registered with master " << master.get();
break;
case TERMINATING:
LOG(WARNING) << "Ignoring registration because slave is terminating";
@@ -590,15 +579,17 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
{
- if (from != master) {
+ if (!master.isSome() || from != master.get()) {
LOG(WARNING) << "Ignoring re-registration message from " << from
- << " because it is not the expected master " << master;
+ << " because it is not the expected master: "
+ << (master.isSome() ? master.get() : "NONE/ERROR");
return;
}
switch(state) {
case DISCONNECTED:
- LOG(INFO) << "Re-registered with master " << master;
+ CHECK_SOME(master);
+ LOG(INFO) << "Re-registered with master " << master.get();
state = RUNNING;
if (!(info.id() == slaveId)) {
@@ -612,7 +603,8 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
EXIT(1) << "Re-registered but got wrong id: " << slaveId
<< "(expected: " << info.id() << "). Committing suicide";
}
- LOG(WARNING) << "Already re-registered with master " << master;
+ CHECK_SOME(master);
+ LOG(WARNING) << "Already re-registered with master " << master.get();
break;
case TERMINATING:
LOG(WARNING) << "Ignoring re-registration because slave is terminating";
@@ -633,7 +625,7 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
void Slave::doReliableRegistration()
{
- if (!master) {
+ if (!master.isSome()) {
LOG(INFO) << "Skipping registration because no master present";
return;
}
@@ -649,7 +641,7 @@ void Slave::doReliableRegistration()
// (Vinod): Is the above comment true?
RegisterSlaveMessage message;
message.mutable_slave()->CopyFrom(info);
- send(master, message);
+ send(master.get(), message);
} else {
// Re-registering, so send tasks running.
ReregisterSlaveMessage message;
@@ -698,7 +690,9 @@ void Slave::doReliableRegistration()
}
}
}
- send(master, message);
+
+ CHECK_SOME(master);
+ send(master.get(), message);
}
// Retry registration if necessary.
@@ -1117,10 +1111,11 @@ void Slave::shutdownFramework(
// Allow shutdownFramework() only if
// its called directly (e.g. Slave::finalize()) or
// its a message from the currently registered master.
- if (from && from != master) {
+ if (from && (!master.isSome() || from != master.get())) {
LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
- << " from " << from << " because it is not from the registered"
- << " master (" << master << ")";
+ << " from " << from
+ << " because it is not from the registered master ("
+ << (master.isSome() ? master.get() : "NONE/ERROR") << ")";
return;
}
@@ -1944,7 +1939,7 @@ void Slave::exited(const UPID& pid)
{
LOG(INFO) << pid << " exited";
- if (master == pid) {
+ if (!master.isSome() || master.get() == pid) {
LOG(WARNING) << "Master disconnected!"
<< " Waiting for a new master to be elected";
// TODO(benh): After so long waiting for a master, commit suicide.
@@ -2236,7 +2231,7 @@ void Slave::executorTerminated(
message.mutable_executor_id()->MergeFrom(executorId);
message.set_status(status);
- send(master, message);
+ if (master.isSome()) { send(master.get(), message); }
}
// Remove the executor if either the framework is terminating or
@@ -2747,6 +2742,9 @@ void Slave::__recover(const Future<Nothing>& future)
LOG(INFO) << "Finished recovery";
+ CHECK_EQ(RECOVERING, state);
+ state = DISCONNECTED;
+
// Schedule all old slave directories for garbage collection.
// TODO(vinod): Do this as part of recovery. This needs a fix
// in the recovery code, to recover all slaves instead of only
@@ -2790,18 +2788,12 @@ void Slave::__recover(const Future<Nothing>& future)
// in 'cleanup' mode.
if (frameworks.empty() && flags.recover == "cleanup") {
terminate(self());
- } else if (flags.recover == "reconnect") {
- // Re-register if reconnecting.
- // NOTE: Since the slave in cleanup mode never re-registers, if
- // the master fails over it will not forward the updates from
- // the "unknown" slave to the scheduler. This could lead to the
- // slave waiting indefinitely for acknowledgements. The master's
- // registrar could help in handling this correctly.
- state = DISCONNECTED;
- if (master) {
- doReliableRegistration();
- }
+ return;
}
+
+ // Start detecting masters.
+ detector->detect(master)
+ .onAny(defer(self(), &Slave::detected, lambda::_1));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b39eaf4..6d7c3e8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -30,6 +30,7 @@
#include <mesos/resources.hpp>
#include <process/http.hpp>
+#include <process/future.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
@@ -38,11 +39,14 @@
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/multihashmap.hpp>
+#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/owned.hpp>
#include <stout/path.hpp>
#include <stout/uuid.hpp>
+#include "master/detector.hpp"
+
#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/gc.hpp"
@@ -59,9 +63,11 @@
#include "messages/messages.hpp"
-
namespace mesos {
namespace internal {
+
+class MasterDetector; // Forward declaration.
+
namespace slave {
using namespace process;
@@ -71,22 +77,19 @@ class StatusUpdateManager;
struct Executor;
struct Framework;
-
class Slave : public ProtobufProcess<Slave>
{
public:
Slave(const Flags& flags,
bool local,
- Isolator *isolator,
+ MasterDetector* detector,
+ Isolator* isolator,
Files* files);
virtual ~Slave();
void shutdown(const process::UPID& from);
- void newMasterDetected(const UPID& pid);
- void noMasterDetected();
- void masterDetectionFailure();
void registered(const process::UPID& from, const SlaveID& slaveId);
void reregistered(const process::UPID& from, const SlaveID& slaveId);
void doReliableRegistration();
@@ -215,6 +218,9 @@ protected:
Nothing detachFile(const std::string& path);
+ // Invoked whenever the detector detects a change in masters.
+ void detected(const Future<Result<UPID> >& pid);
+
// Helper routine to lookup a framework.
Framework* getFramework(const FrameworkID& frameworkId);
@@ -308,7 +314,7 @@ private:
SlaveInfo info;
- UPID master;
+ Result<UPID> master;
Resources resources;
Attributes attributes;
@@ -317,6 +323,8 @@ private:
boost::circular_buffer<Owned<Framework> > completedFrameworks;
+ MasterDetector* detector;
+
Isolator* isolator;
Files* files;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp
index b0beb72..61ab235 100644
--- a/src/tests/allocator_tests.cpp
+++ b/src/tests/allocator_tests.cpp
@@ -30,9 +30,8 @@
#include <process/gmock.hpp>
#include <process/pid.hpp>
-#include "detector/detector.hpp"
-
#include "master/allocator.hpp"
+#include "master/detector.hpp"
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9d1dd81/src/tests/authentication_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/authentication_tests.cpp b/src/tests/authentication_tests.cpp
index 48a9323..cc8b7a9 100644
--- a/src/tests/authentication_tests.cpp
+++ b/src/tests/authentication_tests.cpp
@@ -277,8 +277,9 @@ TEST_F(AuthenticationTest, MasterFailover)
ASSERT_SOME(master);
MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+ Owned<StandaloneMasterDetector> detector =
+ new StandaloneMasterDetector(master.get());
+ TestingMesosSchedulerDriver driver(&sched, detector.get());
// Drop the authenticate message from the scheduler.
Future<AuthenticateMessage> authenticateMessage =
@@ -287,7 +288,6 @@ TEST_F(AuthenticationTest, MasterFailover)
driver.start();
AWAIT_READY(authenticateMessage);
- UPID frameworkPid = authenticateMessage.get().pid();
// While the authentication is in progress simulate a failed over
// master by restarting the master.
@@ -299,12 +299,8 @@ TEST_F(AuthenticationTest, MasterFailover)
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureSatisfy(®istered));
- // Send a new master detected message to inform the scheduler
- // about the new master.
- NewMasterDetectedMessage newMasterDetectedMsg;
- newMasterDetectedMsg.set_pid(master.get());
-
- process::post(frameworkPid, newMasterDetectedMsg);
+ // Appoint a new master and inform the scheduler about it.
+ detector->appoint(master.get());
// Scheduler should successfully register with the new master.
AWAIT_READY(registered);
@@ -325,11 +321,9 @@ TEST_F(AuthenticationTest, LeaderElection)
ASSERT_SOME(master);
MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- Future<AuthenticateMessage> authenticateMessage =
- FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
+ Owned<StandaloneMasterDetector> detector =
+ new StandaloneMasterDetector(master.get());
+ TestingMesosSchedulerDriver driver(&sched, detector.get());
// Drop the AuthenticationStepMessage from authenticator.
Future<AuthenticationStepMessage> authenticationStepMessage =
@@ -337,10 +331,6 @@ TEST_F(AuthenticationTest, LeaderElection)
driver.start();
- // Grab the framework pid.
- AWAIT_READY(authenticateMessage);
- UPID frameworkPid = authenticateMessage.get().pid();
-
// Drop the intermediate SASL message so that authentication fails.
AWAIT_READY(authenticationStepMessage);
@@ -348,12 +338,8 @@ TEST_F(AuthenticationTest, LeaderElection)
EXPECT_CALL(sched, registered(&driver, _, _))
.WillOnce(FutureSatisfy(®istered));
- // Send a new master detected message to inform the scheduler
- // about the new master after a leader election.
- NewMasterDetectedMessage newMasterDetectedMsg;
- newMasterDetectedMsg.set_pid(master.get());
-
- process::post(frameworkPid, newMasterDetectedMsg);
+ // Appoint a new master and inform the scheduler about it.
+ detector->appoint(master.get());
// Scheduler should successfully register with the new master.
AWAIT_READY(registered);
@@ -375,11 +361,9 @@ TEST_F(AuthenticationTest, SchedulerFailover)
// Launch the first (i.e., failing) scheduler.
MockScheduler sched1;
- MesosSchedulerDriver driver1(
- &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- Future<AuthenticateMessage> authenticateMessage =
- FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
+ Owned<StandaloneMasterDetector> detector =
+ new StandaloneMasterDetector(master.get());
+ TestingMesosSchedulerDriver driver1(&sched1, detector.get());
Future<FrameworkID> frameworkId;
EXPECT_CALL(sched1, registered(&driver1, _, _))
@@ -387,10 +371,6 @@ TEST_F(AuthenticationTest, SchedulerFailover)
driver1.start();
- // Grab the framework pid.
- AWAIT_READY(authenticateMessage);
- UPID frameworkPid = authenticateMessage.get().pid();
-
AWAIT_READY(frameworkId);
// Drop the AuthenticationStepMessage from authenticator
@@ -400,11 +380,8 @@ TEST_F(AuthenticationTest, SchedulerFailover)
EXPECT_CALL(sched1, disconnected(&driver1));
- // Send a NewMasterDetected message to elicit authentication.
- NewMasterDetectedMessage newMasterDetectedMsg;
- newMasterDetectedMsg.set_pid(master.get());
-
- process::post(frameworkPid, newMasterDetectedMsg);
+ // Appoint a new master and inform the scheduler about it.
+ detector->appoint(master.get());
AWAIT_READY(authenticationStepMessage);