You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/07/09 04:12:00 UTC

[2/3] mesos git commit: Changed semantics of allocator 'updateSlave' method.

Changed semantics of allocator 'updateSlave' method.

We change the semantics of the 'updateSlave' method present in the
allocator interface. While previously the passed optional resource
argument was interpreted as the amount of (new) oversubscribed
resources, it now represents the new amount of total resources on the
given agent.

We additionally add an optimization to
'HierarchicalAllocatorProcess::updateSlaveTotal' for cases where the
passed total is identical to the current total. Such an update is now a
no-op, and the sorters are no longer touched.

Review: https://reviews.apache.org/r/60638/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9e8e0133
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9e8e0133
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9e8e0133

Branch: refs/heads/master
Commit: 9e8e013340be57fb85d4fa64e43d2d090e114422
Parents: 13b1fcf
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Sat Jul 8 20:50:20 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Sat Jul 8 20:50:20 2017 -0700

----------------------------------------------------------------------
 include/mesos/allocator/allocator.hpp       |  11 +--
 src/master/allocator/mesos/allocator.hpp    |   8 +-
 src/master/allocator/mesos/hierarchical.cpp |  50 +++---------
 src/master/allocator/mesos/hierarchical.hpp |   6 +-
 src/master/master.cpp                       |  11 ++-
 src/tests/hierarchical_allocator_tests.cpp  | 100 +++++++++++++++++++++--
 6 files changed, 121 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9e8e0133/include/mesos/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp
