You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mz...@apache.org on 2019/09/06 22:19:48 UTC
[mesos] 02/04: Moved `removeOffers()` from `Master::accept()` into
`Master::_accept()`.
This is an automated email from the ASF dual-hosted git repository.
mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 7eb21c41ed255184988298e29644bf7f310c3374
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Fri Sep 6 14:15:38 2019 -0700
Moved `removeOffers()` from `Master::accept()` into `Master::_accept()`.
This patch moves offer removal on accept into the deferred continuation
that follows authorization (if offers pass validation in `accept()`).
Incrementing the `offers_accepted` metric is also moved to `_accept()`.
This is a prerequisite for implementing `rescindOffer()` /
`declineOffer()` in the dependent patch.
Review: https://reviews.apache.org/r/71434/
---
src/master/master.cpp | 81 +++++++++++++++++++++++++--------------------------
src/master/master.hpp | 1 -
2 files changed, 40 insertions(+), 42 deletions(-)
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 89435c4..60eb3aa 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4427,48 +4427,32 @@ void Master::accept(
// From now on, we are handling the valid offers case.
+ // Get slave id and allocation info from some existing offer
+ // (as they are valid, they must have the same slave id and allocation info).
+ const Offer* existingOffer = ([this](const RepeatedPtrField<OfferID>& ids) {
+ for (const OfferID& id : ids) {
+ const Offer* offer = getOffer(id);
+ if (offer != nullptr) {
+ return offer;
+ }
+ }
+
+ LOG(FATAL) << "No validated offer_ids correspond to existing offers";
+ })(accept.offer_ids());
+
// TODO(bmahler): We currently only support using multiple offers
// for a single slave.
- Option<SlaveID> slaveId = None();
+ SlaveID slaveId = existingOffer->slave_id();
- // TODO(asekretenko): The code below is copying AllocationInfo (and
+ // TODO(asekretenko): The code below is copying AllocationInfo (and later
// injecting it into operations) as a whole, but only the 'role' field is
// subject to offer validation. As for now, this works fine, because
// AllocationInfo has no other fields. However, this is fragile and can
// silently break if more fields are added to AllocationInfo.
- Option<Resource::AllocationInfo> allocationInfo = None();
- Resources offeredResources;
-
- size_t offersAccepted = 0;
-
- // Compute offered resources and remove the offers.
- foreach (const OfferID& offerId, accept.offer_ids()) {
- Offer* offer = getOffer(offerId);
- if (offer == nullptr) {
- LOG(WARNING) << "Ignoring accept of offer " << offerId
- << " since it is no longer valid";
- continue;
- }
-
- if (slaveId.isNone()) {
- slaveId = offer->slave_id();
- }
-
- if (allocationInfo.isNone()) {
- allocationInfo = offer->allocation_info();
- }
-
- offeredResources += offer->resources();
- offersAccepted++;
-
- removeOffer(offer);
- }
-
- framework->metrics.offers_accepted += offersAccepted;
+ Resource::AllocationInfo allocationInfo = existingOffer->allocation_info();
- CHECK_SOME(slaveId);
- Slave* slave = slaves.registered.get(slaveId.get());
- CHECK(slave != nullptr) << slaveId.get();
+ Slave* slave = slaves.registered.get(slaveId);
+ CHECK(slave != nullptr) << slaveId;
// Validate and upgrade all of the resources in `accept.operations`:
//
@@ -4625,7 +4609,7 @@ void Master::accept(
drop(framework,
operation,
"Operation requested feedback, but agent " +
- stringify(slaveId.get()) +
+ stringify(slaveId) +
" does not have the required RESOURCE_PROVIDER capability");
break;
}
@@ -4636,7 +4620,7 @@ void Master::accept(
drop(framework,
operation,
"Operation on agent default resources requested feedback,"
- " but agent " + stringify(slaveId.get()) +
+ " but agent " + stringify(slaveId) +
" does not have the required AGENT_OPERATION_FEEDBACK and"
" RESOURCE_PROVIDER capabilities");
break;
@@ -4666,8 +4650,7 @@ void Master::accept(
// within an offer now contain an `AllocationInfo`. We therefore
// inject the offer's allocation info into the operation's
// resources if the scheduler has not done so already.
- CHECK_SOME(allocationInfo);
- protobuf::injectAllocationInfo(&operation, allocationInfo.get());
+ protobuf::injectAllocationInfo(&operation, allocationInfo);
switch (operation.type()) {
case Offer::Operation::RESERVE:
@@ -4918,8 +4901,7 @@ void Master::accept(
.onAny(defer(self(),
&Master::_accept,
framework->id(),
- slaveId.get(),
- offeredResources,
+ slaveId,
std::move(accept),
lambda::_1));
}
@@ -4928,10 +4910,25 @@ void Master::accept(
void Master::_accept(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- const Resources& offeredResources,
scheduler::Call::Accept&& accept,
const Future<vector<Future<bool>>>& _authorizations)
{
+ Resources offeredResources;
+ size_t offersAccepted = 0;
+
+ foreach (const OfferID& offerId, accept.offer_ids()) {
+ Offer* offer = getOffer(offerId);
+ if (offer == nullptr) {
+ LOG(WARNING) << "Ignoring accept of offer " << offerId
+ << " since it is no longer valid";
+ continue;
+ }
+ offeredResources += offer->resources();
+ ++offersAccepted;
+
+ removeOffer(offer);
+ }
+
Framework* framework = getFramework(frameworkId);
// TODO(jieyu): Consider using the 'drop' overload mentioned in
@@ -4951,6 +4948,8 @@ void Master::_accept(
return;
}
+ framework->metrics.offers_accepted += offersAccepted;
+
Slave* slave = slaves.registered.get(slaveId);
if (slave == nullptr || !slave->connected) {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 19c1782..3f35b25 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1096,7 +1096,6 @@ private:
void _accept(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
- const Resources& offeredResources,
mesos::scheduler::Call::Accept&& accept,
const process::Future<
std::vector<process::Future<bool>>>& authorizations);