You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mz...@apache.org on 2019/09/06 22:19:50 UTC

[mesos] 04/04: Replaced removeOffer + recoverResources pairs with specialized helpers.

This is an automated email from the ASF dual-hosted git repository.

mzhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 927b012e96abeebbb02c293698be1ef43867e15f
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Fri Sep 6 14:15:54 2019 -0700

    Replaced removeOffer + recoverResources pairs with specialized helpers.
    
    This patch adds helper methods `Master::rescindOffer()` /
    `Master::discardOffer()` that recover the offer's resources in the
    allocator and remove the offer, and replaces paired calls of
    `removeOffer()` + `allocator->recoverResources()` with these helpers.
    
    Review: https://reviews.apache.org/r/71436/
---
 src/master/http.cpp            |   8 +-
 src/master/master.cpp          | 242 ++++++++++++++++-------------------------
 src/master/master.hpp          |  19 +++-
 src/master/quota_handler.cpp   |  20 +---
 src/master/weights_handler.cpp |   8 +-
 5 files changed, 115 insertions(+), 182 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index 0987d93..60765c9 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -4336,13 +4336,7 @@ Future<Response> Master::Http::_operation(
     // NOTE: However it's entirely possible that these resources are
     // offered to other frameworks in the next 'allocate' and the filter
     // cannot prevent it.
-    master->allocator->recoverResources(
-        offer->framework_id(),
-        offer->slave_id(),
-        offer->resources(),
-        Filters());
-
-    master->removeOffer(offer, true); // Rescind!
+    master->rescindOffer(offer, Filters());
 
     // If we've rescinded enough offers to cover 'operation', we're done.
     Try<Resources> updatedRecovered = totalRecovered.apply(operation);
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 60eb3aa..a2c289a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1164,9 +1164,8 @@ void Master::finalize()
       }
     }
 
-    // Remove offers.
     foreach (Offer* offer, utils::copy(slave->offers)) {
-      removeOffer(offer);
+      discardOffer(offer);
     }
 
     // Remove inverse offers.
@@ -3127,17 +3126,12 @@ void Master::_subscribe(
       LOG(INFO) << "Allowing framework " << *framework
                 << " to subscribe with an already used id";
 
-      // Remove any offers sent to this framework.
+      // Rescind any offers sent to this framework.
       // NOTE: We need to do this because the scheduler might have
       // replied to the offers but the driver might have dropped
       // those messages since it wasn't connected to the master.
       foreach (Offer* offer, utils::copy(framework->offers)) {
-        allocator->recoverResources(
-            offer->framework_id(),
-            offer->slave_id(),
-            offer->resources(),
-            None());
-        removeOffer(offer, true); // Rescind.
+        rescindOffer(offer);
       }
 
       // Also remove inverse offers.
@@ -3368,13 +3362,11 @@ void Master::deactivate(Framework* framework, bool rescind)
 
   // Remove the framework's offers.
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    allocator->recoverResources(
-        offer->framework_id(),
-        offer->slave_id(),
-        offer->resources(),
-        None());
-
-    removeOffer(offer, rescind);
+    if (rescind) {
+      rescindOffer(offer);
+    } else {
+      discardOffer(offer);
+    }
   }
 
   // Remove the framework's inverse offers.
@@ -3421,15 +3413,8 @@ void Master::deactivate(Slave* slave)
 
   allocator->deactivateSlave(slave->id);
 
-  // Remove and rescind offers.
   foreach (Offer* offer, utils::copy(slave->offers)) {
-    allocator->recoverResources(
-        offer->framework_id(),
-        slave->id,
-        offer->resources(),
-        None());
-
-    removeOffer(offer, true); // Rescind!
+    rescindOffer(offer);
   }
 
   // Remove and rescind inverse offers.
