You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mz...@apache.org on 2019/09/06 22:19:47 UTC
[mesos] 01/04: Separated handling offer validation failure from
handling success.
This is an automated email from the ASF dual-hosted git repository.
mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a8050cafaa5465bd74a2ced1c37bb6b64c735445
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Fri Sep 6 14:15:28 2019 -0700
Separated handling offer validation failure from handling success.
This patch refactors the loop over offer IDs in `Master::accept()`
into two simpler loops: one for the offer validation failure case and
another for the validation success case. This brings the removal of
offers and the recovery of their resources close together.
This is a prerequisite for implementing `rescindOffer()` and
`declineOffer()` in a dependent patch.
Review: https://reviews.apache.org/r/71433/
---
src/master/master.cpp | 111 +++++++++++++++++++++++++++++---------------------
1 file changed, 64 insertions(+), 47 deletions(-)
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f00906e..89435c4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4335,74 +4335,49 @@ void Master::accept(
// TODO(jieyu): Add metrics for non launch operations.
}
- // TODO(bmahler): We currently only support using multiple offers
- // for a single slave.
- Resources offeredResources;
- Option<SlaveID> slaveId = None();
Option<Error> error = None();
- Option<Resource::AllocationInfo> allocationInfo = None();
if (accept.offer_ids().size() == 0) {
error = Error("No offers specified");
} else {
// Validate the offers.
error = validation::offer::validate(accept.offer_ids(), this, framework);
+ }
- size_t offersAccepted = 0;
+ if (error.isSome()) {
+ // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
+ // consistently handle message dropping. It would be ideal if the
+ // 'drop' overload can handle both resource recovery and lost task
+ // notifications.
- // Compute offered resources and remove the offers. If the
- // validation failed, return resources to the allocator.
+ // Remove existing offers and recover their resources.
foreach (const OfferID& offerId, accept.offer_ids()) {
Offer* offer = getOffer(offerId);
- if (offer != nullptr) {
- // Don't bother adding resources to `offeredResources` in case
- // validation failed; just recover them.
- if (error.isSome()) {
- allocator->recoverResources(
- offer->framework_id(),
- offer->slave_id(),
- offer->resources(),
- None());
- } else {
- slaveId = offer->slave_id();
- allocationInfo = offer->allocation_info();
- offeredResources += offer->resources();
-
- offersAccepted++;
- }
-
- removeOffer(offer);
+ if (offer == nullptr) {
+ // If the offer was not in our offer set, then this offer is no
+ // longer valid.
+ LOG(WARNING) << "Ignoring accept of offer " << offerId
+ << " since it is no longer valid";
continue;
}
- // If the offer was not in our offer set, then this offer is no
- // longer valid.
- LOG(WARNING) << "Ignoring accept of offer " << offerId
- << " since it is no longer valid";
- }
+ allocator->recoverResources(
+ offer->framework_id(),
+ offer->slave_id(),
+ offer->resources(),
+ None());
- framework->metrics.offers_accepted += offersAccepted;
- }
+ removeOffer(offer);
+ }
- // If invalid, send TASK_DROPPED for the launch attempts. If the
- // framework is not partition-aware, send TASK_LOST instead. If
- // other operations have their `id` field set, then send
- // OPERATION_ERROR updates for them.
- //
- // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
- // consistently handle message dropping. It would be ideal if the
- // 'drop' overload can handle both resource recovery and lost task
- // notifications.
- if (error.isSome()) {
LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
<< "': " << error->message;
- TaskState newTaskState = TASK_DROPPED;
- if (!framework->capabilities.partitionAware) {
- newTaskState = TASK_LOST;
- }
+ const TaskState newTaskState =
+ framework->capabilities.partitionAware ? TASK_DROPPED : TASK_LOST;
foreach (const Offer::Operation& operation, accept.operations()) {
+ // Send OPERATION_ERROR for non-LAUNCH operations
if (operation.type() != Offer::Operation::LAUNCH &&
operation.type() != Offer::Operation::LAUNCH_GROUP) {
drop(framework,
@@ -4411,6 +4386,7 @@ void Master::accept(
continue;
}
+ // Send task status updates for launch attempts.
const RepeatedPtrField<TaskInfo>& tasks = [&]() {
if (operation.type() == Offer::Operation::LAUNCH) {
return operation.launch().task_infos();
@@ -4449,6 +4425,47 @@ void Master::accept(
return;
}
+ // From now on, we are handling the valid offers case.
+
+ // TODO(bmahler): We currently only support using multiple offers
+ // for a single slave.
+ Option<SlaveID> slaveId = None();
+
+ // TODO(asekretenko): The code below is copying AllocationInfo (and
+ // injecting it into operations) as a whole, but only the 'role' field is
+ // subject to offer validation. As for now, this works fine, because
+ // AllocationInfo has no other fields. However, this is fragile and can
+ // silently break if more fields are added to AllocationInfo.
+ Option<Resource::AllocationInfo> allocationInfo = None();
+ Resources offeredResources;
+
+ size_t offersAccepted = 0;
+
+ // Compute offered resources and remove the offers.
+ foreach (const OfferID& offerId, accept.offer_ids()) {
+ Offer* offer = getOffer(offerId);
+ if (offer == nullptr) {
+ LOG(WARNING) << "Ignoring accept of offer " << offerId
+ << " since it is no longer valid";
+ continue;
+ }
+
+ if (slaveId.isNone()) {
+ slaveId = offer->slave_id();
+ }
+
+ if (allocationInfo.isNone()) {
+ allocationInfo = offer->allocation_info();
+ }
+
+ offeredResources += offer->resources();
+ offersAccepted++;
+
+ removeOffer(offer);
+ }
+
+ framework->metrics.offers_accepted += offersAccepted;
+
CHECK_SOME(slaveId);
Slave* slave = slaves.registered.get(slaveId.get());
CHECK(slave != nullptr) << slaveId.get();