You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mz...@apache.org on 2019/09/06 22:19:47 UTC

[mesos] 01/04: Separated handling offer validation failure from handling success.

This is an automated email from the ASF dual-hosted git repository.

mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a8050cafaa5465bd74a2ced1c37bb6b64c735445
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Fri Sep 6 14:15:28 2019 -0700

    Separated handling offer validation failure from handling success.
    
    This patch refactors the loop over offer IDs in `Master::accept()`
    into two simpler loops: one for the offer validation failure case and
    another for the validation success case, thus bringing the removal of
    offers and the recovery of their resources close together.
    
    This is a prerequisite for implementing `rescindOffer()`/
    `declineOffer()` in the dependent patch.
    
    Review: https://reviews.apache.org/r/71433/
---
 src/master/master.cpp | 111 +++++++++++++++++++++++++++++---------------------
 1 file changed, 64 insertions(+), 47 deletions(-)

diff --git a/src/master/master.cpp b/src/master/master.cpp
index f00906e..89435c4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4335,74 +4335,49 @@ void Master::accept(
     // TODO(jieyu): Add metrics for non launch operations.
   }
 
-  // TODO(bmahler): We currently only support using multiple offers
-  // for a single slave.
-  Resources offeredResources;
-  Option<SlaveID> slaveId = None();
   Option<Error> error = None();
-  Option<Resource::AllocationInfo> allocationInfo = None();
 
   if (accept.offer_ids().size() == 0) {
     error = Error("No offers specified");
   } else {
     // Validate the offers.
     error = validation::offer::validate(accept.offer_ids(), this, framework);
+  }
 
-    size_t offersAccepted = 0;
+  if (error.isSome()) {
+    // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
+    // consistently handle message dropping. It would be ideal if the
+    // 'drop' overload can handle both resource recovery and lost task
+    // notifications.
 
-    // Compute offered resources and remove the offers. If the
-    // validation failed, return resources to the allocator.
+    // Remove existing offers and recover their resources.
     foreach (const OfferID& offerId, accept.offer_ids()) {
       Offer* offer = getOffer(offerId);
-      if (offer != nullptr) {
-        // Don't bother adding resources to `offeredResources` in case
-        // validation failed; just recover them.
-        if (error.isSome()) {
-          allocator->recoverResources(
-              offer->framework_id(),
-              offer->slave_id(),
-              offer->resources(),
-              None());
-        } else {
-          slaveId = offer->slave_id();
-          allocationInfo = offer->allocation_info();
-          offeredResources += offer->resources();
-
-          offersAccepted++;
-        }
-
-        removeOffer(offer);
+      if (offer == nullptr) {
+        // If the offer was not in our offer set, then this offer is no
+        // longer valid.
+        LOG(WARNING) << "Ignoring accept of offer " << offerId
+                     << " since it is no longer valid";
         continue;
       }
 
-      // If the offer was not in our offer set, then this offer is no
-      // longer valid.
-      LOG(WARNING) << "Ignoring accept of offer " << offerId
-                   << " since it is no longer valid";
-    }
+      allocator->recoverResources(
+          offer->framework_id(),
+          offer->slave_id(),
+          offer->resources(),
+          None());
 
-    framework->metrics.offers_accepted += offersAccepted;
-  }
+      removeOffer(offer);
+    }
 
-  // If invalid, send TASK_DROPPED for the launch attempts. If the
-  // framework is not partition-aware, send TASK_LOST instead. If
-  // other operations have their `id` field set, then send
-  // OPERATION_ERROR updates for them.
-  //
-  // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
-  // consistently handle message dropping. It would be ideal if the
-  // 'drop' overload can handle both resource recovery and lost task
-  // notifications.
-  if (error.isSome()) {
     LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
                  << "': " << error->message;
 
-    TaskState newTaskState = TASK_DROPPED;
-    if (!framework->capabilities.partitionAware) {
-      newTaskState = TASK_LOST;
-    }
+    const TaskState newTaskState =
+      framework->capabilities.partitionAware ? TASK_DROPPED : TASK_LOST;
 
     foreach (const Offer::Operation& operation, accept.operations()) {
+      // Send OPERATION_ERROR for operations other than LAUNCH/LAUNCH_GROUP.
       if (operation.type() != Offer::Operation::LAUNCH &&
           operation.type() != Offer::Operation::LAUNCH_GROUP) {
         drop(framework,
@@ -4411,6 +4386,7 @@ void Master::accept(
         continue;
       }
 
+      // Send task status updates for launch attempts.
       const RepeatedPtrField<TaskInfo>& tasks = [&]() {
         if (operation.type() == Offer::Operation::LAUNCH) {
           return operation.launch().task_infos();
@@ -4449,6 +4425,47 @@ void Master::accept(
     return;
   }
 
+  // From now on, we are handling the valid offers case.
+
+  // TODO(bmahler): We currently only support using multiple offers
+  // for a single slave.
+  Option<SlaveID> slaveId = None();
+
+  // TODO(asekretenko): The code below is copying AllocationInfo (and
+  // injecting it into operations) as a whole, but only the 'role' field is
+  // subject to offer validation. For now, this works fine, because
+  // AllocationInfo has no other fields. However, this is fragile and can
+  // silently break if more fields are added to AllocationInfo.
+  Option<Resource::AllocationInfo> allocationInfo = None();
+  Resources offeredResources;
+
+  size_t offersAccepted = 0;
+
+  // Compute offered resources and remove the offers.
+  foreach (const OfferID& offerId, accept.offer_ids()) {
+    Offer* offer = getOffer(offerId);
+    if (offer == nullptr) {
+      LOG(WARNING) << "Ignoring accept of offer " << offerId
+                   << " since it is no longer valid";
+      continue;
+    }
+
+    if (slaveId.isNone()) {
+      slaveId = offer->slave_id();
+    }
+
+    if (allocationInfo.isNone()) {
+      allocationInfo = offer->allocation_info();
+    }
+
+    offeredResources += offer->resources();
+    offersAccepted++;
+
+    removeOffer(offer);
+  }
+
+  framework->metrics.offers_accepted += offersAccepted;
+
   CHECK_SOME(slaveId);
   Slave* slave = slaves.registered.get(slaveId.get());
   CHECK(slave != nullptr) << slaveId.get();