You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mz...@apache.org on 2019/09/06 22:19:50 UTC
[mesos] 04/04: Replaced removeOffer + recoverResources pairs with
specialized helpers.
This is an automated email from the ASF dual-hosted git repository.
mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 927b012e96abeebbb02c293698be1ef43867e15f
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Fri Sep 6 14:15:54 2019 -0700
Replaced removeOffer + recoverResources pairs with specialized helpers.
This patch adds helper methods `Master::rescindOffer()` /
`Master::discardOffer()` that recover an offer's resources in the allocator
and remove the offer, and replaces paired calls of `removeOffer()` +
`allocator->recoverResources()` with these helpers.
Review: https://reviews.apache.org/r/71436/
---
src/master/http.cpp | 8 +-
src/master/master.cpp | 242 ++++++++++++++++-------------------------
src/master/master.hpp | 19 +++-
src/master/quota_handler.cpp | 20 +---
src/master/weights_handler.cpp | 8 +-
5 files changed, 115 insertions(+), 182 deletions(-)
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 0987d93..60765c9 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -4336,13 +4336,7 @@ Future<Response> Master::Http::_operation(
// NOTE: However it's entirely possible that these resources are
// offered to other frameworks in the next 'allocate' and the filter
// cannot prevent it.
- master->allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- Filters());
-
- master->removeOffer(offer, true); // Rescind!
+ master->rescindOffer(offer, Filters());
// If we've rescinded enough offers to cover 'operation', we're done.
Try<Resources> updatedRecovered = totalRecovered.apply(operation);
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 60eb3aa..a2c289a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1164,9 +1164,8 @@ void Master::finalize()
}
}
- // Remove offers.
foreach (Offer* offer, utils::copy(slave->offers)) {
- removeOffer(offer);
+ discardOffer(offer);
}
// Remove inverse offers.
@@ -3127,17 +3126,12 @@ void Master::_subscribe(
LOG(INFO) << "Allowing framework " << *framework
<< " to subscribe with an already used id";
- // Remove any offers sent to this framework.
+ // Rescind any offers sent to this framework.
// NOTE: We need to do this because the scheduler might have
// replied to the offers but the driver might have dropped
// those messages since it wasn't connected to the master.
foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
- removeOffer(offer, true); // Rescind.
+ rescindOffer(offer);
}
// Also remove inverse offers.
@@ -3368,13 +3362,11 @@ void Master::deactivate(Framework* framework, bool rescind)
// Remove the framework's offers.
foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
-
- removeOffer(offer, rescind);
+ if (rescind) {
+ rescindOffer(offer);
+ } else {
+ discardOffer(offer);
+ }
}
// Remove the framework's inverse offers.
@@ -3421,15 +3413,8 @@ void Master::deactivate(Slave* slave)
allocator->deactivateSlave(slave->id);
- // Remove and rescind offers.
foreach (Offer* offer, utils::copy(slave->offers)) {
- allocator->recoverResources(
- offer->framework_id(),
- slave->id,
- offer->resources(),
- None());
-
- removeOffer(offer, true); // Rescind!
+ rescindOffer(offer);
}
// Remove and rescind inverse offers.
@@ -4350,24 +4335,17 @@ void Master::accept(
// 'drop' overload can handle both resource recovery and lost task
// notifications.
- // Remove existing offers and recover their resources.
+ // Discard existing offers.
foreach (const OfferID& offerId, accept.offer_ids()) {
Offer* offer = getOffer(offerId);
- if (offer == nullptr) {
+ if (offer != nullptr) {
+ discardOffer(offer);
+ } else {
// If the offer was not in our offer set, then this offer is no
// longer valid.
LOG(WARNING) << "Ignoring accept of offer " << offerId
<< " since it is no longer valid";
- continue;
}
-
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
-
- removeOffer(offer);
}
LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
@@ -4913,21 +4891,14 @@ void Master::_accept(
scheduler::Call::Accept&& accept,
const Future<vector<Future<bool>>>& _authorizations)
{
- Resources offeredResources;
- size_t offersAccepted = 0;
-
- foreach (const OfferID& offerId, accept.offer_ids()) {
- Offer* offer = getOffer(offerId);
- if (offer == nullptr) {
- LOG(WARNING) << "Ignoring accept of offer " << offerId
- << " since it is no longer valid";
- continue;
+ auto discardOffers = [this](const RepeatedPtrField<OfferID>& ids) {
+ for (const OfferID& offerId : ids) {
+ Offer* offer = getOffer(offerId);
+ if (offer != nullptr) {
+ discardOffer(offer);
+ }
}
- offeredResources += offer->resources();
- ++offersAccepted;
-
- removeOffer(offer);
- }
+ };
Framework* framework = getFramework(frameworkId);
@@ -4938,18 +4909,12 @@ void Master::_accept(
<< "Ignoring ACCEPT call for framework " << frameworkId
<< " because the framework cannot be found";
- // Tell the allocator about the recovered resources.
- allocator->recoverResources(
- frameworkId,
- slaveId,
- offeredResources,
- None());
-
+ // TODO(asekretenko): consider replacing this with a CHECK that there
+ // never are any offers for a non-active (inactive/completed/...) framework.
+ discardOffers(accept.offer_ids());
return;
}
- framework->metrics.offers_accepted += offersAccepted;
-
Slave* slave = slaves.registered.get(slaveId);
if (slave == nullptr || !slave->connected) {
@@ -5011,16 +4976,30 @@ void Master::_accept(
}
}
- // Tell the allocator about the recovered resources.
- allocator->recoverResources(
- frameworkId,
- slaveId,
- offeredResources,
- None());
-
+ // TODO(asekretenko): consider replacing this with a CHECK that there
+ // never are any offers for a removed/disconnected slave.
+ discardOffers(accept.offer_ids());
return;
}
+ Resources offeredResources;
+ size_t offersAccepted = 0;
+
+ foreach (const OfferID& offerId, accept.offer_ids()) {
+ Offer* offer = getOffer(offerId);
+ if (offer == nullptr) {
+ LOG(WARNING) << "Ignoring accept of offer " << offerId
+ << " since it is no longer valid";
+ continue;
+ }
+ offeredResources += offer->resources();
+ ++offersAccepted;
+
+ _removeOffer(framework, offer);
+ }
+
+ framework->metrics.offers_accepted += offersAccepted;
+
// We maintain the "running remaining" resources here to support pipelining of
// speculative operations (e.g., RESERVE), which would modify the remaining
// resources. Resources consumed by non-speculative operations (e.g., LAUNCH)
@@ -5328,13 +5307,7 @@ void Master::_accept(
foreach (const Resource& volume, operation.destroy().volumes()) {
if (offered.contains(volume)) {
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offered,
- None());
-
- removeOffer(offer, true);
+ rescindOffer(offer);
// This offer may contain other volumes that are being destroyed.
// However, we have already rescinded it, so we should move on
@@ -6245,18 +6218,10 @@ void Master::decline(
size_t offersDeclined = 0;
- // Return resources to the allocator.
foreach (const OfferID& offerId, decline.offer_ids()) {
Offer* offer = getOffer(offerId);
if (offer != nullptr) {
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- decline.filters());
-
- removeOffer(offer);
-
+ discardOffer(offer, decline.filters());
offersDeclined++;
continue;
}
@@ -8320,20 +8285,12 @@ void Master::updateFramework(
// the frameworks from the added/removed roles, respectively.
allocator->updateFramework(framework->id(), frameworkInfo, suppressedRoles);
- // First, remove the offers allocated to roles being removed.
+ // Rescind offers allocated to the roles that were removed.
+ const set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
foreach (Offer* offer, utils::copy(framework->offers)) {
- set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
- if (newRoles.count(offer->allocation_info().role()) > 0) {
- continue;
+ if (newRoles.count(offer->allocation_info().role()) == 0) {
+ rescindOffer(offer);
}
-
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
-
- removeOffer(offer, true); // Rescind!
}
framework->update(frameworkInfo);
@@ -8829,18 +8786,17 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
// Then rescind outstanding offers affected by the update.
// NOTE: Need a copy of offers because the offers are removed inside the loop.
foreach (Offer* offer, utils::copy(slave->offers)) {
- bool rescind = false;
-
const Resources& offered = offer->resources();
// Since updates of the agent's oversubscribed resources are sent at regular
// intervals, we only rescind offers containing revocable resources to
// reduce churn.
if (hasOversubscribed && !offered.revocable().empty()) {
- LOG(INFO) << "Removing offer " << offer->id()
+ LOG(INFO) << "Rescinding offer " << offer->id()
<< " with revocable resources " << offered << " on agent "
<< *slave;
- rescind = true;
+ rescindOffer(offer);
+ continue;
}
// Updates on resource providers can change the agent total
@@ -8853,23 +8809,11 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
if (message.has_resource_providers() &&
!offeredResourceProviderResources.empty()) {
LOG(INFO)
- << "Removing offer " << offer->id()
+ << "Rescinding offer " << offer->id()
<< " with resources " << offered << " on agent " << *slave;
- rescind = true;
+ rescindOffer(offer);
}
-
- if (!rescind) {
- continue;
- }
-
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offered,
- None());
-
- removeOffer(offer, true); // Rescind.
}
// NOTE: We don't need to rescind inverse offers here as they are unrelated to
@@ -8914,13 +8858,10 @@ void Master::updateUnavailability(
LOG(INFO) << "Removing unavailability of agent " << *slave;
}
- // Remove and rescind offers since we want to inform frameworks of the
+ // Rescind offers since we want to inform frameworks of the
// unavailability change as soon as possible.
foreach (Offer* offer, utils::copy(slave->offers)) {
- allocator->recoverResources(
- offer->framework_id(), slave->id, offer->resources(), None());
-
- removeOffer(offer, true); // Rescind!
+ rescindOffer(offer);
}
// Remove and rescind inverse offers since the allocator will send new
@@ -11180,12 +11121,9 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
void Master::_failoverFramework(Framework* framework)
{
- // Remove the framework's offers (if they weren't removed before).
+ // Discard the framework's offers, if they weren't removed before.
foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->recoverResources(
- offer->framework_id(), offer->slave_id(), offer->resources(), None());
-
- removeOffer(offer);
+ discardOffer(offer);
}
// Also remove the inverse offers.
@@ -11750,13 +11688,7 @@ void Master::_removeSlave(
}
foreach (Offer* offer, utils::copy(slave->offers)) {
- // TODO(vinod): We don't need to call 'Allocator::recoverResources'
- // once MESOS-621 is fixed.
- allocator->recoverResources(
- offer->framework_id(), slave->id, offer->resources(), None());
-
- // Remove and rescind offers.
- removeOffer(offer, true); // Rescind!
+ rescindOffer(offer);
}
// Remove inverse offers because sending them for a slave that is
@@ -11914,13 +11846,7 @@ void Master::__removeSlave(
}
foreach (Offer* offer, utils::copy(slave->offers)) {
- // TODO(vinod): We don't need to call 'Allocator::recoverResources'
- // once MESOS-621 is fixed.
- allocator->recoverResources(
- offer->framework_id(), slave->id, offer->resources(), None());
-
- // Remove and rescind offers.
- removeOffer(offer, true); // Rescind!
+ rescindOffer(offer);
}
// Remove inverse offers because sending them for a slave that is
@@ -12694,23 +12620,48 @@ void Master::offerTimeout(const OfferID& offerId)
{
Offer* offer = getOffer(offerId);
if (offer != nullptr) {
- allocator->recoverResources(
- offer->framework_id(), offer->slave_id(), offer->resources(), None());
- removeOffer(offer, true);
+ rescindOffer(offer);
}
}
-// TODO(vinod): Instead of 'removeOffer()', consider implementing
-// 'useOffer()', 'discardOffer()' and 'rescindOffer()' for clarity.
-void Master::removeOffer(Offer* offer, bool rescind)
+void Master::rescindOffer(Offer* offer, const Option<Filters>& filters)
+{
+ Framework* framework = getFramework(offer->framework_id());
+ CHECK(framework != nullptr)
+ << "Unknown framework " << offer->framework_id()
+ << " in the offer " << offer->id();
+
+ RescindResourceOfferMessage message;
+ message.mutable_offer_id()->MergeFrom(offer->id());
+
+ framework->metrics.offers_rescinded++;
+ framework->send(message);
+
+ allocator->recoverResources(
+ offer->framework_id(), offer->slave_id(), offer->resources(), filters);
+
+ _removeOffer(framework, offer);
+}
+
+
+void Master::discardOffer(Offer* offer, const Option<Filters>& filters)
{
- // Remove from framework.
Framework* framework = getFramework(offer->framework_id());
CHECK(framework != nullptr)
<< "Unknown framework " << offer->framework_id()
<< " in the offer " << offer->id();
+ allocator->recoverResources(
+ offer->framework_id(), offer->slave_id(), offer->resources(), filters);
+
+ _removeOffer(framework, offer);
+}
+
+
+void Master::_removeOffer(Framework* framework, Offer* offer)
+{
+ CHECK_EQ(framework->id(), offer->framework_id());
framework->removeOffer(offer);
// Remove from slave.
@@ -12722,13 +12673,6 @@ void Master::removeOffer(Offer* offer, bool rescind)
slave->removeOffer(offer);
- if (rescind) {
- RescindResourceOfferMessage message;
- message.mutable_offer_id()->MergeFrom(offer->id());
- framework->metrics.offers_rescinded++;
- framework->send(message);
- }
-
// Remove and cancel offer removal timers. Canceling the Timers is
// only done to avoid having too many active Timers in libprocess.
if (offerTimers.contains(offer->id())) {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 3f35b25..23eb2a6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -986,8 +986,23 @@ protected:
// Remove an offer after specified timeout
void offerTimeout(const OfferID& offerId);
- // Remove an offer and optionally rescind the offer as well.
- void removeOffer(Offer* offer, bool rescind = false);
+ // Methods for removing an offer and handling associated resources.
+ // Both recover the resources in the allocator (optionally setting offer
+ // filters) and remove the offer in the master. `rescindOffer` further
+ // notifies the framework about the rescind.
+ //
+ // NOTE: the `filters` parameter of `rescindOffer` is needed only as
+ // a workaround for the race between the master and the allocator
+ // which happens when the master tries to free up resources to satisfy
+ // operator initiated operations.
+ void rescindOffer(Offer* offer, const Option<Filters>& filters = None());
+ void discardOffer(Offer* offer, const Option<Filters>& filters = None());
+
+ // Helper for rescindOffer() / discardOffer() / _accept().
+ // Do not use directly.
+ //
+ // The offer must belong to the framework.
+ void _removeOffer(Framework* framework, Offer* offer);
// Remove an inverse offer after specified timeout
void inverseOfferTimeout(const OfferID& inverseOfferId);
diff --git a/src/master/quota_handler.cpp b/src/master/quota_handler.cpp
index f28eb27..083ee30 100644
--- a/src/master/quota_handler.cpp
+++ b/src/master/quota_handler.cpp
@@ -303,9 +303,6 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const
// Rescind all outstanding offers from the given agent.
bool agentVisited = false;
foreach (Offer* offer, utils::copy(slave->offers)) {
- master->allocator->recoverResources(
- offer->framework_id(), offer->slave_id(), offer->resources(), None());
-
auto unallocated = [](const Resources& resources) {
Resources result = resources;
result.unallocate();
@@ -313,7 +310,7 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const
};
rescinded += unallocated(offer->resources());
- master->removeOffer(offer, true);
+ master->rescindOffer(offer);
agentVisited = true;
}
@@ -644,12 +641,7 @@ Future<http::Response> Master::QuotaHandler::_update(
consumedAndOffered -=
ResourceQuantities::fromResources(offer->resources());
- master->allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
- master->removeOffer(offer, true);
+ master->rescindOffer(offer);
}
}
@@ -685,13 +677,7 @@ Future<http::Response> Master::QuotaHandler::_update(
}
rescinded += ResourceQuantities::fromResources(offer->resources());
-
- master->allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
- master->removeOffer(offer, true);
+ master->rescindOffer(offer);
}
}
}
diff --git a/src/master/weights_handler.cpp b/src/master/weights_handler.cpp
index dfb6f06..4ebeb34 100644
--- a/src/master/weights_handler.cpp
+++ b/src/master/weights_handler.cpp
@@ -303,13 +303,7 @@ void Master::WeightsHandler::rescindOffers(
if (rescind) {
foreachvalue (const Slave* slave, master->slaves.registered) {
foreach (Offer* offer, utils::copy(slave->offers)) {
- master->allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
-
- master->removeOffer(offer, true);
+ master->rescindOffer(offer);
}
}
}