You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/03/07 01:26:11 UTC
[5/7] mesos git commit: Updated the master to handle frameworks that
change their roles.
Updated the master to handle frameworks that change their roles.
Review: https://reviews.apache.org/r/57110
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/68bb8d1f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/68bb8d1f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/68bb8d1f
Branch: refs/heads/master
Commit: 68bb8d1f0eb13e910906fde2879900ac433b3500
Parents: 98c7722
Author: Michael Park <mp...@apache.org>
Authored: Sun Mar 5 19:01:29 2017 -0800
Committer: Michael Park <mp...@apache.org>
Committed: Mon Mar 6 16:06:15 2017 -0800
----------------------------------------------------------------------
src/master/master.cpp | 168 +++++++++++++++---------------
src/master/master.hpp | 252 ++++++++++++++++++++++++++++++---------------
2 files changed, 256 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/68bb8d1f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d6d954e..dd1e4cd 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -382,6 +382,56 @@ struct BoundedRateLimiter
};
+// Reports whether the master currently has a `Role` entry for `role`
+// that lists this framework. `role` must pass the master's role
+// whitelist check; otherwise the `CHECK` below aborts.
+bool Framework::isTrackedUnderRole(const std::string& role) const
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  // No entry for the role at all means nobody is tracked under it.
+  if (!master->roles.contains(role)) {
+    return false;
+  }
+
+  return master->roles.at(role)->frameworks.contains(id());
+}
+
+// Starts tracking this framework under `role` in the master's role
+// table, creating the `Role` entry if no entry exists for it yet.
+//
+// Preconditions (enforced with `CHECK`):
+//   - `role` passes the master's role whitelist check.
+//   - The framework is not already tracked under `role`.
+//
+// NOTE: The framework need not be subscribed to `role`. `addTask` and
+// `addExecutor` call this for resources allocated to a role that is
+// absent from the framework's current set of roles, so we must not
+// `CHECK` membership in `roles` here.
+void Framework::trackUnderRole(const std::string& role)
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  CHECK(!isTrackedUnderRole(role));
+
+  // The `Role` entry is created on demand; `untrackUnderRole` deletes
+  // it again once the last framework has been removed from it.
+  if (!master->roles.contains(role)) {
+    master->roles[role] = new Role(role);
+  }
+  master->roles.at(role)->addFramework(this);
+}
+
+// Stops tracking this framework under `role` and deletes the master's
+// `Role` entry if no framework remains in it.
+//
+// Preconditions (enforced with `CHECK`):
+//   - `role` passes the master's role whitelist check.
+//   - The framework is currently tracked under `role`.
+//   - The framework has neither used nor offered resources allocated
+//     to `role`.
+void Framework::untrackUnderRole(const std::string& role)
+{
+  CHECK(master->isWhitelistedRole(role))
+    << "Unknown role '" << role << "'" << " of framework " << *this;
+
+  CHECK(isTrackedUnderRole(role));
+
+  // NOTE: We deliberately do not `CHECK` that the framework is no
+  // longer subscribed to `role`: `Master::removeFramework` calls this
+  // function while `roles` still contains the role.
+
+  const auto isAllocatedToRole = [&role](const Resource& r) {
+    return r.allocation_info().role() == role;
+  };
+
+  CHECK(totalUsedResources.filter(isAllocatedToRole).empty());
+  CHECK(totalOfferedResources.filter(isAllocatedToRole).empty());
+
+  // Look the entry up once and reuse it for removal and cleanup.
+  Role* tracked = master->roles.at(role);
+  tracked->removeFramework(this);
+
+  if (tracked->frameworks.empty()) {
+    delete tracked;
+    master->roles.erase(role);
+  }
+}
+
+
void Master::initialize()
{
LOG(INFO) << "Master " << info_.id() << " (" << info_.hostname() << ")"
@@ -2641,24 +2691,8 @@ void Master::_subscribe(
if (!framework->recovered()) {
// The framework has previously been registered with this master;
// it may or may not currently be connected.
- LOG(INFO) << "Updating info for framework " << framework->id();
-
- Try<Nothing> updateFrameworkInfo =
- framework->updateFrameworkInfo(frameworkInfo);
-
- if (updateFrameworkInfo.isError()) {
- LOG(INFO) << "Could not update FrameworkInfo of framework '"
- << frameworkInfo.name() << "': " << updateFrameworkInfo.error();
-
- FrameworkErrorMessage message;
- message.set_message(updateFrameworkInfo.error());
- http.send(message);
- http.close();
- return;
- }
-
- allocator->updateFramework(framework->id(), framework->info);
+ updateFramework(framework, frameworkInfo);
framework->reregisteredTime = Clock::now();
// Always failover the old framework connection. See MESOS-4712 for details.
@@ -2951,22 +2985,7 @@ void Master::_subscribe(
// It is now safe to update the framework fields since the request is now
// guaranteed to be successful. We use the fields passed in during
// re-registration.
- LOG(INFO) << "Updating info for framework " << framework->id();
-
- Try<Nothing> updateFrameworkInfo =
- framework->updateFrameworkInfo(frameworkInfo);
-
- if (updateFrameworkInfo.isError()) {
- LOG(INFO) << "Could not update frameworkInfo of framework '" << *framework
- << "': " << updateFrameworkInfo.error();
-
- FrameworkErrorMessage message;
- message.set_message(updateFrameworkInfo.error());
- send(from, message);
- return;
- }
-
- allocator->updateFramework(framework->id(), framework->info);
+ updateFramework(framework, frameworkInfo);
framework->reregisteredTime = Clock::now();
@@ -6036,6 +6055,36 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
}
+// Applies `frameworkInfo` to an already-known `framework`.
+//
+// In order, this:
+//   1. Forwards the new info to the allocator.
+//   2. Recovers and rescinds every outstanding offer of the framework
+//      whose allocation role is not among the roles in `frameworkInfo`.
+//   3. Updates the master's own bookkeeping via `Framework::update`.
+//
+// Step 2 must precede step 3: `Framework::update` `CHECK`s that a
+// removed role with no used resources has no offered resources either.
+void Master::updateFramework(
+    Framework* framework,
+    const FrameworkInfo& frameworkInfo)
+{
+  LOG(INFO) << "Updating info for framework " << framework->id();
+
+  // NOTE: The allocator takes care of activating/deactivating
+  // the frameworks from the added/removed roles, respectively.
+  allocator->updateFramework(framework->id(), frameworkInfo);
+
+  // The new role set does not depend on the offer being inspected, so
+  // compute it once rather than once per outstanding offer.
+  const set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
+
+  // First, remove the offers allocated to roles being removed. We
+  // iterate over a copy of the offers, presumably because `removeOffer`
+  // mutates `framework->offers`.
+  foreach (Offer* offer, utils::copy(framework->offers)) {
+    if (newRoles.count(offer->allocation_info().role()) > 0) {
+      continue;
+    }
+
+    allocator->recoverResources(
+        offer->framework_id(),
+        offer->slave_id(),
+        offer->resources(),
+        None());
+
+    removeOffer(offer, true); // Rescind!
+  }
+
+  framework->update(frameworkInfo);
+}
+
+
void Master::updateSlave(
const SlaveID& slaveId,
const Resources& oversubscribedResources)
@@ -7458,25 +7507,6 @@ void Master::addFramework(Framework* framework)
}
}
- auto addFrameworkRole = [this](Framework* framework, const string& role) {
- CHECK(isWhitelistedRole(role))
- << "Unknown role '" << role << "'"
- << " of framework " << *framework;
-
- if (!roles.contains(role)) {
- roles[role] = new Role(role);
- }
- roles.at(role)->addFramework(framework);
- };
-
- if (framework->capabilities.multiRole) {
- foreach (const string& role, framework->info.roles()) {
- addFrameworkRole(framework, role);
- }
- } else {
- addFrameworkRole(framework, framework->info.role());
- }
-
// There should be no offered resources yet!
CHECK_EQ(Resources(), framework->totalOfferedResources);
@@ -7552,17 +7582,7 @@ Try<Nothing> Master::activateRecoveredFramework(
CHECK(framework->pid.isNone());
CHECK(framework->http.isNone());
- // The `FrameworkInfo` might have changed.
- LOG(INFO) << "Updating info for framework " << framework->id();
-
- Try<Nothing> updateFrameworkInfo =
- framework->updateFrameworkInfo(frameworkInfo);
-
- if (updateFrameworkInfo.isError()) {
- return updateFrameworkInfo;
- }
-
- allocator->updateFramework(framework->id(), framework->info);
+ updateFramework(framework, frameworkInfo);
// Updating `registeredTime` here is debatable: ideally,
// `registeredTime` would be the time at which the framework first
@@ -7897,26 +7917,8 @@ void Master::removeFramework(Framework* framework)
framework->unregisteredTime = Clock::now();
- auto removeFrameworkRole = [this](Framework* framework, const string& role) {
- CHECK(isWhitelistedRole(role))
- << "Unknown role '" << role << "'"
- << " of framework " << *framework;
-
- CHECK(roles.contains(role));
-
- roles[role]->removeFramework(framework);
- if (roles[role]->frameworks.empty()) {
- delete roles[role];
- roles.erase(role);
- }
- };
-
- if (framework->capabilities.multiRole) {
- foreach (const string& role, framework->info.roles()) {
- removeFrameworkRole(framework, role);
- }
- } else {
- removeFrameworkRole(framework, framework->info.role());
+ foreach (const string& role, framework->roles) {
+ framework->untrackUnderRole(role);
}
// TODO(anand): This only works for pid based frameworks. We would
http://git-wip-us.apache.org/repos/asf/mesos/blob/68bb8d1f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1738eeb..d92c8ad 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -602,6 +602,10 @@ protected:
// executors and recover the resources.
void removeFramework(Slave* slave, Framework* framework);
+ void updateFramework(
+ Framework* framework,
+ const FrameworkInfo& frameworkInfo);
+
void disconnect(Framework* framework);
void deactivate(Framework* framework, bool rescind);
@@ -1820,7 +1824,10 @@ private:
hashmap<OfferID, InverseOffer*> inverseOffers;
hashmap<OfferID, process::Timer> inverseOfferTimers;
- // Roles with > 0 frameworks currently registered.
+ // We track information about roles that we're aware of in the system.
+ // Specifically, we keep track of the roles when a framework subscribes to
+ // the role, and/or when there are resources allocated to the role
+ // (e.g. some tasks and/or executors are consuming resources under the role).
hashmap<std::string, Role*> roles;
// Configured role whitelist if using the (deprecated) "explicit
@@ -2210,48 +2217,30 @@ struct Framework
ACTIVE
};
- Framework(Master* const _master,
+ Framework(Master* const master,
const Flags& masterFlags,
- const FrameworkInfo& _info,
+ const FrameworkInfo& info,
const process::UPID& _pid,
const process::Time& time = process::Clock::now())
- : master(_master),
- info(_info),
- roles(protobuf::framework::getRoles(_info)),
- capabilities(_info.capabilities()),
- pid(_pid),
- state(ACTIVE),
- registeredTime(time),
- reregisteredTime(time),
- completedTasks(masterFlags.max_completed_tasks_per_framework),
- unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {}
+ : Framework(master, masterFlags, info, ACTIVE, time)
+ {
+ pid = _pid;
+ }
- Framework(Master* const _master,
+ Framework(Master* const master,
const Flags& masterFlags,
- const FrameworkInfo& _info,
+ const FrameworkInfo& info,
const HttpConnection& _http,
const process::Time& time = process::Clock::now())
- : master(_master),
- info(_info),
- roles(protobuf::framework::getRoles(_info)),
- capabilities(_info.capabilities()),
- http(_http),
- state(ACTIVE),
- registeredTime(time),
- reregisteredTime(time),
- completedTasks(masterFlags.max_completed_tasks_per_framework),
- unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {}
+ : Framework(master, masterFlags, info, ACTIVE, time)
+ {
+ http = _http;
+ }
- Framework(Master* const _master,
+ Framework(Master* const master,
const Flags& masterFlags,
- const FrameworkInfo& _info)
- : master(_master),
- info(_info),
- roles(protobuf::framework::getRoles(_info)),
- capabilities(_info.capabilities()),
- state(RECOVERED),
- completedTasks(masterFlags.max_completed_tasks_per_framework),
- unreachableTasks(masterFlags.max_unreachable_tasks_per_framework) {}
+ const FrameworkInfo& info)
+ : Framework(master, masterFlags, info, RECOVERED, process::Time()) {}
~Framework()
{
@@ -2286,6 +2275,18 @@ struct Framework
if (!Master::isRemovable(task->state())) {
totalUsedResources += task->resources();
usedResources[task->slave_id()] += task->resources();
+
+ // It's possible that we're not tracking the task's role for
+ // this framework if the role is absent from the framework's
+ // set of roles. In this case, we track the role's allocation
+ // for this framework.
+ CHECK(!task->resources().empty());
+ const std::string& role =
+ task->resources().begin()->allocation_info().role();
+
+ if (!isTrackedUnderRole(role)) {
+ trackUnderRole(role);
+ }
}
}
@@ -2297,7 +2298,6 @@ struct Framework
// functionally for all tasks is expensive, for now.
void recoverResources(Task* task)
{
- CHECK(Master::isRemovable(task->state()));
CHECK(tasks.contains(task->task_id()))
<< "Unknown task " << task->task_id()
<< " of framework " << task->framework_id();
@@ -2307,6 +2307,23 @@ struct Framework
if (usedResources[task->slave_id()].empty()) {
usedResources.erase(task->slave_id());
}
+
+ // If we are no longer subscribed to the role to which these resources are
+ // being returned to, and we have no more resources allocated to us for that
+ // role, stop tracking the framework under the role.
+ CHECK(!task->resources().empty());
+ const std::string& role =
+ task->resources().begin()->allocation_info().role();
+
+ auto allocatedToRole = [&role](const Resource& resource) {
+ return resource.allocation_info().role() == role;
+ };
+
+ if (roles.count(role) == 0 &&
+ totalUsedResources.filter(allocatedToRole).empty()) {
+ CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+ untrackUnderRole(role);
+ }
}
// Sends a message to the connected framework.
@@ -2355,11 +2372,7 @@ struct Framework
<< " of framework " << task->framework_id();
if (!Master::isRemovable(task->state())) {
- totalUsedResources -= task->resources();
- usedResources[task->slave_id()] -= task->resources();
- if (usedResources[task->slave_id()].empty()) {
- usedResources.erase(task->slave_id());
- }
+ recoverResources(task);
}
if (task->state() == TASK_UNREACHABLE) {
@@ -2431,6 +2444,19 @@ struct Framework
executors[slaveId][executorInfo.executor_id()] = executorInfo;
totalUsedResources += executorInfo.resources();
usedResources[slaveId] += executorInfo.resources();
+
+ // It's possible that we're not tracking the executor's role for
+ // this framework if the role is absent from the framework's
+ // set of roles. In this case, we track the role's allocation
+ // for this framework.
+ if (!executorInfo.resources().empty()) {
+ const std::string& role =
+ executorInfo.resources().begin()->allocation_info().role();
+
+ if (!isTrackedUnderRole(role)) {
+ trackUnderRole(role);
+ }
+ }
}
void removeExecutor(const SlaveID& slaveId,
@@ -2441,12 +2467,32 @@ struct Framework
<< "' of framework " << id()
<< " of agent " << slaveId;
- totalUsedResources -= executors[slaveId][executorId].resources();
- usedResources[slaveId] -= executors[slaveId][executorId].resources();
+ const ExecutorInfo& executorInfo = executors[slaveId][executorId];
+
+ totalUsedResources -= executorInfo.resources();
+ usedResources[slaveId] -= executorInfo.resources();
if (usedResources[slaveId].empty()) {
usedResources.erase(slaveId);
}
+ // If we are no longer subscribed to the role to which these resources are
+ // being returned to, and we have no more resources allocated to us for that
+ // role, stop tracking the framework under the role.
+ if (!executorInfo.resources().empty()) {
+ const std::string& role =
+ executorInfo.resources().begin()->allocation_info().role();
+
+ auto allocatedToRole = [&role](const Resource& resource) {
+ return resource.allocation_info().role() == role;
+ };
+
+ if (roles.count(role) == 0 &&
+ totalUsedResources.filter(allocatedToRole).empty()) {
+ CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+ untrackUnderRole(role);
+ }
+ }
+
executors[slaveId].erase(executorId);
if (executors[slaveId].empty()) {
executors.erase(slaveId);
@@ -2456,55 +2502,32 @@ struct Framework
const FrameworkID id() const { return info.id(); }
// Update fields in 'info' using those in 'newInfo'. Currently this
- // only updates 'name', 'failover_timeout', 'hostname', 'webui_url',
- // 'capabilities', and 'labels'.
- Try<Nothing> updateFrameworkInfo(const FrameworkInfo& newInfo)
+ // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
+ // 'webui_url', 'capabilities', and 'labels'.
+ void update(const FrameworkInfo& newInfo)
{
// We only merge 'info' from the same framework 'id'.
CHECK_EQ(info.id(), newInfo.id());
+ // Save the old list of roles for later.
+ std::set<std::string> oldRoles = roles;
+
// TODO(jmlvanre): Merge other fields as per design doc in
// MESOS-703.
- // We currently do not allow frameworks to add or remove roles. We
- // do however allow frameworks to opt in and out of `MULTI_ROLE`
- // capability, given that the `role` and `roles` field contain the
- // same number of roles.
- if (capabilities.multiRole || protobuf::frameworkHasCapability(
- newInfo, FrameworkInfo::Capability::MULTI_ROLE)) {
- // Two `roles` sets are equivalent if they contain the same
- // elements. A `role` `*` is not equivalent to an empty `roles`
- // set, but to the set `{*}`. Since we might be dealing with a
- // framework upgrading to `MULTI_ROLE` capability or dropping
- // it, we need to examine either `role` or `roles` in order to
- // determine the roles a framework is subscribed to.
- const std::set<std::string> newRoles =
- protobuf::framework::getRoles(newInfo);
-
- if (roles != newRoles) {
- return Error(
- "Frameworks cannot change their roles: expected '" +
- stringify(roles) + "', but got '" + stringify(newRoles) + "'");
- }
-
- info.clear_role();
- info.clear_roles();
+ info.clear_role();
+ info.clear_roles();
- if (newInfo.has_role()) {
- info.set_role(newInfo.role());
- }
+ if (newInfo.has_role()) {
+ info.set_role(newInfo.role());
+ }
- if (newInfo.roles_size() > 0) {
- info.mutable_roles()->CopyFrom(newInfo.roles());
- }
- } else {
- if (newInfo.role() != info.role()) {
- LOG(WARNING) << "Cannot update FrameworkInfo.role to '"
- << newInfo.role() << "' for framework " << id()
- << ". Check MESOS-703";
- }
+ if (newInfo.roles_size() > 0) {
+ info.mutable_roles()->CopyFrom(newInfo.roles());
}
+ roles = protobuf::framework::getRoles(newInfo);
+
if (newInfo.user() != info.user()) {
LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
<< "' for framework " << id() << ". Check MESOS-703";
@@ -2555,7 +2578,45 @@ struct Framework
info.clear_labels();
}
- return Nothing();
+ const std::set<std::string>& newRoles = roles;
+
+ const std::set<std::string> removedRoles = [&]() {
+ std::set<std::string> result = oldRoles;
+ foreach (const std::string& role, newRoles) {
+ result.erase(role);
+ }
+ return result;
+ }();
+
+ foreach (const std::string& role, removedRoles) {
+ auto allocatedToRole = [&role](const Resource& resource) {
+ return resource.allocation_info().role() == role;
+ };
+
+ // Stop tracking the framework under this role if there are
+ // no longer any resources allocated to it.
+ if (totalUsedResources.filter(allocatedToRole).empty()) {
+ CHECK(totalOfferedResources.filter(allocatedToRole).empty());
+ untrackUnderRole(role);
+ }
+ }
+
+ const std::set<std::string> addedRoles = [&]() {
+ std::set<std::string> result = newRoles;
+ foreach (const std::string& role, oldRoles) {
+ result.erase(role);
+ }
+ return result;
+ }();
+
+ foreach (const std::string& role, addedRoles) {
+ // NOTE: It's possible that we're already tracking this framework
+ // under the role because a framework can unsubscribe from a role
+ // while it still has resources allocated to the role.
+ if (!isTrackedUnderRole(role)) {
+ trackUnderRole(role);
+ }
+ }
}
void updateConnection(const process::UPID& newPid)
@@ -2628,6 +2689,10 @@ struct Framework
bool connected() const { return state == ACTIVE || state == INACTIVE; }
bool recovered() const { return state == RECOVERED; }
+ bool isTrackedUnderRole(const std::string& role) const;
+ void trackUnderRole(const std::string& role);
+ void untrackUnderRole(const std::string& role);
+
Master* const master;
FrameworkInfo info;
@@ -2719,6 +2784,31 @@ struct Framework
Option<process::Owned<Heartbeater>> heartbeater;
private:
+ // Common constructor that the public constructors delegate to. It
+ // initializes the shared members and then makes sure the framework is
+ // tracked under each role derived from `_info`.
+ Framework(Master* const _master,
+           const Flags& masterFlags,
+           const FrameworkInfo& _info,
+           State state,
+           const process::Time& time)
+   : master(_master),
+     info(_info),
+     roles(protobuf::framework::getRoles(_info)),
+     capabilities(_info.capabilities()),
+     state(state),
+     registeredTime(time),
+     reregisteredTime(time),
+     completedTasks(masterFlags.max_completed_tasks_per_framework),
+     unreachableTasks(masterFlags.max_unreachable_tasks_per_framework)
+ {
+   foreach (const std::string& role, roles) {
+     // NOTE: The framework may already be tracked under the role,
+     // since a framework can unsubscribe from a role while it still
+     // has resources allocated to that role. Skip those roles.
+     if (isTrackedUnderRole(role)) {
+       continue;
+     }
+
+     trackUnderRole(role);
+   }
+ }
+
Framework(const Framework&); // No copying.
Framework& operator=(const Framework&); // No assigning.
};