Posted to commits@mesos.apache.org by al...@apache.org on 2016/07/05 16:53:09 UTC

[5/5] mesos git commit: Fixed allocator to update total resources in `quotaRoleSorter`.

Fixed allocator to update total resources in `quotaRoleSorter`.

Each `DRFSorter` tracks the total resources in the cluster. This means
that each sorter must be updated whenever the resources in the cluster
change, e.g., due to the creation of a dynamic reservation or a
persistent volume. In the previous implementation, the quota role
sorter was not updated for non-quota roles when a framework created a
reservation or persistent volume. This resulted in an inconsistency
between the total resources tracked by the allocator and by the quota
role sorter.
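
To make the invariant concrete, here is a toy model of the bookkeeping
(illustrative only: the real types live under src/master/allocator/,
and every name below is hypothetical). Each sorter keeps its own copy
of the cluster total, keyed here by resource name and reservation
role, so any operation that transforms resources must be replayed
against every copy:

  // Toy stand-ins for the allocator's bookkeeping; not the Mesos API.
  #include <cassert>
  #include <map>
  #include <string>
  #include <utility>

  // A resource is identified by (name, reservation role) and carries a
  // scalar quantity. A RESERVE changes the role, not the quantity.
  typedef std::map<std::pair<std::string, std::string>, double> Resources;

  struct Sorter
  {
    Resources total;

    void add(const Resources& r)
    {
      for (const auto& entry : r) {
        total[entry.first] += entry.second;
      }
    }

    void remove(const Resources& r)
    {
      for (const auto& entry : r) {
        // Stands in for the sorter's internal CHECKs: removing
        // resources this sorter was never told about is a fatal
        // inconsistency.
        assert(total[entry.first] >= entry.second);
        total[entry.first] -= entry.second;
        if (total[entry.first] == 0) {
          total.erase(entry.first);
        }
      }
    }
  };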

This could cause several problems. First, removing a slave from the
cluster would leak resources in the quota role sorter. Second, certain
interleavings of slave removals and `reserve`/`unreserve` operations,
performed by frameworks or via the HTTP endpoints, could lead to
`CHECK` failures.
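
Continuing the toy model above (again a sketch of the kind of
interleaving described, not the exact Mesos code path), the pre-fix
divergence plays out like this; the assert stands in for the sorter's
`CHECK`s:

  int main()
  {
    Sorter roleSorter;
    Sorter quotaRoleSorter;

    // A slave with 4 unreserved ("*") CPUs is added; every sorter
    // learns about it.
    Resources slaveTotal = {{{"cpus", "*"}, 4.0}};
    roleSorter.add(slaveTotal);
    quotaRoleSorter.add(slaveTotal);

    // A framework in a non-quota role "dev" reserves the CPUs.
    // Pre-fix, the transformation was applied to the role sorter but
    // not to the quota role sorter.
    Resources reserved = {{{"cpus", "dev"}, 4.0}};
    roleSorter.remove(slaveTotal);
    roleSorter.add(reserved);
    slaveTotal = reserved;  // The allocator's per-slave total is updated.

    // The slave is removed: each sorter is told to drop the slave's
    // current, i.e. reserved, total.
    roleSorter.remove(slaveTotal);       // OK: it saw the transformation.
    quotaRoleSorter.remove(slaveTotal);  // Fires: it never saw cpus in
                                         // "dev", and its stale
                                         // unreserved entry is leaked.
    return 0;
  }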

Review: https://reviews.apache.org/r/49377/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/75230e04
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/75230e04
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/75230e04

Branch: refs/heads/0.28.x
Commit: 75230e04895313bb9f4d9b9f655d2c9c713b3a3a
Parents: 3b0efb3
Author: Neil Conway <ne...@gmail.com>
Authored: Mon Jul 4 15:38:42 2016 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Tue Jul 5 18:51:46 2016 +0200

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp     |  52 ++++---
 src/master/allocator/sorter/drf/sorter.cpp      |   5 -
 src/tests/persistent_volume_endpoints_tests.cpp | 136 +++++++++++++++++++
 3 files changed, 171 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/75230e04/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 108a1df..34cea7c 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -607,11 +607,25 @@ void HierarchicalAllocatorProcess::updateAllocation(
 
   const string& role = frameworks[frameworkId].role;
 
-  // Here we apply offer operations to the allocated resources, which
-  // in turns leads to an update of the total. The available resources
-  // remain unchanged.
+  // Here we apply offer operations to the allocated and total
+  // resources in the allocator and each of the sorters. The available
+  // resources remain unchanged.
 
-  // Update the allocated resources.
+  // Update the per-slave allocation.
+  Try<Resources> updatedSlaveAllocation =
+    slaves[slaveId].allocated.apply(operations);
+
+  CHECK_SOME(updatedSlaveAllocation);
+
+  slaves[slaveId].allocated = updatedSlaveAllocation.get();
+
+  // Update the total resources.
+  Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+  CHECK_SOME(updatedTotal);
+
+  slaves[slaveId].total = updatedTotal.get();
+
+  // Update the total and allocated resources in each sorter.
   CHECK(frameworkSorters.contains(role));
   const Owned<Sorter>& frameworkSorter = frameworkSorters[role];
 
@@ -623,18 +637,35 @@ void HierarchicalAllocatorProcess::updateAllocation(
 
   CHECK_SOME(updatedFrameworkAllocation);
 
+  // Update the total and allocated resources in the framework sorter
+  // for the current role.
+  frameworkSorter->remove(slaveId, frameworkAllocation);
+  frameworkSorter->add(slaveId, updatedFrameworkAllocation.get());
+
   frameworkSorter->update(
       frameworkId.value(),
       slaveId,
       frameworkAllocation,
       updatedFrameworkAllocation.get());
 
+  // Update the total and allocated resources in the role sorter.
+  roleSorter->remove(slaveId, frameworkAllocation);
+  roleSorter->add(slaveId, updatedFrameworkAllocation.get());
+
   roleSorter->update(
       role,
       slaveId,
       frameworkAllocation,
       updatedFrameworkAllocation.get());
 
+  // Update the total and allocated resources in the quota role
+  // sorter. Note that we always update the quota role sorter's total
+  // resources; we only update its allocated resources if this role
+  // has quota set.
+  quotaRoleSorter->remove(slaveId, frameworkAllocation.nonRevocable());
+  quotaRoleSorter->add(
+      slaveId, updatedFrameworkAllocation.get().nonRevocable());
+
   if (quotas.contains(role)) {
     // See comment at `quotaRoleSorter` declaration regarding non-revocable.
     quotaRoleSorter->update(
@@ -644,19 +675,6 @@ void HierarchicalAllocatorProcess::updateAllocation(
         updatedFrameworkAllocation.get().nonRevocable());
   }
 
-  Try<Resources> updatedSlaveAllocation =
-    slaves[slaveId].allocated.apply(operations);
-
-  CHECK_SOME(updatedSlaveAllocation);
-
-  slaves[slaveId].allocated = updatedSlaveAllocation.get();
-
-  // Update the total resources.
-  Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
-  CHECK_SOME(updatedTotal);
-
-  slaves[slaveId].total = updatedTotal.get();
-
   LOG(INFO) << "Updated allocation of framework " << frameworkId
             << " on slave " << slaveId
             << " from " << frameworkAllocation

http://git-wip-us.apache.org/repos/asf/mesos/blob/75230e04/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index a8d83c1..0db6b02 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -142,11 +142,6 @@ void DRFSorter::update(
   const Resources newAllocationQuantity =
     newAllocation.createStrippedScalarQuantity();
 
-  CHECK(total_.scalarQuantities.contains(oldAllocationQuantity));
-
-  total_.scalarQuantities -= oldAllocationQuantity;
-  total_.scalarQuantities += newAllocationQuantity;
-
   CHECK(allocations[name].resources[slaveId].contains(oldAllocation));
   CHECK(allocations[name].scalarQuantities.contains(oldAllocationQuantity));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/75230e04/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 2a74acb..5922578 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1649,6 +1649,142 @@ TEST_F(PersistentVolumeEndpointsTest, EndpointCreateThenOfferRemove)
 }
 
 
+// This test checks that a combination of HTTP endpoint reservations,
+// framework reservations and unreservations, and slave removals works
+// correctly. See MESOS-5698 for context.
+TEST_F(PersistentVolumeEndpointsTest, ReserveAndSlaveRemoval)
+{
+  TestAllocator<> allocator;
+
+  EXPECT_CALL(allocator, initialize(_, _, _, _));
+
+  Try<PID<Master>> master = StartMaster(&allocator);
+  ASSERT_SOME(master);
+
+  Future<SlaveID> slave1Id;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slave1Id)));
+
+  slave::Flags slave1Flags = CreateSlaveFlags();
+  slave1Flags.resources = "cpus:4";
+  Try<PID<Slave>> slave1 = StartSlave(slave1Flags);
+
+  ASSERT_SOME(slave1);
+  AWAIT_READY(slave1Id);
+
+  Future<SlaveID> slave2Id;
+  EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
+    .WillOnce(DoAll(InvokeAddSlave(&allocator),
+                    FutureArg<0>(&slave2Id)));
+
+  // Each slave needs its own flags to ensure work_dirs are unique.
+  slave::Flags slave2Flags = CreateSlaveFlags();
+  slave2Flags.resources = "cpus:3";
+  Try<PID<Slave>> slave2 = StartSlave(slave2Flags);
+
+  ASSERT_SOME(slave2);
+  AWAIT_READY(slave2Id);
+
+  FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+  // Reserve all CPUs on `slave1` via HTTP endpoint.
+  Resources slave1Unreserved = Resources::parse("cpus:4").get();
+  Resources slave1Reserved = slave1Unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "reserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slave1Id.get(), "resources", slave1Reserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(allocator, addFramework(_, _, _));
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(2u, offers.get().size());
+
+  Future<CheckpointResourcesMessage> checkpointResources =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(),
+                    master.get(),
+                    slave2.get());
+
+  // Use the offers API to reserve all CPUs on `slave2`.
+  Resources slave2Unreserved = Resources::parse("cpus:3").get();
+  Resources slave2Reserved = slave2Unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+  for (size_t i = 0; i < offers.get().size(); i++) {
+    Offer offer = offers.get()[i];
+    SlaveID offeredSlaveId = offer.slave_id();
+
+    ASSERT_TRUE(offeredSlaveId == slave1Id.get() ||
+                offeredSlaveId == slave2Id.get());
+
+    if (offeredSlaveId == slave2Id.get()) {
+      driver.acceptOffers({offer.id()}, {RESERVE(slave2Reserved)});
+      break;
+    }
+  }
+
+  AWAIT_READY(checkpointResources);
+  EXPECT_EQ(Resources(checkpointResources.get().resources()),
+            slave2Reserved);
+
+  // Shutdown `slave2` with an explicit shutdown message.
+  Future<Nothing> removeSlave;
+  EXPECT_CALL(allocator, removeSlave(_))
+    .WillOnce(DoAll(InvokeRemoveSlave(&allocator),
+                    FutureSatisfy(&removeSlave)));
+
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(_, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  this->Stop(slave2.get(), true);
+
+  AWAIT_READY(removeSlave);
+
+  AWAIT_READY(slaveLost);
+
+  response = process::http::post(
+      master.get(),
+      "unreserve",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      createRequestBody(slave1Id.get(), "resources", slave1Reserved));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  driver.stop();
+  driver.join();
+
+  EXPECT_CALL(allocator, removeSlave(_));
+
+  Shutdown();
+}
+
+
 // This tests that dynamic reservations and persistent volumes are
 // reflected in the "/slaves" master endpoint.
 TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)