Posted to commits@mesos.apache.org by mp...@apache.org on 2016/07/04 15:13:26 UTC
[4/4] mesos git commit: Fixed allocator to update total resources in `quotaRoleSorter`.
Fixed allocator to update total resources in `quotaRoleSorter`.
Each `DRFSorter` tracks the total resources in the cluster. This means
that each sorter must be updated when the resources in the cluster have
changed, e.g., due to the creation of a dynamic reservation or a
persistent volume. In the previous implementation, the quota role sorter
was not updated for non-quota roles when a reservation or persistent
volume was created by a framework. This resulted in inconsistency
between the total resources in the allocator and the quota role sorter.
This could cause several problems. First, removing a slave from the
cluster would leak resources in the quota role sorter. Second, certain
interleavings of slave removals and `reserve`/`unreserve` operations by
frameworks and via HTTP endpoints could lead to `CHECK` failures.
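Below is a minimal sketch of the invariant this change enforces. The `Sorter`
struct, the agent/role names, and the string-valued resources are hypothetical
stand-ins for illustration only, not the real Mesos `Sorter`/`Resources` API:
every sorter's total for an agent, including the quota role sorter's, has to be
refreshed when a reservation transforms that agent's resources, even if the
reserving role has no quota.

#include <cassert>
#include <iostream>
#include <map>
#include <string>

// Stand-in sorter that tracks each agent's total as a descriptive string.
struct Sorter {
  std::map<std::string, std::string> totalPerAgent;

  void add(const std::string& agent, const std::string& total) {
    totalPerAgent[agent] = total;
  }

  // The real DRFSorter CHECKs a similar containment invariant: the caller's
  // view of the agent's total must match what the sorter has tracked.
  void remove(const std::string& agent, const std::string& total) {
    assert(totalPerAgent[agent] == total && "stale total in sorter");
    totalPerAgent.erase(agent);
  }
};

int main() {
  Sorter roleSorter;
  Sorter quotaRoleSorter;

  // An agent with 4 unreserved CPUs is added; all sorters learn its total.
  roleSorter.add("agent1", "cpus(*):4");
  quotaRoleSorter.add("agent1", "cpus(*):4");

  // A framework in a non-quota role dynamically reserves the CPUs. The
  // allocator's own total for the agent becomes the reserved resources.
  roleSorter.remove("agent1", "cpus(*):4");
  roleSorter.add("agent1", "cpus(role):4");

  // The fix, conceptually: also refresh the quota role sorter's total.
  // Previously these two calls were skipped for roles without quota,
  // leaving the quota role sorter with the stale "cpus(*):4" entry.
  quotaRoleSorter.remove("agent1", "cpus(*):4");
  quotaRoleSorter.add("agent1", "cpus(role):4");

  // When the agent is later removed, the allocator hands every sorter the
  // agent's *current* total. With a stale entry, the quota role sorter's
  // state would not match, i.e. the leak / CHECK failure described above.
  roleSorter.remove("agent1", "cpus(role):4");
  quotaRoleSorter.remove("agent1", "cpus(role):4");

  std::cout << "sorter totals stayed consistent" << std::endl;
  return 0;
}

Omitting the two `quotaRoleSorter` calls after the reservation reproduces the
stale-total state described above: the final remove would then be asked to
subtract resources the sorter never recorded.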
Review: https://reviews.apache.org/r/49377/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5f73dc7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5f73dc7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5f73dc7
Branch: refs/heads/master
Commit: e5f73dc7acc102e026207b976e675fb5f2e67072
Parents: 10204ad
Author: Neil Conway <ne...@gmail.com>
Authored: Mon Jul 4 15:38:42 2016 +0200
Committer: Michael Park <mp...@apache.org>
Committed: Mon Jul 4 15:38:42 2016 +0200
----------------------------------------------------------------------
src/master/allocator/mesos/hierarchical.cpp | 52 ++++---
src/master/allocator/sorter/drf/sorter.cpp | 5 -
src/tests/persistent_volume_endpoints_tests.cpp | 136 +++++++++++++++++++
3 files changed, 171 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e5f73dc7/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 15eb76d..c1e0003 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -612,11 +612,25 @@ void HierarchicalAllocatorProcess::updateAllocation(
const string& role = frameworks[frameworkId].role;
- // Here we apply offer operations to the allocated resources, which
- // in turns leads to an update of the total. The available resources
- // remain unchanged.
+ // Here we apply offer operations to the allocated and total
+ // resources in the allocator and each of the sorters. The available
+ // resources remain unchanged.
- // Update the allocated resources.
+ // Update the per-slave allocation.
+ Try<Resources> updatedSlaveAllocation =
+ slaves[slaveId].allocated.apply(operations);
+
+ CHECK_SOME(updatedSlaveAllocation);
+
+ slaves[slaveId].allocated = updatedSlaveAllocation.get();
+
+ // Update the total resources.
+ Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+ CHECK_SOME(updatedTotal);
+
+ slaves[slaveId].total = updatedTotal.get();
+
+ // Update the total and allocated resources in each sorter.
CHECK(frameworkSorters.contains(role));
const Owned<Sorter>& frameworkSorter = frameworkSorters[role];
@@ -628,18 +642,35 @@ void HierarchicalAllocatorProcess::updateAllocation(
CHECK_SOME(updatedFrameworkAllocation);
+ // Update the total and allocated resources in the framework sorter
+ // for the current role.
+ frameworkSorter->remove(slaveId, frameworkAllocation);
+ frameworkSorter->add(slaveId, updatedFrameworkAllocation.get());
+
frameworkSorter->update(
frameworkId.value(),
slaveId,
frameworkAllocation,
updatedFrameworkAllocation.get());
+ // Update the total and allocated resources in the role sorter.
+ roleSorter->remove(slaveId, frameworkAllocation);
+ roleSorter->add(slaveId, updatedFrameworkAllocation.get());
+
roleSorter->update(
role,
slaveId,
frameworkAllocation,
updatedFrameworkAllocation.get());
+ // Update the total and allocated resources in the quota role
+ // sorter. Note that we always update the quota role sorter's total
+ // resources; we only update its allocated resources if this role
+ // has quota set.
+ quotaRoleSorter->remove(slaveId, frameworkAllocation.nonRevocable());
+ quotaRoleSorter->add(
+ slaveId, updatedFrameworkAllocation.get().nonRevocable());
+
if (quotas.contains(role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->update(
@@ -649,19 +680,6 @@ void HierarchicalAllocatorProcess::updateAllocation(
updatedFrameworkAllocation.get().nonRevocable());
}
- Try<Resources> updatedSlaveAllocation =
- slaves[slaveId].allocated.apply(operations);
-
- CHECK_SOME(updatedSlaveAllocation);
-
- slaves[slaveId].allocated = updatedSlaveAllocation.get();
-
- // Update the total resources.
- Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
- CHECK_SOME(updatedTotal);
-
- slaves[slaveId].total = updatedTotal.get();
-
LOG(INFO) << "Updated allocation of framework " << frameworkId
<< " on agent " << slaveId
<< " from " << frameworkAllocation
http://git-wip-us.apache.org/repos/asf/mesos/blob/e5f73dc7/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index a81ab6e..7df4dd6 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -191,11 +191,6 @@ void DRFSorter::update(
const Resources newAllocationQuantity =
newAllocation.createStrippedScalarQuantity();
- CHECK(total_.scalarQuantities.contains(oldAllocationQuantity));
-
- total_.scalarQuantities -= oldAllocationQuantity;
- total_.scalarQuantities += newAllocationQuantity;
-
CHECK(allocations[name].resources[slaveId].contains(oldAllocation));
CHECK(allocations[name].scalarQuantities.contains(oldAllocationQuantity));
http://git-wip-us.apache.org/repos/asf/mesos/blob/e5f73dc7/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 0324be8..2a22f3b 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1781,6 +1781,142 @@ TEST_F(PersistentVolumeEndpointsTest, EndpointCreateThenOfferRemove)
}
+// This test checks that a combination of HTTP endpoint reservations,
+// framework reservations and unreservations, and slave removals works
+// correctly. See MESOS-5698 for context.
+TEST_F(PersistentVolumeEndpointsTest, ReserveAndSlaveRemoval)
+{
+ TestAllocator<> allocator;
+
+ EXPECT_CALL(allocator, initialize(_, _, _, _, _));
+
+ Try<Owned<cluster::Master>> master = StartMaster(&allocator);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Future<SlaveID> slave1Id;
+ EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
+ .WillOnce(DoAll(InvokeAddSlave(&allocator),
+ FutureArg<0>(&slave1Id)));
+
+ slave::Flags slave1Flags = CreateSlaveFlags();
+ slave1Flags.resources = "cpus:4";
+ Try<Owned<cluster::Slave>> slave1 = StartSlave(detector.get(), slave1Flags);
+
+ ASSERT_SOME(slave1);
+ AWAIT_READY(slave1Id);
+
+ Future<SlaveID> slave2Id;
+ EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
+ .WillOnce(DoAll(InvokeAddSlave(&allocator),
+ FutureArg<0>(&slave2Id)));
+
+ // Each slave needs its own flags to ensure work_dirs are unique.
+ slave::Flags slave2Flags = CreateSlaveFlags();
+ slave2Flags.resources = "cpus:3";
+ Try<Owned<cluster::Slave>> slave2 = StartSlave(detector.get(), slave2Flags);
+
+ ASSERT_SOME(slave2);
+ AWAIT_READY(slave2Id);
+
+ FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+ // Reserve all CPUs on `slave1` via HTTP endpoint.
+ Resources slave1Unreserved = Resources::parse("cpus:4").get();
+ Resources slave1Reserved = slave1Unreserved.flatten(
+ frameworkInfo.role(),
+ createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+ Future<Response> response = process::http::post(
+ master.get()->pid,
+ "reserve",
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+ createRequestBody(slave1Id.get(), "resources", slave1Reserved));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(allocator, addFramework(_, _, _));
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(2u, offers.get().size());
+
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(),
+ master.get()->pid,
+ slave2.get()->pid);
+
+ // Use the offers API to reserve all CPUs on `slave2`.
+ Resources slave2Unreserved = Resources::parse("cpus:3").get();
+ Resources slave2Reserved = slave2Unreserved.flatten(
+ frameworkInfo.role(),
+ createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+ for (size_t i = 0; i < offers.get().size(); i++) {
+ Offer offer = offers.get()[i];
+ SlaveID offeredSlaveId = offer.slave_id();
+
+ ASSERT_TRUE(offeredSlaveId == slave1Id.get() ||
+ offeredSlaveId == slave2Id.get());
+
+ if (offeredSlaveId == slave2Id.get()) {
+ driver.acceptOffers({offer.id()}, {RESERVE(slave2Reserved)});
+ break;
+ }
+ }
+
+ AWAIT_READY(checkpointResources);
+ EXPECT_EQ(Resources(checkpointResources.get().resources()),
+ slave2Reserved);
+
+ // Shutdown `slave2` with an explicit shutdown message.
+ Future<Nothing> removeSlave;
+ EXPECT_CALL(allocator, removeSlave(_))
+ .WillOnce(DoAll(InvokeRemoveSlave(&allocator),
+ FutureSatisfy(&removeSlave)));
+
+ EXPECT_CALL(sched, offerRescinded(_, _));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(_, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ slave2.get()->shutdown();
+
+ AWAIT_READY(removeSlave);
+
+ AWAIT_READY(slaveLost);
+
+ response = process::http::post(
+ master.get()->pid,
+ "unreserve",
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+ createRequestBody(slave1Id.get(), "resources", slave1Reserved));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, response);
+
+ driver.stop();
+ driver.join();
+
+ EXPECT_CALL(allocator, removeSlave(_));
+}
+
+
// This tests that dynamic reservations and persistent volumes are
// reflected in the "/slaves" master endpoint.
TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)