@@ -4350,24 +4335,17 @@ void Master::accept(
     // 'drop' overload can handle both resource recovery and lost task
     // notifications.
 
-    // Remove existing offers and recover their resources.
+    // Discard existing offers.
     foreach (const OfferID& offerId, accept.offer_ids()) {
       Offer* offer = getOffer(offerId);
-      if (offer == nullptr) {
+      if (offer != nullptr) {
+        discardOffer(offer);
+      } else {
         // If the offer was not in our offer set, then this offer is no
         // longer valid.
         LOG(WARNING) << "Ignoring accept of offer " << offerId
                      << " since it is no longer valid";
-        continue;
       }
-
-      allocator->recoverResources(
-          offer->framework_id(),
-          offer->slave_id(),
-          offer->resources(),
-          None());
-
-      removeOffer(offer);
     }
 
     LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
@@ -4913,21 +4891,14 @@ void Master::_accept(
     scheduler::Call::Accept&& accept,
     const Future<vector<Future<bool>>>& _authorizations)
 {
-  Resources offeredResources;
-  size_t offersAccepted = 0;
-
-  foreach (const OfferID& offerId, accept.offer_ids()) {
-    Offer* offer = getOffer(offerId);
-    if (offer == nullptr) {
-      LOG(WARNING) << "Ignoring accept of offer " << offerId
-                   << " since it is no longer valid";
-      continue;
+  auto discardOffers = [this](const RepeatedPtrField<OfferID>& ids) {
+    for (const OfferID& offerId : ids) {
+      Offer* offer = getOffer(offerId);
+      if (offer != nullptr) {
+        discardOffer(offer);
+      }
     }
-    offeredResources += offer->resources();
-    ++offersAccepted;
-
-    removeOffer(offer);
-  }
+  };
 
   Framework* framework = getFramework(frameworkId);
 
@@ -4938,18 +4909,12 @@ void Master::_accept(
       << "Ignoring ACCEPT call for framework " << frameworkId
       << " because the framework cannot be found";
 
-    // Tell the allocator about the recovered resources.
-    allocator->recoverResources(
-        frameworkId,
-        slaveId,
-        offeredResources,
-        None());
-
+    // TODO(asekretenko): consider replacing this with a CHECK that there
+    // are never any offers for a non-active (inactive/completed/...) framework.
+    discardOffers(accept.offer_ids());
     return;
   }
 
-  framework->metrics.offers_accepted += offersAccepted;
-
   Slave* slave = slaves.registered.get(slaveId);
 
   if (slave == nullptr || !slave->connected) {
@@ -5011,16 +4976,30 @@ void Master::_accept(
       }
     }
 
-    // Tell the allocator about the recovered resources.
-    allocator->recoverResources(
-        frameworkId,
-        slaveId,
-        offeredResources,
-        None());
-
+    // TODO(asekretenko): consider replacing this with a CHECK that there
+    // are never any offers for a removed/disconnected slave.
+    discardOffers(accept.offer_ids());
     return;
   }
 
+  Resources offeredResources;
+  size_t offersAccepted = 0;
+
+  foreach (const OfferID& offerId, accept.offer_ids()) {
+    Offer* offer = getOffer(offerId);
+    if (offer == nullptr) {
+      LOG(WARNING) << "Ignoring accept of offer " << offerId
+                   << " since it is no longer valid";
+      continue;
+    }
+    offeredResources += offer->resources();
+    ++offersAccepted;
+
+    _removeOffer(framework, offer);
+  }
+
+  framework->metrics.offers_accepted += offersAccepted;
+
   // We maintain the "running remaining" resources here to support pipelining of
   // speculative operations (e.g., RESERVE), which would modify the remaining
   // resources. Resources consumed by non-speculative operations (e.g., LAUNCH)
@@ -5328,13 +5307,7 @@ void Master::_accept(
 
           foreach (const Resource& volume, operation.destroy().volumes()) {
             if (offered.contains(volume)) {
-              allocator->recoverResources(
-                  offer->framework_id(),
-                  offer->slave_id(),
-                  offered,
-                  None());
-
-              removeOffer(offer, true);
+              rescindOffer(offer);
 
               // This offer may contain other volumes that are being destroyed.
               // However, we have already rescinded it, so we should move on
@@ -6245,18 +6218,10 @@ void Master::decline(
 
   size_t offersDeclined = 0;
 
-  //  Return resources to the allocator.
   foreach (const OfferID& offerId, decline.offer_ids()) {
     Offer* offer = getOffer(offerId);
     if (offer != nullptr) {
-      allocator->recoverResources(
-          offer->framework_id(),
-          offer->slave_id(),
-          offer->resources(),
-          decline.filters());
-
-      removeOffer(offer);
-
+      discardOffer(offer, decline.filters());
       offersDeclined++;
       continue;
     }
@@ -8320,20 +8285,12 @@ void Master::updateFramework(
   // the frameworks from the added/removed roles, respectively.
   allocator->updateFramework(framework->id(), frameworkInfo, suppressedRoles);
 
-  // First, remove the offers allocated to roles being removed.
+  // Rescind offers allocated to the roles that were removed.
+  const set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
-    if (newRoles.count(offer->allocation_info().role()) > 0) {
-      continue;
+    if (newRoles.count(offer->allocation_info().role()) == 0) {
+      rescindOffer(offer);
     }
-
-    allocator->recoverResources(
-        offer->framework_id(),
-        offer->slave_id(),
-        offer->resources(),
-        None());
-
-    removeOffer(offer, true); // Rescind!
   }
 
   framework->update(frameworkInfo);
@@ -8829,18 +8786,17 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
   // Then rescind outstanding offers affected by the update.
   // NOTE: Need a copy of offers because the offers are removed inside the loop.
   foreach (Offer* offer, utils::copy(slave->offers)) {
-    bool rescind = false;
-
     const Resources& offered = offer->resources();
     // Since updates of the agent's oversubscribed resources are sent at regular
     // intervals, we only rescind offers containing revocable resources to
     // reduce churn.
     if (hasOversubscribed && !offered.revocable().empty()) {
-      LOG(INFO) << "Removing offer " << offer->id()
+      LOG(INFO) << "Rescinding offer " << offer->id()
                 << " with revocable resources " << offered << " on agent "
                 << *slave;
 
-      rescind = true;
+      rescindOffer(offer);
+      continue;
     }
 
     // Updates on resource providers can change the agent total
@@ -8853,23 +8809,11 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
     if (message.has_resource_providers() &&
         !offeredResourceProviderResources.empty()) {
       LOG(INFO)
-        << "Removing offer " << offer->id()
+        << "Rescinding offer " << offer->id()
         << " with resources " << offered << " on agent " << *slave;
 
-      rescind = true;
+      rescindOffer(offer);
     }
-
-    if (!rescind) {
-      continue;
-    }
-
-    allocator->recoverResources(
-        offer->framework_id(),
-        offer->slave_id(),
-        offered,
-        None());
-
-    removeOffer(offer, true); // Rescind.
   }
 
   // NOTE: We don't need to rescind inverse offers here as they are unrelated to
@@ -8914,13 +8858,10 @@ void Master::updateUnavailability(
         LOG(INFO) << "Removing unavailability of agent " << *slave;
       }
 
-      // Remove and rescind offers since we want to inform frameworks of the
+      // Rescind offers since we want to inform frameworks of the
       // unavailability change as soon as possible.
       foreach (Offer* offer, utils::copy(slave->offers)) {
-        allocator->recoverResources(
-            offer->framework_id(), slave->id, offer->resources(), None());
-
-        removeOffer(offer, true); // Rescind!
+        rescindOffer(offer);
       }
 
       // Remove and rescind inverse offers since the allocator will send new
@@ -11180,12 +11121,9 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
 
 void Master::_failoverFramework(Framework* framework)
 {
-  // Remove the framework's offers (if they weren't removed before).
+  // Discard the framework's offers, if they weren't removed before.
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    allocator->recoverResources(
-        offer->framework_id(), offer->slave_id(), offer->resources(), None());
-
-    removeOffer(offer);
+    discardOffer(offer);
   }
 
   // Also remove the inverse offers.
@@ -11750,13 +11688,7 @@ void Master::_removeSlave(
   }
 
   foreach (Offer* offer, utils::copy(slave->offers)) {
-    // TODO(vinod): We don't need to call 'Allocator::recoverResources'
-    // once MESOS-621 is fixed.
-    allocator->recoverResources(
-        offer->framework_id(), slave->id, offer->resources(), None());
-
-    // Remove and rescind offers.
-    removeOffer(offer, true); // Rescind!
+    rescindOffer(offer);
   }
 
   // Remove inverse offers because sending them for a slave that is
@@ -11914,13 +11846,7 @@ void Master::__removeSlave(
   }
 
   foreach (Offer* offer, utils::copy(slave->offers)) {
-    // TODO(vinod): We don't need to call 'Allocator::recoverResources'
-    // once MESOS-621 is fixed.
-    allocator->recoverResources(
-        offer->framework_id(), slave->id, offer->resources(), None());
-
-    // Remove and rescind offers.
-    removeOffer(offer, true); // Rescind!
+    rescindOffer(offer);
   }
 
   // Remove inverse offers because sending them for a slave that is
@@ -12694,23 +12620,48 @@ void Master::offerTimeout(const OfferID& offerId)
 {
   Offer* offer = getOffer(offerId);
   if (offer != nullptr) {
-    allocator->recoverResources(
-        offer->framework_id(), offer->slave_id(), offer->resources(), None());
-    removeOffer(offer, true);
+    rescindOffer(offer);
   }
 }
 
 
-// TODO(vinod): Instead of 'removeOffer()', consider implementing
-// 'useOffer()', 'discardOffer()' and 'rescindOffer()' for clarity.
-void Master::removeOffer(Offer* offer, bool rescind)
+void Master::rescindOffer(Offer* offer, const Option<Filters>& filters)
+{
+  Framework* framework = getFramework(offer->framework_id());
+  CHECK(framework != nullptr)
+    << "Unknown framework " << offer->framework_id()
+    << " in the offer " << offer->id();
+
+  RescindResourceOfferMessage message;
+  message.mutable_offer_id()->MergeFrom(offer->id());
+
+  framework->metrics.offers_rescinded++;
+  framework->send(message);
+
+  allocator->recoverResources(
+      offer->framework_id(), offer->slave_id(), offer->resources(), filters);
+
+  _removeOffer(framework, offer);
+}
+
+
+void Master::discardOffer(Offer* offer, const Option<Filters>& filters)
 {
-  // Remove from framework.
   Framework* framework = getFramework(offer->framework_id());
   CHECK(framework != nullptr)
     << "Unknown framework " << offer->framework_id()
     << " in the offer " << offer->id();
 
+  allocator->recoverResources(
+      offer->framework_id(), offer->slave_id(), offer->resources(), filters);
+
+  _removeOffer(framework, offer);
+}
+
+
+void Master::_removeOffer(Framework* framework, Offer* offer)
+{
+  CHECK_EQ(framework->id(), offer->framework_id());
   framework->removeOffer(offer);
 
   // Remove from slave.
@@ -12722,13 +12673,6 @@ void Master::removeOffer(Offer* offer, bool rescind)
 
   slave->removeOffer(offer);
 
-  if (rescind) {
-    RescindResourceOfferMessage message;
-    message.mutable_offer_id()->MergeFrom(offer->id());
-    framework->metrics.offers_rescinded++;
-    framework->send(message);
-  }
-
   // Remove and cancel offer removal timers. Canceling the Timers is
   // only done to avoid having too many active Timers in libprocess.
   if (offerTimers.contains(offer->id())) {
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 3f35b25..23eb2a6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -986,8 +986,23 @@ protected:
   // Remove an offer after specified timeout
   void offerTimeout(const OfferID& offerId);
 
-  // Remove an offer and optionally rescind the offer as well.
-  void removeOffer(Offer* offer, bool rescind = false);
+  // Methods for removing an offer and handling associated resources.
+  // Both recover the resources in the allocator (optionally setting offer
+  // filters) and remove the offer in the master. `rescindOffer` further
+  // notifies the framework about the rescind.
+  //
+  // NOTE: the `filters` parameter of `rescindOffer` is needed only as
+  // a workaround for the race between the master and the allocator
+  // which happens when the master tries to free up resources to satisfy
+  // operator initiated operations.
+  void rescindOffer(Offer* offer, const Option<Filters>& filters = None());
+  void discardOffer(Offer* offer, const Option<Filters>& filters = None());
+
+  // Helper for rescindOffer() / discardOffer() / _accept().
+  // Do not use directly.
+  //
+  // The offer must belong to the framework.
+  void _removeOffer(Framework* framework, Offer* offer);
 
   // Remove an inverse offer after specified timeout
   void inverseOfferTimeout(const OfferID& inverseOfferId);
diff --git a/src/master/quota_handler.cpp b/src/master/quota_handler.cpp
index f28eb27..083ee30 100644
--- a/src/master/quota_handler.cpp
+++ b/src/master/quota_handler.cpp
@@ -303,9 +303,6 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const
     // Rescind all outstanding offers from the given agent.
     bool agentVisited = false;
     foreach (Offer* offer, utils::copy(slave->offers)) {
-      master->allocator->recoverResources(
-          offer->framework_id(), offer->slave_id(), offer->resources(), None());
-
       auto unallocated = [](const Resources& resources) {
         Resources result = resources;
         result.unallocate();
@@ -313,7 +310,7 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const
       };
 
       rescinded += unallocated(offer->resources());
-      master->removeOffer(offer, true);
+      master->rescindOffer(offer);
       agentVisited = true;
     }
 
@@ -644,12 +641,7 @@ Future<http::Response> Master::QuotaHandler::_update(
             consumedAndOffered -=
               ResourceQuantities::fromResources(offer->resources());
 
-            master->allocator->recoverResources(
-                offer->framework_id(),
-                offer->slave_id(),
-                offer->resources(),
-                None());
-            master->removeOffer(offer, true);
+            master->rescindOffer(offer);
           }
         }
 
@@ -685,13 +677,7 @@ Future<http::Response> Master::QuotaHandler::_update(
             }
 
             rescinded += ResourceQuantities::fromResources(offer->resources());
-
-            master->allocator->recoverResources(
-                offer->framework_id(),
-                offer->slave_id(),
-                offer->resources(),
-                None());
-            master->removeOffer(offer, true);
+            master->rescindOffer(offer);
           }
         }
       }
diff --git a/src/master/weights_handler.cpp b/src/master/weights_handler.cpp
index dfb6f06..4ebeb34 100644
--- a/src/master/weights_handler.cpp
+++ b/src/master/weights_handler.cpp
@@ -303,13 +303,7 @@ void Master::WeightsHandler::rescindOffers(
   if (rescind) {
     foreachvalue (const Slave* slave, master->slaves.registered) {
       foreach (Offer* offer, utils::copy(slave->offers)) {
-        master->allocator->recoverResources(
-            offer->framework_id(),
-            offer->slave_id(),
-            offer->resources(),
-            None());
-
-        master->removeOffer(offer, true);
+        master->rescindOffer(offer);
       }
     }
   }