index bec9e0b..9d116c6 100644
--- a/include/mesos/allocator/allocator.hpp
+++ b/include/mesos/allocator/allocator.hpp
@@ -204,19 +204,12 @@ public:
   /**
    * Updates an agent.
    *
-   * Updates the latest oversubscribed resources or capabilities for an agent.
-   * TODO(vinod): Instead of just oversubscribed resources have this
-   * method take total resources. We can then reuse this method to
-   * update Agent's total resources in the future.
-   *
-   * @param oversubscribed The new oversubscribed resources estimate from
-   *     the agent. The oversubscribed resources include the total amount
-   *     of oversubscribed resources that are allocated and available.
+   * @param total The new total resources on the agent.
    * @param capabilities The new capabilities of the agent.
    */
   virtual void updateSlave(
       const SlaveID& slave,
-      const Option<Resources>& oversubscribed = None(),
+      const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>&
           capabilities = None()) = 0;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e8e0133/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 2e780c9..725ec7c 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -98,7 +98,7 @@ public:
 
   void updateSlave(
       const SlaveID& slave,
-      const Option<Resources>& oversubscribed = None(),
+      const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>& capabilities = None());
 
   void activateSlave(
@@ -237,7 +237,7 @@ public:
 
   virtual void updateSlave(
       const SlaveID& slave,
-      const Option<Resources>& oversubscribed = None(),
+      const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>&
           capabilities = None()) = 0;
 
@@ -473,14 +473,14 @@ inline void MesosAllocator<AllocatorProcess>::removeSlave(
 template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::updateSlave(
     const SlaveID& slaveId,
-    const Option<Resources>& oversubscribed,
+    const Option<Resources>& total,
     const Option<std::vector<SlaveInfo::Capability>>& capabilities)
 {
   process::dispatch(
       process,
       &MesosAllocatorProcess::updateSlave,
       slaveId,
-      oversubscribed,
+      total,
       capabilities);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e8e0133/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index eb01d8e..fad9330 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -623,7 +623,7 @@ void HierarchicalAllocatorProcess::removeSlave(
 
 void HierarchicalAllocatorProcess::updateSlave(
     const SlaveID& slaveId,
-    const Option<Resources>& oversubscribed,
+    const Option<Resources>& total,
     const Option<vector<SlaveInfo::Capability>>& capabilities)
 {
   CHECK(initialized);
@@ -648,42 +648,11 @@ void HierarchicalAllocatorProcess::updateSlave(
     }
   }
 
-  if (oversubscribed.isSome()) {
-    // Check that all the oversubscribed resources are revocable.
-    CHECK_EQ(oversubscribed.get(), oversubscribed->revocable());
+  if (total.isSome()) {
+    updated = updateSlaveTotal(slaveId, total.get());
 
-    const Resources oldRevocable = slave.total.revocable();
-
-    if (oldRevocable != oversubscribed.get()) {
-      // Update the total resources.
-      //
-      // Reset the total resources to include the non-revocable resources,
-      // plus the new estimate of oversubscribed resources.
-      //
-      // NOTE: All modifications to revocable resources in the allocator for
-      // `slaveId` are lost.
-      //
-      // TODO(alexr): Update this math once the source of revocable resources
-      // is extended beyond oversubscription.
-      slave.total = slave.total.nonRevocable() + oversubscribed.get();
-
-      // Update the total resources in the `roleSorter` by removing the
-      // previous oversubscribed resources and adding the new
-      // oversubscription estimate.
-      roleSorter->remove(slaveId, oldRevocable);
-      roleSorter->add(slaveId, oversubscribed.get());
-
-      updated = true;
-
-      // NOTE: We do not need to update `quotaRoleSorter` because this
-      // function only changes the revocable resources on the slave, but
-      // the quota role sorter only manages non-revocable resources.
-
-      LOG(INFO) << "Agent " << slaveId << " (" << slave.hostname << ")"
-                << " updated with oversubscribed resources "
-                << oversubscribed.get() << " (total: " << slave.total
-                << ", allocated: " << slave.allocated << ")";
-    }
+    LOG(INFO) << "Agent " << slaveId << " (" << slave.hostname << ")"
+              << " updated with total resources " << total.get();
   }
 
   if (updated) {
@@ -2365,7 +2334,7 @@ void HierarchicalAllocatorProcess::untrackFrameworkUnderRole(
 }
 
 
-void HierarchicalAllocatorProcess::updateSlaveTotal(
+bool HierarchicalAllocatorProcess::updateSlaveTotal(
     const SlaveID& slaveId,
     const Resources& total)
 {
@@ -2374,6 +2343,11 @@ void HierarchicalAllocatorProcess::updateSlaveTotal(
   Slave& slave = slaves.at(slaveId);
 
   const Resources oldTotal = slave.total;
+
+  if (oldTotal == total) {
+    return false;
+  }
+
   slave.total = total;
 
   // Currently `roleSorter` and `quotaRoleSorter`, being the root-level
@@ -2387,6 +2361,8 @@ void HierarchicalAllocatorProcess::updateSlaveTotal(
   // See comment at `quotaRoleSorter` declaration regarding non-revocable.
   quotaRoleSorter->remove(slaveId, oldTotal.nonRevocable());
   quotaRoleSorter->add(slaveId, total.nonRevocable());
+
+  return true;
 }
 
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e8e0133/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 5c58cf4..81d1b96 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -143,7 +143,7 @@ public:
 
   void updateSlave(
       const SlaveID& slave,
-      const Option<Resources>& oversubscribed = None(),
+      const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>& capabilities = None());
 
   void deactivateSlave(
@@ -525,8 +525,8 @@ private:
 
   // Helper to update the agent's total resources maintained in the allocator
   // and the role and quota sorters (whose total resources match the agent's
-  // total resources).
-  void updateSlaveTotal(const SlaveID& slaveId, const Resources& total);
+  // total resources). Returns true iff the stored agent total was changed.
+  bool updateSlaveTotal(const SlaveID& slaveId, const Resources& total);
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e8e0133/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d01cb97..7668749 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -66,7 +66,6 @@
 #include <stout/option.hpp>
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
-#include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 #include <stout/utils.hpp>
 #include <stout/uuid.hpp>
@@ -6566,10 +6565,6 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       slave->totalResources =
         slave->totalResources.nonRevocable() +
         oversubscribedResources.revocable();
-
-      // Now update the agent's resources in the allocator.
-      allocator->updateSlave(slaveId, message.oversubscribed_resources());
-
       break;
     }
     case UpdateSlaveMessage::TOTAL: {
@@ -6579,7 +6574,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       LOG(INFO) << "Received update of agent " << *slave << " with total"
                 << " resources " << totalResources;
 
-      UNIMPLEMENTED;
+      slave->totalResources = totalResources;
+      break;
     }
     case UpdateSlaveMessage::UNKNOWN: {
       LOG(WARNING) << "Ignoring update on agent " << slaveId
@@ -6588,6 +6584,9 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     }
   }
 
+  // Now update the agent's resources in the allocator.
+  allocator->updateSlave(slaveId, slave->totalResources);
+
   // Then rescind any outstanding offers with revocable resources.
   // NOTE: Need a copy of offers because the offers are removed inside the loop.
   foreach (Offer* offer, utils::copy(slave->offers)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9e8e0133/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 2a312a9..e68e39a 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -1872,7 +1872,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveOversubscribedResources)
 
   // Update the slave with 10 oversubscribed cpus.
   Resources oversubscribed = createRevocableResources("cpus", "10");
-  allocator->updateSlave(slave.id(), oversubscribed);
+  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
 
   // The next allocation should be for 10 oversubscribed resources.
   expected = Allocation(
@@ -1883,7 +1883,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveOversubscribedResources)
 
   // Update the slave again with 12 oversubscribed cpus.
   Resources oversubscribed2 = createRevocableResources("cpus", "12");
-  allocator->updateSlave(slave.id(), oversubscribed2);
+  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed2);
 
   // The next allocation should be for 2 oversubscribed cpus.
   expected = Allocation(
@@ -1894,7 +1894,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveOversubscribedResources)
 
   // Update the slave again with 5 oversubscribed cpus.
   Resources oversubscribed3 = createRevocableResources("cpus", "5");
-  allocator->updateSlave(slave.id(), oversubscribed3);
+  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed3);
 
   // Since there are no more available oversubscribed resources there
   // shouldn't be an allocation.
@@ -1905,6 +1905,93 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveOversubscribedResources)
 }
 
 
+// This test ensures that we can update the total of an agent. We
+// check that we can expand and shrink the resources available on an
+// agent. Agents can be overallocated, meaning the amount of allocated
+// resources can exceed the total available resources.
+TEST_F(HierarchicalAllocatorTest, UpdateSlaveTotalResources)
+{
+  // Pause clock to disable batch allocation.
+  Clock::pause();
+
+  initialize();
+
+  // Create an agent and a framework. This triggers allocation
+  // of the agent's resources to the framework.
+  const SlaveInfo agent = createSlaveInfo("cpus:100;mem:100;disk:100");
+
+  allocator->addSlave(
+      agent.id(),
+      agent,
+      AGENT_CAPABILITIES(),
+      None(),
+      agent.resources(),
+      {});
+
+  const FrameworkInfo framework = createFrameworkInfo({"role1"});
+  allocator->addFramework(framework.id(), framework, {}, true, {});
+
+  const Allocation expected1 = Allocation(
+      framework.id(),
+      {{"role1", {{agent.id(), agent.resources()}}}});
+
+  AWAIT_EXPECT_EQ(expected1, allocations.get());
+
+  // Increase the agent's total. The additional
+  // resources will be offered to the framework.
+  const Resources addedResources = Resources::parse("cpus:12").get();
+
+  allocator->updateSlave(
+      agent.id(),
+      agent.resources() + addedResources);
+
+  const Allocation expected2 = Allocation(
+      framework.id(),
+      {{"role1", {{agent.id(), addedResources}}}});
+
+  Future<Allocation> allocation = allocations.get();
+  AWAIT_EXPECT_EQ(expected2, allocation);
+
+  // Decrease the agent's total to half its original value. The allocated now
+  // exceeds to total; nothing will be offered due to this operation.
+  const Resources agentResources2 =
+    Resources::parse("cpus:50;mem:50;disk:50").get();
+
+  allocator->updateSlave(agent.id(), agentResources2);
+
+  // Recover all agent resources allocated to the framework in the last two
+  // allocations. We will subsequently be offered the complete agent which has
+  // `agentResources2` resources.
+  allocator->recoverResources(
+      framework.id(),
+      agent.id(),
+      expected1.resources.at("role1").at(agent.id()) +
+        expected2.resources.at("role1").at(agent.id()),
+      None());
+
+  // Advance the clock to trigger allocation of
+  // the available `agentResources2` resources.
+  Clock::advance(flags.allocation_interval);
+
+  const Allocation expected3 = Allocation(
+      framework.id(),
+      {{"role1", {{agent.id(), agentResources2}}}});
+
+  AWAIT_EXPECT_EQ(expected3, allocations.get());
+
+  // Set the agent's total resources to its original value. This will trigger
+  // allocation of the newly added `agentResources2` resources now available on
+  // the agent.
+  allocator->updateSlave(agent.id(), agent.resources());
+
+  const Allocation expected4 = Allocation(
+      framework.id(),
+      {{"role1", {{agent.id(), agentResources2}}}});
+
+  AWAIT_EXPECT_EQ(expected4, allocations.get());
+}
+
+
 // This test ensures that when agent capabilities are updated
 // subsequent allocations properly account for that.
 TEST_F(HierarchicalAllocatorTest, UpdateSlaveCapabilities)
@@ -1939,6 +2026,7 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveCapabilities)
   ASSERT_TRUE(allocation.isPending());
 
   // Update the agent to be MULTI_ROLE capable.
+
   allocator->updateSlave(agent.id(), None(), AGENT_CAPABILITIES());
 
   Clock::settle();
@@ -1983,7 +2071,7 @@ TEST_F(HierarchicalAllocatorTest, OversubscribedNotAllocated)
 
   // Update the slave with 10 oversubscribed cpus.
   Resources oversubscribed = createRevocableResources("cpus", "10");
-  allocator->updateSlave(slave.id(), oversubscribed);
+  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
 
   // No allocation should be made for oversubscribed resources because
   // the framework has not opted in for them.
@@ -2027,7 +2115,7 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources)
 
   // Update the slave with 10 oversubscribed cpus.
   Resources oversubscribed = createRevocableResources("cpus", "10");
-  allocator->updateSlave(slave.id(), oversubscribed);
+  allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
 
   // The next allocation should be for 10 oversubscribed cpus.
   expected = Allocation(
@@ -5068,7 +5156,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
   watch.start(); // Reset.
 
   foreach (const SlaveInfo& slave, slaves) {
-    allocator->updateSlave(slave.id(), oversubscribed);
+    allocator->updateSlave(slave.id(), slave.resources() + oversubscribed);
   }
 
   // Wait for all the `updateSlave` operations to be processed.