You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mz...@apache.org on 2019/09/06 22:19:48 UTC

[mesos] 02/04: Moved `removeOffers()` from `Master::accept()` into `Master::_accept()`.

This is an automated email from the ASF dual-hosted git repository.

mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7eb21c41ed255184988298e29644bf7f310c3374
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Fri Sep 6 14:15:38 2019 -0700

    Moved `removeOffers()` from `Master::accept()` into `Master::_accept()`.
    
    This patch moves offer removal on accept into the deferred continuation
    that follows authorization (if offers pass validation in `accept()`).
    
    Incrementing the `offers_accepted` metric is also moved to `_accept()`.
    
    This is a prerequisite for implementing `rescindOffer()` /
    `declineOffer()` / in the dependent patch.
    
    Review: https://reviews.apache.org/r/71434/
---
 src/master/master.cpp | 81 +++++++++++++++++++++++++--------------------------
 src/master/master.hpp |  1 -
 2 files changed, 40 insertions(+), 42 deletions(-)

diff --git a/src/master/master.cpp b/src/master/master.cpp
index 89435c4..60eb3aa 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4427,48 +4427,32 @@ void Master::accept(
 
   // From now on, we are handling the valid offers case.
 
+  // Get slave id and allocation info from some existing offer
+  // (as they are valid, they must have the same slave id and allocation info).
+  const Offer* existingOffer = ([this](const RepeatedPtrField<OfferID>& ids) {
+    for (const OfferID& id : ids) {
+      const Offer* offer = getOffer(id);
+      if (offer != nullptr) {
+        return offer;
+      }
+    }
+
+    LOG(FATAL) << "No validated offer_ids correspond to existing offers";
+  })(accept.offer_ids());
+
   // TODO(bmahler): We currently only support using multiple offers
   // for a single slave.
-  Option<SlaveID> slaveId = None();
+  SlaveID slaveId = existingOffer->slave_id();
 
-  // TODO(asekretenko): The code below is copying AllocationInfo (and
+  // TODO(asekretenko): The code below is copying AllocationInfo (and later
   // injecting it into operations) as a whole, but only the 'role' field is
   // subject to offer validation. As for now, this works fine, because
   // AllocationInfo has no other fields. However, this is fragile and can
   // silently break if more fields are added to AllocationInfo.
-  Option<Resource::AllocationInfo> allocationInfo = None();
-  Resources offeredResources;
-
-  size_t offersAccepted = 0;
-
-  // Compute offered resources and remove the offers.
-  foreach (const OfferID& offerId, accept.offer_ids()) {
-    Offer* offer = getOffer(offerId);
-    if (offer == nullptr) {
-      LOG(WARNING) << "Ignoring accept of offer " << offerId
-                   << " since it is no longer valid";
-      continue;
-    }
-
-    if (slaveId.isNone()) {
-      slaveId = offer->slave_id();
-    }
-
-    if (allocationInfo.isNone()) {
-      allocationInfo = offer->allocation_info();
-    }
-
-    offeredResources += offer->resources();
-    offersAccepted++;
-
-    removeOffer(offer);
-  }
-
-  framework->metrics.offers_accepted += offersAccepted;
+  Resource::AllocationInfo allocationInfo = existingOffer->allocation_info();
 
-  CHECK_SOME(slaveId);
-  Slave* slave = slaves.registered.get(slaveId.get());
-  CHECK(slave != nullptr) << slaveId.get();
+  Slave* slave = slaves.registered.get(slaveId);
+  CHECK(slave != nullptr) << slaveId;
 
   // Validate and upgrade all of the resources in `accept.operations`:
   //
@@ -4625,7 +4609,7 @@ void Master::accept(
               drop(framework,
                    operation,
                    "Operation requested feedback, but agent " +
-                   stringify(slaveId.get()) +
+                   stringify(slaveId) +
                    " does not have the required RESOURCE_PROVIDER capability");
               break;
             }
@@ -4636,7 +4620,7 @@ void Master::accept(
               drop(framework,
                    operation,
                    "Operation on agent default resources requested feedback,"
-                   " but agent " + stringify(slaveId.get()) +
+                   " but agent " + stringify(slaveId) +
                    " does not have the required AGENT_OPERATION_FEEDBACK and"
                    " RESOURCE_PROVIDER capabilities");
               break;
@@ -4666,8 +4650,7 @@ void Master::accept(
     // within an offer now contain an `AllocationInfo`. We therefore
     // inject the offer's allocation info into the operation's
     // resources if the scheduler has not done so already.
-    CHECK_SOME(allocationInfo);
-    protobuf::injectAllocationInfo(&operation, allocationInfo.get());
+    protobuf::injectAllocationInfo(&operation, allocationInfo);
 
     switch (operation.type()) {
       case Offer::Operation::RESERVE:
@@ -4918,8 +4901,7 @@ void Master::accept(
     .onAny(defer(self(),
                  &Master::_accept,
                  framework->id(),
-                 slaveId.get(),
-                 offeredResources,
+                 slaveId,
                  std::move(accept),
                  lambda::_1));
 }
@@ -4928,10 +4910,25 @@ void Master::accept(
 void Master::_accept(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
-    const Resources& offeredResources,
     scheduler::Call::Accept&& accept,
     const Future<vector<Future<bool>>>& _authorizations)
 {
+  Resources offeredResources;
+  size_t offersAccepted = 0;
+
+  foreach (const OfferID& offerId, accept.offer_ids()) {
+    Offer* offer = getOffer(offerId);
+    if (offer == nullptr) {
+      LOG(WARNING) << "Ignoring accept of offer " << offerId
+                   << " since it is no longer valid";
+      continue;
+    }
+    offeredResources += offer->resources();
+    ++offersAccepted;
+
+    removeOffer(offer);
+  }
+
   Framework* framework = getFramework(frameworkId);
 
   // TODO(jieyu): Consider using the 'drop' overload mentioned in
@@ -4951,6 +4948,8 @@ void Master::_accept(
     return;
   }
 
+  framework->metrics.offers_accepted += offersAccepted;
+
   Slave* slave = slaves.registered.get(slaveId);
 
   if (slave == nullptr || !slave->connected) {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 19c1782..3f35b25 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1096,7 +1096,6 @@ private:
   void _accept(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
-      const Resources& offeredResources,
       mesos::scheduler::Call::Accept&& accept,
       const process::Future<
           std::vector<process::Future<bool>>>& authorizations);