You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/05/19 21:22:14 UTC
[1/3] mesos git commit: Index slaves by UPID in the master.
Repository: mesos
Updated Branches:
refs/heads/master 26091f461 -> c24268f13
Index slaves by UPID in the master.
Review: https://reviews.apache.org/r/34388
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cf03af
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cf03af
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cf03af
Branch: refs/heads/master
Commit: 42cf03af66f2691d04e5c88ac7e098625d38e0bf
Parents: b19ffd2
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon May 18 18:37:11 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue May 19 11:55:30 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 127 +++++++++++++++++++++++----------------------
src/master/master.hpp | 65 ++++++++++++++++++++++-
2 files changed, 129 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index eaea79d..d2df99c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -973,7 +973,9 @@ void Master::exited(const UPID& pid)
}
}
- // The semantics when a slave gets disconnected are as follows:
+ // The semantics when a registered slave gets disconnected are as
+ // follows:
+ //
// 1) If the slave is not checkpointing, the slave is immediately
// removed and all tasks running on it are transitioned to LOST.
// No resources are recovered, because the slave is removed.
@@ -985,42 +987,42 @@ void Master::exited(const UPID& pid)
// 2.2) Framework is not-checkpointing: The slave is not removed
// but the framework is removed from the slave's structs,
// its tasks transitioned to LOST and resources recovered.
- foreachvalue (Slave* slave, slaves.registered) {
- if (slave->pid == pid) {
- LOG(INFO) << "Slave " << *slave << " disconnected";
-
- if (!slave->info.checkpoint()) {
- // Remove the slave, if it is not checkpointing.
- LOG(INFO) << "Removing disconnected slave " << *slave
- << " because it is not checkpointing!";
- removeSlave(slave,
- "slave is non-checkpointing and disconnected");
- return;
- } else if (slave->connected) {
- // Checkpointing slaves can just be disconnected.
- disconnect(slave);
+ if (slaves.registered.contains(pid)) {
+ Slave* slave = slaves.registered.get(pid);
+ CHECK_NOTNULL(slave);
+
+ LOG(INFO) << "Slave " << *slave << " disconnected";
+
+ if (!slave->info.checkpoint()) {
+ // Remove the slave, if it is not checkpointing.
+ LOG(INFO) << "Removing disconnected slave " << *slave
+ << " because it is not checkpointing!";
+ removeSlave(slave, "slave is non-checkpointing and disconnected");
+ return;
+ } else if (slave->connected) {
+ // Checkpointing slaves can just be disconnected.
+ disconnect(slave);
- // Remove all non-checkpointing frameworks.
- hashset<FrameworkID> frameworkIds =
+ // Remove all non-checkpointing frameworks.
+ hashset<FrameworkID> frameworkIds =
slave->tasks.keys() | slave->executors.keys();
- foreach (const FrameworkID& frameworkId, frameworkIds) {
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL && !framework->info.checkpoint()) {
- LOG(INFO) << "Removing framework " << *framework
- << " from disconnected slave " << *slave
- << " because the framework is not checkpointing";
+ foreach (const FrameworkID& frameworkId, frameworkIds) {
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL && !framework->info.checkpoint()) {
+ LOG(INFO) << "Removing framework " << *framework
+ << " from disconnected slave " << *slave
+ << " because the framework is not checkpointing";
- removeFramework(slave, framework);
- }
+ removeFramework(slave, framework);
}
- } else {
- // NOTE: A duplicate exited() event is possible for a slave
- // because its PID doesn't change on restart. See MESOS-675
- // for details.
- LOG(WARNING) << "Ignoring duplicate exited() notification for "
- << "checkpointing slave " << *slave;
}
+ } else {
+ // NOTE: A duplicate exited() event is possible for a slave
+ // because its PID doesn't change on restart. See MESOS-675
+ // for details.
+ LOG(WARNING) << "Ignoring duplicate exited() notification for "
+ << "checkpointing slave " << *slave;
}
}
}
@@ -3094,31 +3096,30 @@ void Master::registerSlave(
}
// Check if this slave is already registered (because it retries).
- foreachvalue (Slave* slave, slaves.registered) {
- if (slave->pid == from) {
- if (!slave->connected) {
- // The slave was previously disconnected but it is now trying
- // to register as a new slave. This could happen if the slave
- // failed recovery and hence registering as a new slave before
- // the master removed the old slave from its map.
- LOG(INFO)
- << "Removing old disconnected slave " << *slave
- << " because a registration attempt is being made from " << from;
- removeSlave(slave,
- "a new slave registered at the same address",
- metrics->slave_removals_reason_registered);
- break;
- } else {
- CHECK(slave->active)
- << "Unexpected connected but deactivated slave " << *slave;
-
- LOG(INFO) << "Slave " << *slave << " already registered,"
- << " resending acknowledgement";
- SlaveRegisteredMessage message;
- message.mutable_slave_id()->MergeFrom(slave->id);
- send(from, message);
- return;
- }
+ if (slaves.registered.contains(from)) {
+ Slave* slave = slaves.registered.get(from);
+ CHECK_NOTNULL(slave);
+
+ if (!slave->connected) {
+ // The slave was previously disconnected but it is now trying
+ // to register as a new slave. This could happen if the slave
+ // failed recovery and hence registering as a new slave before
+ // the master removed the old slave from its map.
+ LOG(INFO) << "Removing old disconnected slave " << *slave
+ << " because a registration attempt occurred";
+ removeSlave(slave,
+ "a new slave registered at the same address",
+ metrics->slave_removals_reason_registered);
+ } else {
+ CHECK(slave->active)
+ << "Unexpected connected but deactivated slave " << *slave;
+
+ LOG(INFO) << "Slave " << *slave << " already registered,"
+ << " resending acknowledgement";
+ SlaveRegisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(from, message);
+ return;
}
}
@@ -3574,7 +3575,8 @@ void Master::exitedExecutor(
return;
}
- Slave* slave = CHECK_NOTNULL(slaves.registered[slaveId]);
+ Slave* slave = slaves.registered.get(slaveId);
+ CHECK_NOTNULL(slave);
if (!slave->hasExecutor(frameworkId, executorId)) {
LOG(WARNING) << "Ignoring unknown exited executor '" << executorId
@@ -3624,7 +3626,7 @@ void Master::shutdown(
return;
}
- Slave* slave = slaves.registered[shutdown.slave_id()];
+ Slave* slave = slaves.registered.get(shutdown.slave_id());
CHECK_NOTNULL(slave);
ShutdownExecutorMessage message;
@@ -3643,7 +3645,7 @@ void Master::shutdownSlave(const SlaveID& slaveId, const string& message)
return;
}
- Slave* slave = slaves.registered[slaveId];
+ Slave* slave = slaves.registered.get(slaveId);
CHECK_NOTNULL(slave);
LOG(WARNING) << "Shutting down slave " << *slave << " with message '"
@@ -3919,7 +3921,8 @@ void Master::offer(const FrameworkID& frameworkId,
continue;
}
- Slave* slave = slaves.registered[slaveId];
+ Slave* slave = slaves.registered.get(slaveId);
+ CHECK_NOTNULL(slave);
CHECK(slave->info.checkpoint() || !framework->info.checkpoint())
<< "Resources of non checkpointing slave " << *slave
@@ -4599,7 +4602,7 @@ void Master::addSlave(
CHECK_NOTNULL(slave);
slaves.removed.erase(slave->id);
- slaves.registered[slave->id] = slave;
+ slaves.registered.put(slave);
link(slave->pid);
@@ -4734,7 +4737,7 @@ void Master::removeSlave(
// Mark the slave as being removed.
slaves.removing.insert(slave->id);
- slaves.registered.erase(slave->id);
+ slaves.registered.remove(slave);
slaves.removed.put(slave->id, Nothing());
authenticated.erase(slave->pid);
http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0922a7c..4a94e23 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1154,7 +1154,70 @@ private:
// these slaves until the registrar determines their fate.
hashset<SlaveID> reregistering;
- hashmap<SlaveID, Slave*> registered;
+ // Registered slaves are indexed by SlaveID and UPID. Note that
+ // iteration is supported but is exposed as iteration over a
+ // hashmap<SlaveID, Slave*> since it is tedious to convert
+ // the map's key/value iterator into a value iterator.
+ //
+ // Both indexes are keyed on the values of 'slave->id' and
+ // 'slave->pid' as they are at the time of put()/remove(); see the
+ // NOTE on remove() about keeping the two indexes in sync.
+ //
+ // TODO(bmahler): Consider pulling in boost's multi_index,
+ // or creating a simpler indexing abstraction in stout.
+ struct
+ {
+ // Returns true if a slave is indexed under 'slaveId'.
+ bool contains(const SlaveID& slaveId) const
+ {
+ return ids.contains(slaveId);
+ }
+
+ // Returns true if a slave is indexed under 'pid'.
+ bool contains(const process::UPID& pid) const
+ {
+ return pids.contains(pid);
+ }
+
+ // Returns the slave indexed under 'slaveId', or NULL if none is.
+ Slave* get(const SlaveID& slaveId) const
+ {
+ return ids.get(slaveId).get(NULL);
+ }
+
+ // Returns the slave indexed under 'pid', or NULL if none is.
+ Slave* get(const process::UPID& pid) const
+ {
+ return pids.get(pid).get(NULL);
+ }
+
+ // Indexes 'slave' under its current 'id' and 'pid'. An existing
+ // entry under either key is overwritten. 'slave' must be non-NULL.
+ void put(Slave* slave)
+ {
+ CHECK_NOTNULL(slave);
+ ids[slave->id] = slave;
+ pids[slave->pid] = slave;
+ }
+
+ // Erases the entries keyed by the slave's *current* 'id' and 'pid'.
+ //
+ // NOTE(review): 'Slave::pid' is not const, unlike 'Slave::id'. If
+ // a registered slave's pid is reassigned, this misses the old
+ // 'pids' entry, which keeps pointing at the slave. This also
+ // erases whatever is stored under 'slave->pid', even if that
+ // entry now belongs to a different slave. Callers that change a
+ // registered slave's pid should presumably remove() before the
+ // change and put() after it -- confirm against master.cpp.
+ void remove(Slave* slave)
+ {
+ CHECK_NOTNULL(slave);
+ ids.erase(slave->id);
+ pids.erase(slave->pid);
+ }
+
+ // Drops all entries from both indexes. The Slave objects
+ // themselves are not touched here.
+ void clear()
+ {
+ ids.clear();
+ pids.clear();
+ }
+
+ // Number of registered slaves, as counted by the SlaveID index.
+ size_t size() const { return ids.size(); }
+
+ // Iteration walks the SlaveID index only; elements are
+ // (SlaveID, Slave*) pairs.
+ typedef hashmap<SlaveID, Slave*>::iterator iterator;
+ typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
+
+ iterator begin() { return ids.begin(); }
+ iterator end() { return ids.end(); }
+
+ const_iterator begin() const { return ids.begin(); }
+ const_iterator end() const { return ids.end(); }
+
+ private:
+ // Primary index; also backs size() and iteration.
+ hashmap<SlaveID, Slave*> ids;
+ // Secondary index by libprocess UPID (used e.g. by exited(pid)
+ // and registerSlave(from) lookups).
+ hashmap<process::UPID, Slave*> pids;
+ } registered;
// Slaves that are in the process of being removed from the
// registrar. Think of these as being partially removed: we must
[2/3] mesos git commit: Moved up Slave and Framework structs in master.hpp.
Posted by bm...@apache.org.
Moved up Slave and Framework structs in master.hpp.
Review: https://reviews.apache.org/r/34387
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b19ffd2f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b19ffd2f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b19ffd2f
Branch: refs/heads/master
Commit: b19ffd2fe72d50fbd9f62c4c32eaa0906b2bd177
Parents: 26091f4
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon May 18 18:17:59 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue May 19 11:55:30 2015 -0700
----------------------------------------------------------------------
src/master/master.hpp | 1951 ++++++++++++++++++++++----------------------
1 file changed, 974 insertions(+), 977 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b19ffd2f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index da0a835..0922a7c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -89,1228 +89,1225 @@ class Repairer;
class SlaveObserver;
struct BoundedRateLimiter;
-struct Framework;
-struct Role;
-struct Slave;
-class Master : public ProtobufProcess<Master>
+struct Slave
{
-public:
- Master(mesos::master::allocator::Allocator* allocator,
- Registrar* registrar,
- Repairer* repairer,
- Files* files,
- MasterContender* contender,
- MasterDetector* detector,
- const Option<Authorizer*>& authorizer,
- const Option<std::shared_ptr<process::RateLimiter>>&
- slaveRemovalLimiter,
- const Flags& flags = Flags());
-
- virtual ~Master();
-
- // Message handlers.
- void submitScheduler(
- const std::string& name);
-
- void registerFramework(
- const process::UPID& from,
- const FrameworkInfo& frameworkInfo);
-
- void reregisterFramework(
- const process::UPID& from,
- const FrameworkInfo& frameworkInfo,
- bool failover);
-
- void unregisterFramework(
- const process::UPID& from,
- const FrameworkID& frameworkId);
-
- void deactivateFramework(
- const process::UPID& from,
- const FrameworkID& frameworkId);
-
- // TODO(vinod): Remove this once the old driver is removed.
- void resourceRequest(
- const process::UPID& from,
- const FrameworkID& frameworkId,
- const std::vector<Request>& requests);
-
- void launchTasks(
- const process::UPID& from,
- const FrameworkID& frameworkId,
- const std::vector<TaskInfo>& tasks,
- const Filters& filters,
- const std::vector<OfferID>& offerIds);
-
- void reviveOffers(
- const process::UPID& from,
- const FrameworkID& frameworkId);
-
- void killTask(
- const process::UPID& from,
- const FrameworkID& frameworkId,
- const TaskID& taskId);
-
- void statusUpdateAcknowledgement(
- const process::UPID& from,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const TaskID& taskId,
- const std::string& uuid);
-
- void schedulerMessage(
- const process::UPID& from,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const std::string& data);
-
- void registerSlave(
- const process::UPID& from,
- const SlaveInfo& slaveInfo,
- const std::vector<Resource>& checkpointedResources,
- const std::string& version);
-
- void reregisterSlave(
- const process::UPID& from,
- const SlaveInfo& slaveInfo,
- const std::vector<Resource>& checkpointedResources,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<Archive::Framework>& completedFrameworks,
- const std::string& version);
-
- void unregisterSlave(
- const process::UPID& from,
- const SlaveID& slaveId);
-
- void statusUpdate(
- const StatusUpdate& update,
- const process::UPID& pid);
-
- void reconcileTasks(
- const process::UPID& from,
- const FrameworkID& frameworkId,
- const std::vector<TaskStatus>& statuses);
-
- void exitedExecutor(
- const process::UPID& from,
- const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- int32_t status);
-
- void shutdownSlave(
- const SlaveID& slaveId,
- const std::string& message);
-
- void authenticate(
- const process::UPID& from,
- const process::UPID& pid);
-
- // TODO(bmahler): It would be preferred to use a unique libprocess
- // Process identifier (PID is not sufficient) for identifying the
- // framework instance, rather than relying on re-registration time.
- void frameworkFailoverTimeout(
- const FrameworkID& frameworkId,
- const process::Time& reregisteredTime);
+ Slave(const SlaveInfo& _info,
+ const process::UPID& _pid,
+ const Option<std::string> _version,
+ const process::Time& _registeredTime,
+ const Resources& _checkpointedResources,
+ const std::vector<ExecutorInfo> executorInfos =
+ std::vector<ExecutorInfo>(),
+ const std::vector<Task> tasks =
+ std::vector<Task>())
+ : id(_info.id()),
+ info(_info),
+ pid(_pid),
+ version(_version),
+ registeredTime(_registeredTime),
+ connected(true),
+ active(true),
+ checkpointedResources(_checkpointedResources),
+ observer(NULL)
+ {
+ CHECK(_info.has_id());
- void offer(
- const FrameworkID& framework,
- const hashmap<SlaveID, Resources>& resources);
+ Try<Resources> resources = applyCheckpointedResources(
+ info.resources(),
+ _checkpointedResources);
- // Invoked when there is a newly elected leading master.
- // Made public for testing purposes.
- void detected(const process::Future<Option<MasterInfo>>& pid);
+ // NOTE: This should be validated during slave recovery.
+ CHECK_SOME(resources);
+ totalResources = resources.get();
- // Invoked when the contender has lost the candidacy.
- // Made public for testing purposes.
- void lostCandidacy(const process::Future<Nothing>& lost);
+ foreach (const ExecutorInfo& executorInfo, executorInfos) {
+ CHECK(executorInfo.has_framework_id());
+ addExecutor(executorInfo.framework_id(), executorInfo);
+ }
- // Continuation of recover().
- // Made public for testing purposes.
- process::Future<Nothing> _recover(const Registry& registry);
+ foreach (const Task& task, tasks) {
+ addTask(new Task(task));
+ }
+ }
- // Continuation of reregisterSlave().
- // Made public for testing purposes.
- // TODO(vinod): Instead of doing this create and use a
- // MockRegistrar.
- // TODO(dhamon): Consider FRIEND_TEST macro from gtest.
- void _reregisterSlave(
- const SlaveInfo& slaveInfo,
- const process::UPID& pid,
- const std::vector<Resource>& checkpointedResources,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<Archive::Framework>& completedFrameworks,
- const std::string& version,
- const process::Future<bool>& readmit);
+ ~Slave() {}
- MasterInfo info() const
+ Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
{
- return info_;
+ if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) {
+ return tasks[frameworkId][taskId];
+ }
+ return NULL;
}
-protected:
- virtual void initialize();
- virtual void finalize();
- virtual void exited(const process::UPID& pid);
- virtual void visit(const process::MessageEvent& event);
- virtual void visit(const process::ExitedEvent& event);
-
- // Invoked when the message is ready to be executed after
- // being throttled.
- // 'principal' being None indicates it is throttled by
- // 'defaultLimiter'.
- void throttled(
- const process::MessageEvent& event,
- const Option<std::string>& principal);
-
- // Continuations of visit().
- void _visit(const process::MessageEvent& event);
- void _visit(const process::ExitedEvent& event);
-
- // Helper method invoked when the capacity for a framework
- // principal is exceeded.
- void exceededCapacity(
- const process::MessageEvent& event,
- const Option<std::string>& principal,
- uint64_t capacity);
-
- // Recovers state from the registrar.
- process::Future<Nothing> recover();
- void recoveredSlavesTimeout(const Registry& registry);
+ void addTask(Task* task)
+ {
+ const TaskID& taskId = task->task_id();
+ const FrameworkID& frameworkId = task->framework_id();
- void _registerSlave(
- const SlaveInfo& slaveInfo,
- const process::UPID& pid,
- const std::vector<Resource>& checkpointedResources,
- const std::string& version,
- const process::Future<bool>& admit);
+ CHECK(!tasks[frameworkId].contains(taskId))
+ << "Duplicate task " << taskId << " of framework " << frameworkId;
- void __reregisterSlave(
- Slave* slave,
- const std::vector<Task>& tasks);
+ tasks[frameworkId][taskId] = task;
- // 'authenticate' is the future returned by the authenticator.
- void _authenticate(
- const process::UPID& pid,
- const process::Future<Option<std::string>>& authenticate);
+ if (!protobuf::isTerminalState(task->state())) {
+ usedResources[frameworkId] += task->resources();
+ }
- void authenticationTimeout(process::Future<Option<std::string>> future);
+ LOG(INFO) << "Adding task " << taskId
+ << " with resources " << task->resources()
+ << " on slave " << id << " (" << info.hostname() << ")";
+ }
- void fileAttached(const process::Future<Nothing>& result,
- const std::string& path);
+ // Notification of task termination, for resource accounting.
+ // TODO(bmahler): This is a hack for performance. We need to
+ // maintain resource counters because computing task resources
+ // functionally for all tasks is expensive, for now.
+ void taskTerminated(Task* task)
+ {
+ const TaskID& taskId = task->task_id();
+ const FrameworkID& frameworkId = task->framework_id();
- // Invoked when the contender has entered the contest.
- void contended(const process::Future<process::Future<Nothing>>& candidacy);
+ CHECK(protobuf::isTerminalState(task->state()));
+ CHECK(tasks[frameworkId].contains(taskId))
+ << "Unknown task " << taskId << " of framework " << frameworkId;
- // Task reconciliation, split from the message handler
- // to allow re-use.
- void _reconcileTasks(
- Framework* framework,
- const std::vector<TaskStatus>& statuses);
+ usedResources[frameworkId] -= task->resources();
+ if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+ usedResources.erase(frameworkId);
+ }
+ }
- // Handles a known re-registering slave by reconciling the master's
- // view of the slave's tasks and executors.
- void reconcile(
- Slave* slave,
- const std::vector<ExecutorInfo>& executors,
- const std::vector<Task>& tasks);
+ void removeTask(Task* task)
+ {
+ const TaskID& taskId = task->task_id();
+ const FrameworkID& frameworkId = task->framework_id();
- // 'registerFramework()' continuation.
- void _registerFramework(
- const process::UPID& from,
- const FrameworkInfo& frameworkInfo,
- const process::Future<Option<Error>>& validationError);
+ CHECK(tasks[frameworkId].contains(taskId))
+ << "Unknown task " << taskId << " of framework " << frameworkId;
- // 'reregisterFramework()' continuation.
- void _reregisterFramework(
- const process::UPID& from,
- const FrameworkInfo& frameworkInfo,
- bool failover,
- const process::Future<Option<Error>>& validationError);
+ if (!protobuf::isTerminalState(task->state())) {
+ usedResources[frameworkId] -= task->resources();
+ if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
+ usedResources.erase(frameworkId);
+ }
+ }
- // Add a framework.
- void addFramework(Framework* framework);
+ tasks[frameworkId].erase(taskId);
+ if (tasks[frameworkId].empty()) {
+ tasks.erase(frameworkId);
+ }
- // Replace the scheduler for a framework with a new process ID, in
- // the event of a scheduler failover.
- void failoverFramework(Framework* framework, const process::UPID& newPid);
+ killedTasks.remove(frameworkId, taskId);
+ }
- // Kill all of a framework's tasks, delete the framework object, and
- // reschedule offers that were assigned to this framework.
- void removeFramework(Framework* framework);
+ void addOffer(Offer* offer)
+ {
+ CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
- // Remove a framework from the slave, i.e., remove its tasks and
- // executors and recover the resources.
- void removeFramework(Slave* slave, Framework* framework);
+ offers.insert(offer);
+ offeredResources += offer->resources();
+ }
- void disconnect(Framework* framework);
- void deactivate(Framework* framework);
+ void removeOffer(Offer* offer)
+ {
+ CHECK(offers.contains(offer)) << "Unknown offer " << offer->id();
- void disconnect(Slave* slave);
- void deactivate(Slave* slave);
+ offeredResources -= offer->resources();
+ offers.erase(offer);
+ }
- // Add a slave.
- void addSlave(
- Slave* slave,
- const std::vector<Archive::Framework>& completedFrameworks =
- std::vector<Archive::Framework>());
+ bool hasExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId) const
+ {
+ return executors.contains(frameworkId) &&
+ executors.get(frameworkId).get().contains(executorId);
+ }
- // Remove the slave from the registrar. Called when the slave
- // does not re-register in time after a master failover.
- Nothing removeSlave(const Registry::Slave& slave);
+ void addExecutor(const FrameworkID& frameworkId,
+ const ExecutorInfo& executorInfo)
+ {
+ CHECK(!hasExecutor(frameworkId, executorInfo.executor_id()))
+ << "Duplicate executor " << executorInfo.executor_id()
+ << " of framework " << frameworkId;
- // Remove the slave from the registrar and from the master's state.
- //
- // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
- void removeSlave(
- Slave* slave,
- const std::string& message,
- Option<process::metrics::Counter> reason = None());
+ executors[frameworkId][executorInfo.executor_id()] = executorInfo;
+ usedResources[frameworkId] += executorInfo.resources();
+ }
- void _removeSlave(
- const SlaveInfo& slaveInfo,
- const std::vector<StatusUpdate>& updates,
- const process::Future<bool>& removed,
- const std::string& message,
- Option<process::metrics::Counter> reason = None());
+ void removeExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
+ {
+ CHECK(hasExecutor(frameworkId, executorId))
+ << "Unknown executor " << executorId << " of framework " << frameworkId;
- // Authorizes the task.
- // Returns true if task is authorized.
- // Returns false if task is not authorized.
- // Returns failure for transient authorization failures.
- process::Future<bool> authorizeTask(
- const TaskInfo& task,
- Framework* framework);
+ usedResources[frameworkId] -=
+ executors[frameworkId][executorId].resources();
- // Add the task and its executor (if not already running) to the
- // framework and slave. Returns the resources consumed as a result,
- // which includes resources for the task and its executor
- // (if not already running).
- Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
+ // XXX Remove.
- // Transitions the task, and recovers resources if the task becomes
- // terminal.
- void updateTask(Task* task, const StatusUpdate& update);
+ executors[frameworkId].erase(executorId);
+ if (executors[frameworkId].empty()) {
+ executors.erase(frameworkId);
+ }
+ }
- // Removes the task.
- void removeTask(Task* task);
+ void apply(const Offer::Operation& operation)
+ {
+ Try<Resources> resources = totalResources.apply(operation);
+ CHECK_SOME(resources);
- // Remove an executor and recover its resources.
- void removeExecutor(
- Slave* slave,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId);
+ totalResources = resources.get();
+ checkpointedResources = totalResources.filter(needCheckpointing);
+ }
- // Updates slave's resources by applying the given operation. It
- // also updates the allocator and sends a CheckpointResourcesMessage
- // to the slave with slave's current checkpointed resources.
- void applyOfferOperation(
- Framework* framework,
- Slave* slave,
- const Offer::Operation& operation);
+ const SlaveID id;
+ const SlaveInfo info;
- // Forwards the update to the framework.
- void forward(
- const StatusUpdate& update,
- const process::UPID& acknowledgee,
- Framework* framework);
+ process::UPID pid;
- // Remove an offer after specified timeout
- void offerTimeout(const OfferID& offerId);
+ // The Mesos version of the slave. If set, the slave is >= 0.21.0.
+ // TODO(bmahler): Use stout's Version when it can parse labels, etc.
+ // TODO(bmahler): Make this required once it is always set.
+ const Option<std::string> version;
- // Remove an offer and optionally rescind the offer as well.
- void removeOffer(Offer* offer, bool rescind = false);
+ process::Time registeredTime;
+ Option<process::Time> reregisteredTime;
- Framework* getFramework(const FrameworkID& frameworkId);
- Slave* getSlave(const SlaveID& slaveId);
- Offer* getOffer(const OfferID& offerId);
+ // Slave becomes disconnected when the socket closes.
+ bool connected;
- FrameworkID newFrameworkId();
- OfferID newOfferId();
- SlaveID newSlaveId();
+ // Slave becomes deactivated when it gets disconnected. In the
+ // future this might also happen via HTTP endpoint.
+ // No offers will be made for a deactivated slave.
+ bool active;
- Option<Credentials> credentials;
+ // Executors running on this slave.
+ hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors;
-private:
- void drop(
- const process::UPID& from,
- const scheduler::Call& call,
- const std::string& message);
+ // Tasks present on this slave.
+ // TODO(bmahler): The task pointer ownership complexity arises from the fact
+ // that we own the pointer here, but it's shared with the Framework struct.
+ // We should find a way to eliminate this.
+ hashmap<FrameworkID, hashmap<TaskID, Task*>> tasks;
- void drop(
- Framework* framework,
- const Offer::Operation& operation,
- const std::string& message);
+ // Tasks that were asked to kill by frameworks.
+ // This is used for reconciliation when the slave re-registers.
+ multihashmap<FrameworkID, TaskID> killedTasks;
- // Call handlers.
- void receive(
- const process::UPID& from,
- const scheduler::Call& call);
+ // Active offers on this slave.
+ hashset<Offer*> offers;
- void accept(
- Framework* framework,
- const scheduler::Call::Accept& accept);
+ hashmap<FrameworkID, Resources> usedResources; // Active task / executors.
+ Resources offeredResources; // Offers.
- void _accept(
- const FrameworkID& frameworkId,
- const SlaveID& slaveId,
- const Resources& offeredResources,
- const scheduler::Call::Accept& accept,
- const process::Future<std::list<process::Future<bool>>>& authorizations);
+ // Resources that should be checkpointed by the slave (e.g.,
+ // persistent volumes, dynamic reservations, etc). These are either
+ // in use by a task/executor, or are available for use and will be
+ // re-offered to the framework.
+ Resources checkpointedResources;
- void reconcile(
- Framework* framework,
- const scheduler::Call::Reconcile& reconcile);
+ // The current total resources of the slave. Note that this is
+ // different from 'info.resources()' because this also consider
+ // operations (e.g., CREATE, RESERVE) that have been applied.
+ Resources totalResources;
- void kill(
- Framework* framework,
- const scheduler::Call::Kill& kill);
+ SlaveObserver* observer;
- void shutdown(
- Framework* framework,
- const scheduler::Call::Shutdown& shutdown);
+private:
+ Slave(const Slave&); // No copying.
+ Slave& operator = (const Slave&); // No assigning.
+};
- bool elected() const
- {
- return leader.isSome() && leader.get() == info_;
- }
- // Inner class used to namespace HTTP route handlers (see
- // master/http.cpp for implementations).
- class Http
- {
- public:
- explicit Http(Master* _master) : master(_master) {}
+// Writes a slave to 'stream' as "<id> at <pid> (<hostname>)", the
+// form used when logging '*slave' in the master.
+inline std::ostream& operator << (std::ostream& stream, const Slave& slave)
+{
+ return stream << slave.id << " at " << slave.pid
+ << " (" << slave.info.hostname() << ")";
+}
- // Logs the request, route handlers can compose this with the
- // desired request handler to get consistent request logging.
- static void log(const process::http::Request& request);
- // /master/health
- process::Future<process::http::Response> health(
- const process::http::Request& request) const;
+// Information about a connected or completed framework.
+// TODO(bmahler): Keeping the task and executor information in sync
+// across the Slave and Framework structs is error prone!
+struct Framework
+{
+ Framework(const FrameworkInfo& _info,
+ const process::UPID& _pid,
+ const process::Time& time = process::Clock::now())
+ : info(_info),
+ pid(_pid),
+ connected(true),
+ active(true),
+ registeredTime(time),
+ reregisteredTime(time),
+ completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
- // /master/observe
- process::Future<process::http::Response> observe(
- const process::http::Request& request) const;
+ ~Framework() {}
- // /master/redirect
- process::Future<process::http::Response> redirect(
- const process::http::Request& request) const;
+ // Returns the task with the given ID, or NULL if this framework
+ // has no such task.
+ Task* getTask(const TaskID& taskId)
+ {
+ if (tasks.contains(taskId)) {
+ return tasks[taskId];
+ } else {
+ return NULL;
+ }
+ }
- // /master/roles.json
- process::Future<process::http::Response> roles(
- const process::http::Request& request) const;
+ // Starts tracking 'task' (the pointer is stored, not copied). The
+ // resources of a non-terminal task are counted as used.
+ // CHECK-fails if a task with the same ID is already tracked.
+ void addTask(Task* task)
+ {
+ CHECK(!tasks.contains(task->task_id()))
+ << "Duplicate task " << task->task_id()
+ << " of framework " << task->framework_id();
- // /master/teardown and /master/shutdown (deprecated).
- process::Future<process::http::Response> teardown(
- const process::http::Request& request) const;
+ tasks[task->task_id()] = task;
- // /master/slaves
- process::Future<process::http::Response> slaves(
- const process::http::Request& request) const;
+ if (!protobuf::isTerminalState(task->state())) {
+ totalUsedResources += task->resources();
+ usedResources[task->slave_id()] += task->resources();
+ }
+ }
- // /master/state.json
- process::Future<process::http::Response> state(
- const process::http::Request& request) const;
+ // Notification of task termination, for resource accounting.
+ // TODO(bmahler): This is a hack for performance. We need to
+ // maintain resource counters because computing task resources
+ // functionally for all tasks is expensive, for now.
+ void taskTerminated(Task* task)
+ {
+ CHECK(protobuf::isTerminalState(task->state()));
+ CHECK(tasks.contains(task->task_id()))
+ << "Unknown task " << task->task_id()
+ << " of framework " << task->framework_id();
- // /master/state-summary
- process::Future<process::http::Response> stateSummary(
- const process::http::Request& request) const;
+ totalUsedResources -= task->resources();
+ usedResources[task->slave_id()] -= task->resources();
+ if (usedResources[task->slave_id()].empty()) {
+ usedResources.erase(task->slave_id());
+ }
+ }
- // /master/tasks.json
- process::Future<process::http::Response> tasks(
- const process::http::Request& request) const;
+ // Archives a copy of 'task' in the bounded 'completedTasks' buffer.
+ void addCompletedTask(const Task& task)
+ {
+ // TODO(adam-mesos): Check if completed task already exists.
+ completedTasks.push_back(std::shared_ptr<Task>(new Task(task)));
+ }
- const static std::string HEALTH_HELP;
- const static std::string OBSERVE_HELP;
- const static std::string REDIRECT_HELP;
- const static std::string SHUTDOWN_HELP; // Deprecated.
- const static std::string TEARDOWN_HELP;
- const static std::string SLAVES_HELP;
- const static std::string TASKS_HELP;
+ // Stops tracking 'task'. Its resources are released if it is still
+ // non-terminal, and a copy is archived via addCompletedTask(). The
+ // Task object itself is not deleted here.
+ void removeTask(Task* task)
+ {
+ CHECK(tasks.contains(task->task_id()))
+ << "Unknown task " << task->task_id()
+ << " of framework " << task->framework_id();
- private:
- // Helper for doing authentication, returns the credential used if
- // the authentication was successful (or none if no credentials
- // have been given to the master), otherwise an Error.
- Result<Credential> authenticate(
- const process::http::Request& request) const;
+ if (!protobuf::isTerminalState(task->state())) {
+ totalUsedResources -= task->resources();
+ usedResources[task->slave_id()] -= task->resources();
+ if (usedResources[task->slave_id()].empty()) {
+ usedResources.erase(task->slave_id());
+ }
+ }
- // Continuations.
- process::Future<process::http::Response> _teardown(
- const FrameworkID& id,
- bool authorized = true) const;
+ addCompletedTask(*task);
- Master* master;
- };
+ tasks.erase(task->task_id());
+ }
- Master(const Master&); // No copying.
- Master& operator = (const Master&); // No assigning.
+ // Starts tracking an outstanding offer and counts its resources as
+ // offered. CHECK-fails on a duplicate offer.
+ void addOffer(Offer* offer)
+ {
+ CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
+ offers.insert(offer);
+ totalOfferedResources += offer->resources();
+ offeredResources[offer->slave_id()] += offer->resources();
+ }
- friend struct Metrics;
+ // Stops tracking an offer and releases its offered resources.
+ // CHECK-fails if the offer is not tracked.
+ void removeOffer(Offer* offer)
+ {
+ CHECK(offers.contains(offer))
+ << "Unknown offer " << offer->id();
- // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
- // make the following functions friends so that validation functions
- // can get Offer* and Slave*.
- friend Offer* validation::offer::getOffer(
- Master* master, const OfferID& offerId);
+ totalOfferedResources -= offer->resources();
+ offeredResources[offer->slave_id()] -= offer->resources();
+ if (offeredResources[offer->slave_id()].empty()) {
+ offeredResources.erase(offer->slave_id());
+ }
- friend Slave* validation::offer::getSlave(
- Master* master, const SlaveID& slaveId);
+ offers.erase(offer);
+ }
- const Flags flags;
+ // Returns true if the executor is tracked on the given slave.
+ bool hasExecutor(const SlaveID& slaveId,
+ const ExecutorID& executorId)
+ {
+ return executors.contains(slaveId) &&
+ executors[slaveId].contains(executorId);
+ }
- Option<MasterInfo> leader; // Current leading master.
+ // Starts tracking an executor on a slave and counts its resources
+ // as used. CHECK-fails on a duplicate executor.
+ void addExecutor(const SlaveID& slaveId,
+ const ExecutorInfo& executorInfo)
+ {
+ CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
+ << "Duplicate executor " << executorInfo.executor_id()
+ << " on slave " << slaveId;
- mesos::master::allocator::Allocator* allocator;
- WhitelistWatcher* whitelistWatcher;
- Registrar* registrar;
- Repairer* repairer;
- Files* files;
+ executors[slaveId][executorInfo.executor_id()] = executorInfo;
+ totalUsedResources += executorInfo.resources();
+ usedResources[slaveId] += executorInfo.resources();
+ }
- MasterContender* contender;
- MasterDetector* detector;
+ // Stops tracking an executor and releases its used resources.
+ // CHECK-fails if the executor is not tracked.
+ void removeExecutor(const SlaveID& slaveId,
+ const ExecutorID& executorId)
+ {
+ CHECK(hasExecutor(slaveId, executorId))
+ << "Unknown executor " << executorId
+ << " of framework " << id()
+ << " of slave " << slaveId;
- const Option<Authorizer*> authorizer;
+ totalUsedResources -= executors[slaveId][executorId].resources();
+ usedResources[slaveId] -= executors[slaveId][executorId].resources();
+ if (usedResources[slaveId].empty()) {
+ usedResources.erase(slaveId);
+ }
- MasterInfo info_;
+ executors[slaveId].erase(executorId);
+ if (executors[slaveId].empty()) {
+ executors.erase(slaveId);
+ }
+ }
- // Indicates when recovery is complete. Recovery begins once the
- // master is elected as a leader.
- Option<process::Future<Nothing>> recovered;
+ const FrameworkID id() const { return info.id(); }
- struct Slaves
+ // Update fields in 'info' using those in 'source'. Currently this
+ // only updates 'name', 'failover_timeout', 'hostname', and
+ // 'webui_url'. Differences in 'user', 'checkpoint', 'role' and
+ // 'principal' are not applied; they are only logged as warnings.
+ void updateFrameworkInfo(const FrameworkInfo& source)
 {
- Slaves() : removed(MAX_REMOVED_SLAVES) {}
+ // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because
+ // of MESOS-2559. Once this is fixed we can 'CHECK' that we only
+ // merge 'info' from the same framework 'id'.
- // Imposes a time limit for slaves that we recover from the
- // registry to re-register with the master.
- Option<process::Timer> recoveredTimer;
+ // TODO(jmlvanre): Merge other fields as per design doc in
+ // MESOS-703.
- // Slaves that have been recovered from the registrar but have yet
- // to re-register. We keep a "reregistrationTimer" above to ensure
- // we remove these slaves if they do not re-register.
- hashset<SlaveID> recovered;
+ // NOTE: The warnings below report the requested (rejected) value
+ // from 'source', not the value currently held in 'info'.
+ if (source.user() != info.user()) {
+ LOG(WARNING) << "Can not update FrameworkInfo.user to '" << source.user()
+ << "' for framework " << id() << ". Check MESOS-703";
+ }
- // Slaves that are in the process of registering.
- hashset<process::UPID> registering;
+ info.set_name(source.name());
- // Only those slaves that are re-registering for the first time
- // with this master. We must not answer questions related to
- // these slaves until the registrar determines their fate.
- hashset<SlaveID> reregistering;
+ if (source.has_failover_timeout()) {
+ info.set_failover_timeout(source.failover_timeout());
+ } else {
+ info.clear_failover_timeout();
+ }
- hashmap<SlaveID, Slave*> registered;
+ if (source.checkpoint() != info.checkpoint()) {
+ LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '"
+ << stringify(source.checkpoint()) << "' for framework " << id()
+ << ". Check MESOS-703";
+ }
- // Slaves that are in the process of being removed from the
- // registrar. Think of these as being partially removed: we must
- // not answer questions related to these until they are removed
- // from the registry.
- hashset<SlaveID> removing;
+ if (source.role() != info.role()) {
+ LOG(WARNING) << "Can not update FrameworkInfo.role to '" << source.role()
+ << "' for framework " << id() << ". Check MESOS-703";
+ }
- // We track removed slaves to preserve the consistency
- // semantics of the pre-registrar code when a non-strict registrar
- // is being used. That is, if we remove a slave, we must make
- // an effort to prevent it from (re-)registering, sending updates,
- // etc. We keep a cache here to prevent this from growing in an
- // unbounded manner.
- // TODO(bmahler): Ideally we could use a cache with set semantics.
- Cache<SlaveID, Nothing> removed;
+ if (source.has_hostname()) {
+ info.set_hostname(source.hostname());
+ } else {
+ info.clear_hostname();
+ }
- // This rate limiter is used to limit the removal of slaves failing
- // health checks.
- // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
- // a wrapper around libprocess process which is thread safe.
- Option<std::shared_ptr<process::RateLimiter>> limiter;
+ if (source.principal() != info.principal()) {
+ LOG(WARNING) << "Can not update FrameworkInfo.principal to '"
+ << source.principal() << "' for framework " << id()
+ << ". Check MESOS-703";
+ }
- bool transitioning(const Option<SlaveID>& slaveId)
- {
- if (slaveId.isSome()) {
- return recovered.contains(slaveId.get()) ||
- reregistering.contains(slaveId.get()) ||
- removing.contains(slaveId.get());
- } else {
- return !recovered.empty() ||
- !reregistering.empty() ||
- !removing.empty();
- }
+ if (source.has_webui_url()) {
+ info.set_webui_url(source.webui_url());
+ } else {
+ info.clear_webui_url();
   }
- } slaves;
+ }
- struct Frameworks
- {
- Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {}
+ FrameworkInfo info;
- hashmap<FrameworkID, Framework*> registered;
- boost::circular_buffer<std::shared_ptr<Framework>> completed;
+ process::UPID pid;
- // Principals of frameworks keyed by PID.
- // NOTE: Multiple PIDs can map to the same principal. The
- // principal is None when the framework doesn't specify it.
- // The differences between this map and 'authenticated' are:
- // 1) This map only includes *registered* frameworks. The mapping
- // is added when a framework (re-)registers.
- // 2) This map includes unauthenticated frameworks (when Master
- // allows them) if they have principals specified in
- // FrameworkInfo.
- hashmap<process::UPID, Option<std::string>> principals;
+ // Framework becomes disconnected when the socket closes.
+ bool connected;
- // BoundedRateLimiters keyed by the framework principal.
- // Like Metrics::Frameworks, all frameworks of the same principal
- // are throttled together at a common rate limit.
- hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
+ // Framework becomes deactivated when it is disconnected or
+ // the master receives a DeactivateFrameworkMessage.
+ // No offers will be made to a deactivated framework.
+ bool active;
- // The default limiter is for frameworks not specified in
- // 'flags.rate_limits'.
- Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
- } frameworks;
+ process::Time registeredTime;
+ process::Time reregisteredTime;
+ process::Time unregisteredTime;
- hashmap<OfferID, Offer*> offers;
- hashmap<OfferID, process::Timer> offerTimers;
+ // Tasks that have not yet been launched because they are currently
+ // being authorized.
+ hashmap<TaskID, TaskInfo> pendingTasks;
- hashmap<std::string, Role*> roles;
+ hashmap<TaskID, Task*> tasks;
- // Authenticator names as supplied via flags.
- std::vector<std::string> authenticatorNames;
+ // NOTE: We use a shared pointer for Task because clang doesn't like
+ // Boost's implementation of circular_buffer with Task (Boost
+ // attempts to do some memset's which are unsafe).
+ boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
- Option<Authenticator*> authenticator;
+ hashset<Offer*> offers; // Active offers for framework.
- // Frameworks/slaves that are currently in the process of authentication.
- // 'authenticating' future is completed when authenticator
- // completes authentication.
- // The future is removed from the map when master completes authentication.
- hashmap<process::UPID, process::Future<Option<std::string>>> authenticating;
+ hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
- // Principals of authenticated frameworks/slaves keyed by PID.
- hashmap<process::UPID, std::string> authenticated;
+ // NOTE: For the used and offered resources below, we keep the
+ // total as well as partitioned by SlaveID.
+ // We expose the total resources via the HTTP endpoint, and we
+ // keep a running total of the resources because looping over the
+ // slaves to sum the resources has led to perf issues (MESOS-1862).
+ // We keep the resources partitioned by SlaveID because non-scalar
+ // resources can be lost when summing them up across multiple
+ // slaves (MESOS-2373).
+ //
+ // Also note that keeping the totals is safe even though it yields
+ // incorrect results for non-scalar resources.
+ // (1) For overlapping set items / ranges across slaves, these
+ // will get added N times but only represented once.
+ // (2) When an initial subtraction occurs (N-1), the resource is
+ // no longer represented. (This is the source of the bug).
+ // (3) When any further subtractions occur (N-(1+M)), the
+ // Resources simply ignores the subtraction since there's
+ // nothing to remove, so this is safe for now.
- int64_t nextFrameworkId; // Used to give each framework a unique ID.
- int64_t nextOfferId; // Used to give each slot offer a unique ID.
- int64_t nextSlaveId; // Used to give each slave a unique ID.
+ // TODO(mpark): Strip the non-scalar resources out of the totals
+ // in order to avoid reporting incorrect statistics (MESOS-2623).
- // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
- // thread safe.
- // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
- // copyable metric types only.
- std::shared_ptr<Metrics> metrics;
+ // Active task / executor resources.
+ Resources totalUsedResources;
+ hashmap<SlaveID, Resources> usedResources;
- // Gauge handlers.
- double _uptime_secs()
- {
- return (process::Clock::now() - startTime).secs();
- }
+ // Offered resources.
+ Resources totalOfferedResources;
+ hashmap<SlaveID, Resources> offeredResources;
- double _elected()
- {
- return elected() ? 1 : 0;
- }
+private:
+ Framework(const Framework&); // No copying.
+ Framework& operator = (const Framework&); // No assigning.
+};
+
+
+// Prints the framework as "<id> (<name>) at <pid>".
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const Framework& framework)
+{
+ stream << framework.id() << " (" << framework.info.name() << ")";
+
+ // TODO(vinod): Also log the hostname once FrameworkInfo is properly
+ // updated on framework failover (MESOS-1784).
+ return stream << " at " << framework.pid;
+}
- double _slaves_connected();
- double _slaves_disconnected();
- double _slaves_active();
- double _slaves_inactive();
- double _frameworks_connected();
- double _frameworks_disconnected();
- double _frameworks_active();
- double _frameworks_inactive();
+// Information about an active role, together with the frameworks
+// that have been added to it via addFramework().
+struct Role
+{
+ explicit Role(const mesos::master::RoleInfo& _info)
+ : info(_info) {}
- double _outstanding_offers()
+ void addFramework(Framework* framework)
  {
- return offers.size();
+ frameworks.put(framework->id(), framework);
   }
- double _event_queue_messages()
+ void removeFramework(Framework* framework)
  {
- return static_cast<double>(eventCount<process::MessageEvent>());
+ frameworks.erase(framework->id());
   }
- double _event_queue_dispatches()
+ Resources resources() const
  {
- return static_cast<double>(eventCount<process::DispatchEvent>());
- }
+ // Sum of the used and offered resources of every framework in
+ // this role.
+ Resources total;
+ foreachvalue (Framework* framework, frameworks) {
+ total += framework->totalUsedResources;
+ total += framework->totalOfferedResources;
+ }
- double _event_queue_http_requests()
- {
- return static_cast<double>(eventCount<process::HttpEvent>());
+ return total;
   }
- double _tasks_staging();
- double _tasks_starting();
- double _tasks_running();
+ mesos::master::RoleInfo info;
- double _resources_total(const std::string& name);
- double _resources_used(const std::string& name);
- double _resources_percent(const std::string& name);
+ hashmap<FrameworkID, Framework*> frameworks;
+};
- process::Time startTime; // Start time used to calculate uptime.
- Option<process::Time> electedTime; // Time when this master is elected.
+class Master : public ProtobufProcess<Master>
+{
+public:
+ Master(mesos::master::allocator::Allocator* allocator,
+ Registrar* registrar,
+ Repairer* repairer,
+ Files* files,
+ MasterContender* contender,
+ MasterDetector* detector,
+ const Option<Authorizer*>& authorizer,
+ const Option<std::shared_ptr<process::RateLimiter>>&
+ slaveRemovalLimiter,
+ const Flags& flags = Flags());
- // Validates the framework including authorization.
- // Returns None if the framework is valid.
- // Returns Error if the framework is invalid.
- // Returns Failure if authorization returns 'Failure'.
- process::Future<Option<Error>> validate(
+ virtual ~Master();
+
+ // Message handlers.
+ void submitScheduler(
+ const std::string& name);
+
+ void registerFramework(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo);
+
+ void reregisterFramework(
+ const process::UPID& from,
const FrameworkInfo& frameworkInfo,
- const process::UPID& from);
-};
+ bool failover);
+ void unregisterFramework(
+ const process::UPID& from,
+ const FrameworkID& frameworkId);
-struct Slave
-{
- Slave(const SlaveInfo& _info,
- const process::UPID& _pid,
- const Option<std::string> _version,
- const process::Time& _registeredTime,
- const Resources& _checkpointedResources,
- const std::vector<ExecutorInfo> executorInfos =
- std::vector<ExecutorInfo>(),
- const std::vector<Task> tasks =
- std::vector<Task>())
- : id(_info.id()),
- info(_info),
- pid(_pid),
- version(_version),
- registeredTime(_registeredTime),
- connected(true),
- active(true),
- checkpointedResources(_checkpointedResources),
- observer(NULL)
- {
- CHECK(_info.has_id());
+ void deactivateFramework(
+ const process::UPID& from,
+ const FrameworkID& frameworkId);
- Try<Resources> resources = applyCheckpointedResources(
- info.resources(),
- _checkpointedResources);
+ // TODO(vinod): Remove this once the old driver is removed.
+ void resourceRequest(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const std::vector<Request>& requests);
- // NOTE: This should be validated during slave recovery.
- CHECK_SOME(resources);
- totalResources = resources.get();
+ void launchTasks(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const std::vector<TaskInfo>& tasks,
+ const Filters& filters,
+ const std::vector<OfferID>& offerIds);
- foreach (const ExecutorInfo& executorInfo, executorInfos) {
- CHECK(executorInfo.has_framework_id());
- addExecutor(executorInfo.framework_id(), executorInfo);
- }
+ void reviveOffers(
+ const process::UPID& from,
+ const FrameworkID& frameworkId);
- foreach (const Task& task, tasks) {
- addTask(new Task(task));
- }
- }
+ void killTask(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const TaskID& taskId);
- ~Slave() {}
+ void statusUpdateAcknowledgement(
+ const process::UPID& from,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const TaskID& taskId,
+ const std::string& uuid);
- Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
- {
- if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) {
- return tasks[frameworkId][taskId];
- }
- return NULL;
- }
+ void schedulerMessage(
+ const process::UPID& from,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const std::string& data);
- void addTask(Task* task)
- {
- const TaskID& taskId = task->task_id();
- const FrameworkID& frameworkId = task->framework_id();
+ void registerSlave(
+ const process::UPID& from,
+ const SlaveInfo& slaveInfo,
+ const std::vector<Resource>& checkpointedResources,
+ const std::string& version);
- CHECK(!tasks[frameworkId].contains(taskId))
- << "Duplicate task " << taskId << " of framework " << frameworkId;
+ void reregisterSlave(
+ const process::UPID& from,
+ const SlaveInfo& slaveInfo,
+ const std::vector<Resource>& checkpointedResources,
+ const std::vector<ExecutorInfo>& executorInfos,
+ const std::vector<Task>& tasks,
+ const std::vector<Archive::Framework>& completedFrameworks,
+ const std::string& version);
- tasks[frameworkId][taskId] = task;
+ void unregisterSlave(
+ const process::UPID& from,
+ const SlaveID& slaveId);
- if (!protobuf::isTerminalState(task->state())) {
- usedResources[frameworkId] += task->resources();
- }
+ void statusUpdate(
+ const StatusUpdate& update,
+ const process::UPID& pid);
+
+ void reconcileTasks(
+ const process::UPID& from,
+ const FrameworkID& frameworkId,
+ const std::vector<TaskStatus>& statuses);
+
+ void exitedExecutor(
+ const process::UPID& from,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ int32_t status);
+
+ void shutdownSlave(
+ const SlaveID& slaveId,
+ const std::string& message);
+
+ void authenticate(
+ const process::UPID& from,
+ const process::UPID& pid);
+
+ // TODO(bmahler): It would be preferred to use a unique libprocess
+ // Process identifier (PID is not sufficient) for identifying the
+ // framework instance, rather than relying on re-registration time.
+ void frameworkFailoverTimeout(
+ const FrameworkID& frameworkId,
+ const process::Time& reregisteredTime);
+
+ void offer(
+ const FrameworkID& framework,
+ const hashmap<SlaveID, Resources>& resources);
+
+ // Invoked when there is a newly elected leading master.
+ // Made public for testing purposes.
+ void detected(const process::Future<Option<MasterInfo>>& pid);
+
+ // Invoked when the contender has lost the candidacy.
+ // Made public for testing purposes.
+ void lostCandidacy(const process::Future<Nothing>& lost);
+
+ // Continuation of recover().
+ // Made public for testing purposes.
+ process::Future<Nothing> _recover(const Registry& registry);
+
+ // Continuation of reregisterSlave().
+ // Made public for testing purposes.
+ // TODO(vinod): Instead of doing this create and use a
+ // MockRegistrar.
+ // TODO(dhamon): Consider FRIEND_TEST macro from gtest.
+ void _reregisterSlave(
+ const SlaveInfo& slaveInfo,
+ const process::UPID& pid,
+ const std::vector<Resource>& checkpointedResources,
+ const std::vector<ExecutorInfo>& executorInfos,
+ const std::vector<Task>& tasks,
+ const std::vector<Archive::Framework>& completedFrameworks,
+ const std::string& version,
+ const process::Future<bool>& readmit);
+
+ MasterInfo info() const
+ {
+ return info_;
+ }
+
+protected:
+ virtual void initialize();
+ virtual void finalize();
+ virtual void exited(const process::UPID& pid);
+ virtual void visit(const process::MessageEvent& event);
+ virtual void visit(const process::ExitedEvent& event);
- LOG(INFO) << "Adding task " << taskId
- << " with resources " << task->resources()
- << " on slave " << id << " (" << info.hostname() << ")";
- }
+ // Invoked when the message is ready to be executed after
+ // being throttled.
+ // 'principal' being None indicates it is throttled by
+ // 'defaultLimiter'.
+ void throttled(
+ const process::MessageEvent& event,
+ const Option<std::string>& principal);
- // Notification of task termination, for resource accounting.
- // TODO(bmahler): This is a hack for performance. We need to
- // maintain resource counters because computing task resources
- // functionally for all tasks is expensive, for now.
- void taskTerminated(Task* task)
- {
- const TaskID& taskId = task->task_id();
- const FrameworkID& frameworkId = task->framework_id();
+ // Continuations of visit().
+ void _visit(const process::MessageEvent& event);
+ void _visit(const process::ExitedEvent& event);
- CHECK(protobuf::isTerminalState(task->state()));
- CHECK(tasks[frameworkId].contains(taskId))
- << "Unknown task " << taskId << " of framework " << frameworkId;
+ // Helper method invoked when the capacity for a framework
+ // principal is exceeded.
+ void exceededCapacity(
+ const process::MessageEvent& event,
+ const Option<std::string>& principal,
+ uint64_t capacity);
- usedResources[frameworkId] -= task->resources();
- if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
- usedResources.erase(frameworkId);
- }
- }
+ // Recovers state from the registrar.
+ process::Future<Nothing> recover();
+ void recoveredSlavesTimeout(const Registry& registry);
- void removeTask(Task* task)
- {
- const TaskID& taskId = task->task_id();
- const FrameworkID& frameworkId = task->framework_id();
+ void _registerSlave(
+ const SlaveInfo& slaveInfo,
+ const process::UPID& pid,
+ const std::vector<Resource>& checkpointedResources,
+ const std::string& version,
+ const process::Future<bool>& admit);
- CHECK(tasks[frameworkId].contains(taskId))
- << "Unknown task " << taskId << " of framework " << frameworkId;
+ void __reregisterSlave(
+ Slave* slave,
+ const std::vector<Task>& tasks);
- if (!protobuf::isTerminalState(task->state())) {
- usedResources[frameworkId] -= task->resources();
- if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) {
- usedResources.erase(frameworkId);
- }
- }
+ // 'authenticate' is the future returned by the authenticator.
+ void _authenticate(
+ const process::UPID& pid,
+ const process::Future<Option<std::string>>& authenticate);
- tasks[frameworkId].erase(taskId);
- if (tasks[frameworkId].empty()) {
- tasks.erase(frameworkId);
- }
+ void authenticationTimeout(process::Future<Option<std::string>> future);
- killedTasks.remove(frameworkId, taskId);
- }
+ void fileAttached(const process::Future<Nothing>& result,
+ const std::string& path);
- void addOffer(Offer* offer)
- {
- CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
+ // Invoked when the contender has entered the contest.
+ void contended(const process::Future<process::Future<Nothing>>& candidacy);
- offers.insert(offer);
- offeredResources += offer->resources();
- }
+ // Task reconciliation, split from the message handler
+ // to allow re-use.
+ void _reconcileTasks(
+ Framework* framework,
+ const std::vector<TaskStatus>& statuses);
- void removeOffer(Offer* offer)
- {
- CHECK(offers.contains(offer)) << "Unknown offer " << offer->id();
+ // Handles a known re-registering slave by reconciling the master's
+ // view of the slave's tasks and executors.
+ void reconcile(
+ Slave* slave,
+ const std::vector<ExecutorInfo>& executors,
+ const std::vector<Task>& tasks);
- offeredResources -= offer->resources();
- offers.erase(offer);
- }
+ // 'registerFramework()' continuation.
+ void _registerFramework(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ const process::Future<Option<Error>>& validationError);
- bool hasExecutor(const FrameworkID& frameworkId,
- const ExecutorID& executorId) const
- {
- return executors.contains(frameworkId) &&
- executors.get(frameworkId).get().contains(executorId);
- }
+ // 'reregisterFramework()' continuation.
+ void _reregisterFramework(
+ const process::UPID& from,
+ const FrameworkInfo& frameworkInfo,
+ bool failover,
+ const process::Future<Option<Error>>& validationError);
- void addExecutor(const FrameworkID& frameworkId,
- const ExecutorInfo& executorInfo)
- {
- CHECK(!hasExecutor(frameworkId, executorInfo.executor_id()))
- << "Duplicate executor " << executorInfo.executor_id()
- << " of framework " << frameworkId;
+ // Add a framework.
+ void addFramework(Framework* framework);
- executors[frameworkId][executorInfo.executor_id()] = executorInfo;
- usedResources[frameworkId] += executorInfo.resources();
- }
+ // Replace the scheduler for a framework with a new process ID, in
+ // the event of a scheduler failover.
+ void failoverFramework(Framework* framework, const process::UPID& newPid);
- void removeExecutor(const FrameworkID& frameworkId,
- const ExecutorID& executorId)
- {
- CHECK(hasExecutor(frameworkId, executorId))
- << "Unknown executor " << executorId << " of framework " << frameworkId;
+ // Kill all of a framework's tasks, delete the framework object, and
+ // reschedule offers that were assigned to this framework.
+ void removeFramework(Framework* framework);
- usedResources[frameworkId] -=
- executors[frameworkId][executorId].resources();
+ // Remove a framework from the slave, i.e., remove its tasks and
+ // executors and recover the resources.
+ void removeFramework(Slave* slave, Framework* framework);
- // XXX Remove.
+ void disconnect(Framework* framework);
+ void deactivate(Framework* framework);
- executors[frameworkId].erase(executorId);
- if (executors[frameworkId].empty()) {
- executors.erase(frameworkId);
- }
- }
+ void disconnect(Slave* slave);
+ void deactivate(Slave* slave);
- void apply(const Offer::Operation& operation)
- {
- Try<Resources> resources = totalResources.apply(operation);
- CHECK_SOME(resources);
+ // Add a slave.
+ void addSlave(
+ Slave* slave,
+ const std::vector<Archive::Framework>& completedFrameworks =
+ std::vector<Archive::Framework>());
- totalResources = resources.get();
- checkpointedResources = totalResources.filter(needCheckpointing);
- }
+ // Remove the slave from the registrar. Called when the slave
+ // does not re-register in time after a master failover.
+ Nothing removeSlave(const Registry::Slave& slave);
- const SlaveID id;
- const SlaveInfo info;
+ // Remove the slave from the registrar and from the master's state.
+ //
+ // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
+ void removeSlave(
+ Slave* slave,
+ const std::string& message,
+ Option<process::metrics::Counter> reason = None());
- process::UPID pid;
+ void _removeSlave(
+ const SlaveInfo& slaveInfo,
+ const std::vector<StatusUpdate>& updates,
+ const process::Future<bool>& removed,
+ const std::string& message,
+ Option<process::metrics::Counter> reason = None());
- // The Mesos version of the slave. If set, the slave is >= 0.21.0.
- // TODO(bmahler): Use stout's Version when it can parse labels, etc.
- // TODO(bmahler): Make this required once it is always set.
- const Option<std::string> version;
+ // Authorizes the task.
+ // Returns true if task is authorized.
+ // Returns false if task is not authorized.
+ // Returns failure for transient authorization failures.
+ process::Future<bool> authorizeTask(
+ const TaskInfo& task,
+ Framework* framework);
- process::Time registeredTime;
- Option<process::Time> reregisteredTime;
+ // Add the task and its executor (if not already running) to the
+ // framework and slave. Returns the resources consumed as a result,
+ // which includes resources for the task and its executor
+ // (if not already running).
+ Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
- // Slave becomes disconnected when the socket closes.
- bool connected;
+ // Transitions the task, and recovers resources if the task becomes
+ // terminal.
+ void updateTask(Task* task, const StatusUpdate& update);
- // Slave becomes deactivated when it gets disconnected. In the
- // future this might also happen via HTTP endpoint.
- // No offers will be made for a deactivated slave.
- bool active;
+ // Removes the task.
+ void removeTask(Task* task);
- // Executors running on this slave.
- hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors;
+ // Remove an executor and recover its resources.
+ void removeExecutor(
+ Slave* slave,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
- // Tasks present on this slave.
- // TODO(bmahler): The task pointer ownership complexity arises from the fact
- // that we own the pointer here, but it's shared with the Framework struct.
- // We should find a way to eliminate this.
- hashmap<FrameworkID, hashmap<TaskID, Task*>> tasks;
+ // Updates slave's resources by applying the given operation. It
+ // also updates the allocator and sends a CheckpointResourcesMessage
+ // to the slave with slave's current checkpointed resources.
+ void applyOfferOperation(
+ Framework* framework,
+ Slave* slave,
+ const Offer::Operation& operation);
- // Tasks that were asked to kill by frameworks.
- // This is used for reconciliation when the slave re-registers.
- multihashmap<FrameworkID, TaskID> killedTasks;
+ // Forwards the update to the framework.
+ void forward(
+ const StatusUpdate& update,
+ const process::UPID& acknowledgee,
+ Framework* framework);
- // Active offers on this slave.
- hashset<Offer*> offers;
+ // Remove an offer after specified timeout
+ void offerTimeout(const OfferID& offerId);
- hashmap<FrameworkID, Resources> usedResources; // Active task / executors.
- Resources offeredResources; // Offers.
+ // Remove an offer and optionally rescind the offer as well.
+ void removeOffer(Offer* offer, bool rescind = false);
- // Resources that should be checkpointed by the slave (e.g.,
- // persistent volumes, dynamic reservations, etc). These are either
- // in use by a task/executor, or are available for use and will be
- // re-offered to the framework.
- Resources checkpointedResources;
+ Framework* getFramework(const FrameworkID& frameworkId);
+ Slave* getSlave(const SlaveID& slaveId);
+ Offer* getOffer(const OfferID& offerId);
- // The current total resources of the slave. Note that this is
- // different from 'info.resources()' because this also consider
- // operations (e.g., CREATE, RESERVE) that have been applied.
- Resources totalResources;
+ FrameworkID newFrameworkId();
+ OfferID newOfferId();
+ SlaveID newSlaveId();
- SlaveObserver* observer;
+ Option<Credentials> credentials;
private:
- Slave(const Slave&); // No copying.
- Slave& operator = (const Slave&); // No assigning.
-};
+ void drop(
+ const process::UPID& from,
+ const scheduler::Call& call,
+ const std::string& message);
+ void drop(
+ Framework* framework,
+ const Offer::Operation& operation,
+ const std::string& message);
-inline std::ostream& operator << (std::ostream& stream, const Slave& slave)
-{
- return stream << slave.id << " at " << slave.pid
- << " (" << slave.info.hostname() << ")";
-}
+ // Call handlers.
+ void receive(
+ const process::UPID& from,
+ const scheduler::Call& call);
+
+ void accept(
+ Framework* framework,
+ const scheduler::Call::Accept& accept);
+ void _accept(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const Resources& offeredResources,
+ const scheduler::Call::Accept& accept,
+ const process::Future<std::list<process::Future<bool>>>& authorizations);
-// Information about a connected or completed framework.
-// TODO(bmahler): Keeping the task and executor information in sync
-// across the Slave and Framework structs is error prone!
-struct Framework
-{
- Framework(const FrameworkInfo& _info,
- const process::UPID& _pid,
- const process::Time& time = process::Clock::now())
- : info(_info),
- pid(_pid),
- connected(true),
- active(true),
- registeredTime(time),
- reregisteredTime(time),
- completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
+ void reconcile(
+ Framework* framework,
+ const scheduler::Call::Reconcile& reconcile);
- ~Framework() {}
+ void kill(
+ Framework* framework,
+ const scheduler::Call::Kill& kill);
- Task* getTask(const TaskID& taskId)
+ void shutdown(
+ Framework* framework,
+ const scheduler::Call::Shutdown& shutdown);
+
+ bool elected() const
{
- if (tasks.count(taskId) > 0) {
- return tasks[taskId];
- } else {
- return NULL;
- }
+ return leader.isSome() && leader.get() == info_;
}
- void addTask(Task* task)
+ // Inner class used to namespace HTTP route handlers (see
+ // master/http.cpp for implementations).
+ class Http
{
- CHECK(!tasks.contains(task->task_id()))
- << "Duplicate task " << task->task_id()
- << " of framework " << task->framework_id();
+ public:
+ explicit Http(Master* _master) : master(_master) {}
- tasks[task->task_id()] = task;
+ // Logs the request, route handlers can compose this with the
+ // desired request handler to get consistent request logging.
+ static void log(const process::http::Request& request);
- if (!protobuf::isTerminalState(task->state())) {
- totalUsedResources += task->resources();
- usedResources[task->slave_id()] += task->resources();
- }
- }
+ // /master/health
+ process::Future<process::http::Response> health(
+ const process::http::Request& request) const;
- // Notification of task termination, for resource accounting.
- // TODO(bmahler): This is a hack for performance. We need to
- // maintain resource counters because computing task resources
- // functionally for all tasks is expensive, for now.
- void taskTerminated(Task* task)
- {
- CHECK(protobuf::isTerminalState(task->state()));
- CHECK(tasks.contains(task->task_id()))
- << "Unknown task " << task->task_id()
- << " of framework " << task->framework_id();
+ // /master/observe
+ process::Future<process::http::Response> observe(
+ const process::http::Request& request) const;
- totalUsedResources -= task->resources();
- usedResources[task->slave_id()] -= task->resources();
- if (usedResources[task->slave_id()].empty()) {
- usedResources.erase(task->slave_id());
- }
- }
+ // /master/redirect
+ process::Future<process::http::Response> redirect(
+ const process::http::Request& request) const;
- void addCompletedTask(const Task& task)
- {
- // TODO(adam-mesos): Check if completed task already exists.
- completedTasks.push_back(std::shared_ptr<Task>(new Task(task)));
- }
+ // /master/roles.json
+ process::Future<process::http::Response> roles(
+ const process::http::Request& request) const;
- void removeTask(Task* task)
- {
- CHECK(tasks.contains(task->task_id()))
- << "Unknown task " << task->task_id()
- << " of framework " << task->framework_id();
+ // /master/teardown and /master/shutdown (deprecated).
+ process::Future<process::http::Response> teardown(
+ const process::http::Request& request) const;
- if (!protobuf::isTerminalState(task->state())) {
- totalUsedResources -= task->resources();
- usedResources[task->slave_id()] -= task->resources();
- if (usedResources[task->slave_id()].empty()) {
- usedResources.erase(task->slave_id());
- }
- }
+ // /master/slaves
+ process::Future<process::http::Response> slaves(
+ const process::http::Request& request) const;
- addCompletedTask(*task);
+ // /master/state.json
+ process::Future<process::http::Response> state(
+ const process::http::Request& request) const;
- tasks.erase(task->task_id());
- }
+ // /master/state-summary
+ process::Future<process::http::Response> stateSummary(
+ const process::http::Request& request) const;
- void addOffer(Offer* offer)
- {
- CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
- offers.insert(offer);
- totalOfferedResources += offer->resources();
- offeredResources[offer->slave_id()] += offer->resources();
- }
+ // /master/tasks.json
+ process::Future<process::http::Response> tasks(
+ const process::http::Request& request) const;
- void removeOffer(Offer* offer)
- {
- CHECK(offers.find(offer) != offers.end())
- << "Unknown offer " << offer->id();
+ const static std::string HEALTH_HELP;
+ const static std::string OBSERVE_HELP;
+ const static std::string REDIRECT_HELP;
+ const static std::string SHUTDOWN_HELP; // Deprecated.
+ const static std::string TEARDOWN_HELP;
+ const static std::string SLAVES_HELP;
+ const static std::string TASKS_HELP;
- totalOfferedResources -= offer->resources();
- offeredResources[offer->slave_id()] -= offer->resources();
- if (offeredResources[offer->slave_id()].empty()) {
- offeredResources.erase(offer->slave_id());
- }
+ private:
+ // Helper for doing authentication, returns the credential used if
+ // the authentication was successful (or none if no credentials
+ // have been given to the master), otherwise an Error.
+ Result<Credential> authenticate(
+ const process::http::Request& request) const;
- offers.erase(offer);
- }
+ // Continuations.
+ process::Future<process::http::Response> _teardown(
+ const FrameworkID& id,
+ bool authorized = true) const;
- bool hasExecutor(const SlaveID& slaveId,
- const ExecutorID& executorId)
- {
- return executors.contains(slaveId) &&
- executors[slaveId].contains(executorId);
- }
+ Master* master;
+ };
- void addExecutor(const SlaveID& slaveId,
- const ExecutorInfo& executorInfo)
- {
- CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
- << "Duplicate executor " << executorInfo.executor_id()
- << " on slave " << slaveId;
+ Master(const Master&); // No copying.
+ Master& operator = (const Master&); // No assigning.
- executors[slaveId][executorInfo.executor_id()] = executorInfo;
- totalUsedResources += executorInfo.resources();
- usedResources[slaveId] += executorInfo.resources();
- }
+ friend struct Metrics;
- void removeExecutor(const SlaveID& slaveId,
- const ExecutorID& executorId)
- {
- CHECK(hasExecutor(slaveId, executorId))
- << "Unknown executor " << executorId
- << " of framework " << id()
- << " of slave " << slaveId;
+ // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
+ // make the following functions friends so that validation functions
+ // can get Offer* and Slave*.
+ friend Offer* validation::offer::getOffer(
+ Master* master, const OfferID& offerId);
- totalUsedResources -= executors[slaveId][executorId].resources();
- usedResources[slaveId] -= executors[slaveId][executorId].resources();
- if (usedResources[slaveId].empty()) {
- usedResources.erase(slaveId);
- }
+ friend Slave* validation::offer::getSlave(
+ Master* master, const SlaveID& slaveId);
- executors[slaveId].erase(executorId);
- if (executors[slaveId].empty()) {
- executors.erase(slaveId);
- }
- }
+ const Flags flags;
- const FrameworkID id() const { return info.id(); }
+ Option<MasterInfo> leader; // Current leading master.
- // Update fields in 'info' using those in 'source'. Currently this
- // only updates 'name', 'failover_timeout', 'hostname', and
- // 'webui_url'.
- void updateFrameworkInfo(const FrameworkInfo& source)
- {
- // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because
- // of MESOS-2559. Once this is fixed we can 'CHECK' that we only
- // merge 'info' from the same framework 'id'.
+ mesos::master::allocator::Allocator* allocator;
+ WhitelistWatcher* whitelistWatcher;
+ Registrar* registrar;
+ Repairer* repairer;
+ Files* files;
+
+ MasterContender* contender;
+ MasterDetector* detector;
+
+ const Option<Authorizer*> authorizer;
+
+ MasterInfo info_;
- // TODO(jmlvanre): Merge other fields as per design doc in
- // MESOS-703.
+ // Indicates when recovery is complete. Recovery begins once the
+ // master is elected as a leader.
+ Option<process::Future<Nothing>> recovered;
- if (source.user() != info.user()) {
- LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user()
- << "' for framework " << id() << ". Check MESOS-703";
- }
+ struct Slaves
+ {
+ Slaves() : removed(MAX_REMOVED_SLAVES) {}
- info.set_name(source.name());
+ // Imposes a time limit for slaves that we recover from the
+ // registry to re-register with the master.
+ Option<process::Timer> recoveredTimer;
- if (source.has_failover_timeout()) {
- info.set_failover_timeout(source.failover_timeout());
- } else {
- info.clear_failover_timeout();
- }
+ // Slaves that have been recovered from the registrar but have yet
+ // to re-register. We keep a "recoveredTimer" above to ensure
+ // we remove these slaves if they do not re-register.
+ hashset<SlaveID> recovered;
- if (source.checkpoint() != info.checkpoint()) {
- LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '"
- << stringify(info.checkpoint()) << "' for framework " << id()
- << ". Check MESOS-703";
- }
+ // Slaves that are in the process of registering.
+ hashset<process::UPID> registering;
- if (source.role() != info.role()) {
- LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role()
- << "' for framework " << id() << ". Check MESOS-703";
- }
+ // Only those slaves that are re-registering for the first time
+ // with this master. We must not answer questions related to
+ // these slaves until the registrar determines their fate.
+ hashset<SlaveID> reregistering;
- if (source.has_hostname()) {
- info.set_hostname(source.hostname());
- } else {
- info.clear_hostname();
- }
+ hashmap<SlaveID, Slave*> registered;
- if (source.principal() != info.principal()) {
- LOG(WARNING) << "Can not update FrameworkInfo.principal to '"
- << info.principal() << "' for framework " << id()
- << ". Check MESOS-703";
- }
+ // Slaves that are in the process of being removed from the
+ // registrar. Think of these as being partially removed: we must
+ // not answer questions related to these until they are removed
+ // from the registry.
+ hashset<SlaveID> removing;
- if (source.has_webui_url()) {
- info.set_webui_url(source.webui_url());
- } else {
- info.clear_webui_url();
- }
- }
+ // We track removed slaves to preserve the consistency
+ // semantics of the pre-registrar code when a non-strict registrar
+ // is being used. That is, if we remove a slave, we must make
+ // an effort to prevent it from (re-)registering, sending updates,
+ // etc. We keep a cache here to prevent this from growing in an
+ // unbounded manner.
+ // TODO(bmahler): Ideally we could use a cache with set semantics.
+ Cache<SlaveID, Nothing> removed;
- FrameworkInfo info;
+ // This rate limiter is used to limit the removal of slaves failing
+ // health checks.
+ // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
+ // a wrapper around libprocess process which is thread safe.
+ Option<std::shared_ptr<process::RateLimiter>> limiter;
- process::UPID pid;
+ bool transitioning(const Option<SlaveID>& slaveId)
+ {
+ if (slaveId.isSome()) {
+ return recovered.contains(slaveId.get()) ||
+ reregistering.contains(slaveId.get()) ||
+ removing.contains(slaveId.get());
+ } else {
+ return !recovered.empty() ||
+ !reregistering.empty() ||
+ !removing.empty();
+ }
+ }
+ } slaves;
- // Framework becomes disconnected when the socket closes.
- bool connected;
+ struct Frameworks
+ {
+ Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {}
- // Framework becomes deactivated when it is disconnected or
- // the master receives a DeactivateFrameworkMessage.
- // No offers will be made to a deactivated framework.
- bool active;
+ hashmap<FrameworkID, Framework*> registered;
+ boost::circular_buffer<std::shared_ptr<Framework>> completed;
- process::Time registeredTime;
- process::Time reregisteredTime;
- process::Time unregisteredTime;
+ // Principals of frameworks keyed by PID.
+ // NOTE: Multiple PIDs can map to the same principal. The
+ // principal is None when the framework doesn't specify it.
+ // The differences between this map and 'authenticated' are:
+ // 1) This map only includes *registered* frameworks. The mapping
+ // is added when a framework (re-)registers.
+ // 2) This map includes unauthenticated frameworks (when Master
+ // allows them) if they have principals specified in
+ // FrameworkInfo.
+ hashmap<process::UPID, Option<std::string>> principals;
- // Tasks that have not yet been launched because they are currently
- // being authorized.
- hashmap<TaskID, TaskInfo> pendingTasks;
+ // BoundedRateLimiters keyed by the framework principal.
+ // Like Metrics::Frameworks, all frameworks of the same principal
+ // are throttled together at a common rate limit.
+ hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
- hashmap<TaskID, Task*> tasks;
+ // The default limiter is for frameworks not specified in
+ // 'flags.rate_limits'.
+ Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
+ } frameworks;
- // NOTE: We use a shared pointer for Task because clang doesn't like
- // Boost's implementation of circular_buffer with Task (Boost
- // attempts to do some memset's which are unsafe).
- boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
+ hashmap<OfferID, Offer*> offers;
+ hashmap<OfferID, process::Timer> offerTimers;
- hashset<Offer*> offers; // Active offers for framework.
+ hashmap<std::string, Role*> roles;
- hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
+ // Authenticator names as supplied via flags.
+ std::vector<std::string> authenticatorNames;
- // NOTE: For the used and offered resources below, we keep the
- // total as well as partitioned by SlaveID.
- // We expose the total resources via the HTTP endpoint, and we
- // keep a running total of the resources because looping over the
- // slaves to sum the resources has led to perf issues (MESOS-1862).
- // We keep the resources partitioned by SlaveID because non-scalar
- // resources can be lost when summing them up across multiple
- // slaves (MESOS-2373).
- //
- // Also note that keeping the totals is safe even though it yields
- // incorrect results for non-scalar resources.
- // (1) For overlapping set items / ranges across slaves, these
- // will get added N times but only represented once.
- // (2) When an initial subtraction occurs (N-1), the resource is
- // no longer represented. (This is the source of the bug).
- // (3) When any further subtractions occur (N-(1+M)), the
- // Resources simply ignores the subtraction since there's
- // nothing to remove, so this is safe for now.
+ Option<Authenticator*> authenticator;
- // TODO(mpark): Strip the non-scalar resources out of the totals
- // in order to avoid reporting incorrect statistics (MESOS-2623).
+ // Frameworks/slaves that are currently in the process of authentication.
+ // 'authenticating' future is completed when authenticator
+ // completes authentication.
+ // The future is removed from the map when master completes authentication.
+ hashmap<process::UPID, process::Future<Option<std::string>>> authenticating;
- // Active task / executor resources.
- Resources totalUsedResources;
- hashmap<SlaveID, Resources> usedResources;
+ // Principals of authenticated frameworks/slaves keyed by PID.
+ hashmap<process::UPID, std::string> authenticated;
- // Offered resources.
- Resources totalOfferedResources;
- hashmap<SlaveID, Resources> offeredResources;
+ int64_t nextFrameworkId; // Used to give each framework a unique ID.
+ int64_t nextOfferId; // Used to give each slot offer a unique ID.
+ int64_t nextSlaveId; // Used to give each slave a unique ID.
-private:
- Framework(const Framework&); // No copying.
- Framework& operator = (const Framework&); // No assigning.
-};
+ // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
+ // thread safe.
+ // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
+ // copyable metric types only.
+ std::shared_ptr<Metrics> metrics;
+ // Gauge handlers.
+ double _uptime_secs()
+ {
+ return (process::Clock::now() - startTime).secs();
+ }
-inline std::ostream& operator << (
- std::ostream& stream,
- const Framework& framework)
-{
- // TODO(vinod): Also log the hostname once FrameworkInfo is properly
- // updated on framework failover (MESOS-1784).
- return stream << framework.id() << " (" << framework.info.name()
- << ") at " << framework.pid;
-}
+ double _elected()
+ {
+ return elected() ? 1 : 0;
+ }
+ double _slaves_connected();
+ double _slaves_disconnected();
+ double _slaves_active();
+ double _slaves_inactive();
-// Information about an active role.
-struct Role
-{
- explicit Role(const mesos::master::RoleInfo& _info)
- : info(_info) {}
+ double _frameworks_connected();
+ double _frameworks_disconnected();
+ double _frameworks_active();
+ double _frameworks_inactive();
- void addFramework(Framework* framework)
+ double _outstanding_offers()
{
- frameworks[framework->id()] = framework;
+ return offers.size();
}
- void removeFramework(Framework* framework)
+ double _event_queue_messages()
{
- frameworks.erase(framework->id());
+ return static_cast<double>(eventCount<process::MessageEvent>());
}
- Resources resources() const
+ double _event_queue_dispatches()
{
- Resources resources;
- foreachvalue (Framework* framework, frameworks) {
- resources += framework->totalUsedResources;
- resources += framework->totalOfferedResources;
- }
+ return static_cast<double>(eventCount<process::DispatchEvent>());
+ }
- return resources;
+ double _event_queue_http_requests()
+ {
+ return static_cast<double>(eventCount<process::HttpEvent>());
}
- mesos::master::RoleInfo info;
+ double _tasks_staging();
+ double _tasks_starting();
+ double _tasks_running();
- hashmap<FrameworkID, Framework*> frameworks;
+ double _resources_total(const std::string& name);
+ double _resources_used(const std::string& name);
+ double _resources_percent(const std::string& name);
+
+ process::Time startTime; // Start time used to calculate uptime.
+
+ Option<process::Time> electedTime; // Time when this master is elected.
+
+ // Validates the framework including authorization.
+ // Returns None if the framework is valid.
+ // Returns Error if the framework is invalid.
+ // Returns Failure if authorization returns 'Failure'.
+ process::Future<Option<Error>> validate(
+ const FrameworkInfo& frameworkInfo,
+ const process::UPID& from);
};
[3/3] mesos git commit: Removed Master::getSlave.
Posted by bm...@apache.org.
Removed Master::getSlave.
Review: https://reviews.apache.org/r/34389
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c24268f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c24268f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c24268f1
Branch: refs/heads/master
Commit: c24268f13a2307ef12bfea794a48786338b422fe
Parents: 42cf03a
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon May 18 18:41:38 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue May 19 11:55:31 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 43 +++++++++++++++++++++---------------------
src/master/master.hpp | 6 ++----
src/master/validation.cpp | 2 +-
3 files changed, 24 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c24268f1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d2df99c..1526f59 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2403,7 +2403,8 @@ void Master::accept(
}
CHECK_SOME(slaveId);
- Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get()));
+ Slave* slave = slaves.registered.get(slaveId.get());
+ CHECK_NOTNULL(slave);
LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
<< " on slave " << *slave << " for framework " << *framework;
@@ -2477,7 +2478,7 @@ void Master::_accept(
return;
}
- Slave* slave = getSlave(slaveId);
+ Slave* slave = slaves.registered.get(slaveId);
if (slave == NULL || !slave->connected) {
foreach (const Offer::Operation& operation, accept.operations()) {
@@ -2870,7 +2871,7 @@ void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
return;
}
- Slave* slave = getSlave(task->slave_id());
+ Slave* slave = slaves.registered.get(task->slave_id());
CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
// We add the task to 'killedTasks' here because the slave
@@ -2934,7 +2935,7 @@ void Master::statusUpdateAcknowledgement(
return;
}
- Slave* slave = getSlave(slaveId);
+ Slave* slave = slaves.registered.get(slaveId);
if (slave == NULL) {
LOG(WARNING)
@@ -3030,7 +3031,8 @@ void Master::schedulerMessage(
return;
}
- Slave* slave = getSlave(slaveId);
+ Slave* slave = slaves.registered.get(slaveId);
+
if (slave == NULL) {
LOG(WARNING) << "Cannot send framework message for framework "
<< *framework << " to slave " << slaveId
@@ -3256,7 +3258,7 @@ void Master::reregisterSlave(
return;
}
- Slave* slave = getSlave(slaveInfo.id());
+ Slave* slave = slaves.registered.get(slaveInfo.id());
if (slave != NULL) {
slave->reregisteredTime = Clock::now();
@@ -3435,7 +3437,7 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
LOG(INFO) << "Asked to unregister slave " << slaveId;
- Slave* slave = getSlave(slaveId);
+ Slave* slave = slaves.registered.get(slaveId);
if (slave != NULL) {
if (slave->pid != from) {
@@ -3473,7 +3475,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
return;
}
- Slave* slave = getSlave(update.slave_id());
+ Slave* slave = slaves.registered.get(update.slave_id());
if (slave == NULL) {
LOG(WARNING) << "Ignoring status update " << update
@@ -4457,7 +4459,8 @@ void Master::removeFramework(Framework* framework)
// Remove pointers to the framework's tasks in slaves.
foreachvalue (Task* task, utils::copy(framework->tasks)) {
- Slave* slave = getSlave(task->slave_id());
+ Slave* slave = slaves.registered.get(task->slave_id());
+
// Since we only find out about tasks when the slave re-registers,
// it must be the case that the slave exists!
CHECK(slave != NULL)
@@ -4501,7 +4504,8 @@ void Master::removeFramework(Framework* framework)
// Remove the framework's executors for correct resource accounting.
foreachkey (const SlaveID& slaveId, utils::copy(framework->executors)) {
- Slave* slave = getSlave(slaveId);
+ Slave* slave = slaves.registered.get(slaveId);
+
if (slave != NULL) {
foreachkey (const ExecutorID& executorId,
utils::copy(framework->executors[slaveId])) {
@@ -4896,7 +4900,9 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
None());
// The slave owns the Task object and cannot be NULL.
- Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+ Slave* slave = slaves.registered.get(task->slave_id());
+ CHECK_NOTNULL(slave);
+
slave->taskTerminated(task);
Framework* framework = getFramework(task->framework_id());
@@ -4928,7 +4934,8 @@ void Master::removeTask(Task* task)
CHECK_NOTNULL(task);
// The slave owns the Task object and cannot be NULL.
- Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+ Slave* slave = slaves.registered.get(task->slave_id());
+ CHECK_NOTNULL(slave);
if (!protobuf::isTerminalState(task->state())) {
LOG(WARNING) << "Removing task " << task->task_id()
@@ -5040,7 +5047,8 @@ void Master::removeOffer(Offer* offer, bool rescind)
framework->removeOffer(offer);
// Remove from slave.
- Slave* slave = getSlave(offer->slave_id());
+ Slave* slave = slaves.registered.get(offer->slave_id());
+
CHECK(slave != NULL)
<< "Unknown slave " << offer->slave_id()
<< " in the offer " << offer->id();
@@ -5076,15 +5084,6 @@ Framework* Master::getFramework(const FrameworkID& frameworkId)
// TODO(bmahler): Consider killing this.
-Slave* Master::getSlave(const SlaveID& slaveId)
-{
- return slaves.registered.contains(slaveId)
- ? slaves.registered[slaveId]
- : NULL;
-}
-
-
-// TODO(bmahler): Consider killing this.
Offer* Master::getOffer(const OfferID& offerId)
{
return offers.contains(offerId) ? offers[offerId] : NULL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/c24268f1/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 4a94e23..c8c6251 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -975,7 +975,6 @@ protected:
void removeOffer(Offer* offer, bool rescind = false);
Framework* getFramework(const FrameworkID& frameworkId);
- Slave* getSlave(const SlaveID& slaveId);
Offer* getOffer(const OfferID& offerId);
FrameworkID newFrameworkId();
@@ -1103,9 +1102,8 @@ private:
friend struct Metrics;
- // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to
- // make the following functions friends so that validation functions
- // can get Offer* and Slave*.
+ // NOTE: Since 'getOffer' is protected and 'slaves' is private,
+ // we need to make the following functions friends.
friend Offer* validation::offer::getOffer(
Master* master, const OfferID& offerId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/c24268f1/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 20a6ac8..1793b0e 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -447,7 +447,7 @@ Offer* getOffer(Master* master, const OfferID& offerId)
Slave* getSlave(Master* master, const SlaveID& slaveId)
{
CHECK_NOTNULL(master);
- return master->getSlave(slaveId);
+ return master->slaves.registered.get(slaveId);
}
Re: [1/3] mesos git commit: Index slaves by UPID in the master.
Posted by James Peach <jo...@gmail.com>.
> On May 19, 2015, at 12:22 PM, bmahler@apache.org wrote:
>
> Repository: mesos
> Updated Branches:
> refs/heads/master 26091f461 -> c24268f13
>
>
> Index slaves by UPID in the master.
>
> Review: https://reviews.apache.org/r/34388
>
>
> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42cf03af
> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42cf03af
> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42cf03af
>
> Branch: refs/heads/master
> Commit: 42cf03af66f2691d04e5c88ac7e098625d38e0bf
> Parents: b19ffd2
> Author: Benjamin Mahler <be...@gmail.com>
> Authored: Mon May 18 18:37:11 2015 -0700
> Committer: Benjamin Mahler <be...@gmail.com>
> Committed: Tue May 19 11:55:30 2015 -0700
>
> ----------------------------------------------------------------------
> src/master/master.cpp | 127 +++++++++++++++++++++++----------------------
> src/master/master.hpp | 65 ++++++++++++++++++++++-
> 2 files changed, 129 insertions(+), 63 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/mesos/blob/42cf03af/src/master/master.cpp
> ----------------------------------------------------------------------
> diff --git a/src/master/master.cpp b/src/master/master.cpp
> index eaea79d..d2df99c 100644
> --- a/src/master/master.cpp
> +++ b/src/master/master.cpp
> @@ -973,7 +973,9 @@ void Master::exited(const UPID& pid)
> }
> }
>
> - // The semantics when a slave gets disconnected are as follows:
> + // The semantics when a registered slave gets disconnected are as
> + // follows:
> + //
> // 1) If the slave is not checkpointing, the slave is immediately
> // removed and all tasks running on it are transitioned to LOST.
> // No resources are recovered, because the slave is removed.
> @@ -985,42 +987,42 @@ void Master::exited(const UPID& pid)
> // 2.2) Framework is not-checkpointing: The slave is not removed
> // but the framework is removed from the slave's structs,
> // its tasks transitioned to LOST and resources recovered.
> - foreachvalue (Slave* slave, slaves.registered) {
> - if (slave->pid == pid) {
> - LOG(INFO) << "Slave " << *slave << " disconnected";
> -
> - if (!slave->info.checkpoint()) {
> - // Remove the slave, if it is not checkpointing.
> - LOG(INFO) << "Removing disconnected slave " << *slave
> - << " because it is not checkpointing!";
> - removeSlave(slave,
> - "slave is non-checkpointing and disconnected");
> - return;
> - } else if (slave->connected) {
> - // Checkpointing slaves can just be disconnected.
> - disconnect(slave);
> + if (slaves.registered.contains(pid)) {
> + Slave* slave = slaves.registered.get(pid);
> + CHECK_NOTNULL(slave);
Would it be better to do this:
if (Slave* slave = slaves.registered.get(pid)) {
J