You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/07/05 16:53:09 UTC
[5/5] mesos git commit: Fixed allocator to update total resources in `quotaRoleSorter`.
Fixed allocator to update total resources in `quotaRoleSorter`.
Each `DRFSorter` tracks the total resources in the cluster. This means
that each sorter must be updated when the resources in the cluster have
changed, e.g., due to the creation of a dynamic reservation or a
persistent volume. In the previous implementation, the quota role sorter
was not updated for non-quota roles when a reservation or persistent
volume was created by a framework. This resulted in an inconsistency
between the total resources in the allocator and the quota role sorter.
This could cause several problems. First, removing a slave from the
cluster would leak resources in the quota role sorter. Second, certain
interleavings of slave removals and `reserve`/`unreserve` operations by
frameworks and via HTTP endpoints could lead to `CHECK` failures.
Review: https://reviews.apache.org/r/49377/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/75230e04
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/75230e04
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/75230e04
Branch: refs/heads/0.28.x
Commit: 75230e04895313bb9f4d9b9f655d2c9c713b3a3a
Parents: 3b0efb3
Author: Neil Conway <ne...@gmail.com>
Authored: Mon Jul 4 15:38:42 2016 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Tue Jul 5 18:51:46 2016 +0200
----------------------------------------------------------------------
src/master/allocator/mesos/hierarchical.cpp | 52 ++++---
src/master/allocator/sorter/drf/sorter.cpp | 5 -
src/tests/persistent_volume_endpoints_tests.cpp | 136 +++++++++++++++++++
3 files changed, 171 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/75230e04/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 108a1df..34cea7c 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -607,11 +607,25 @@ void HierarchicalAllocatorProcess::updateAllocation(
const string& role = frameworks[frameworkId].role;
- // Here we apply offer operations to the allocated resources, which
- // in turns leads to an update of the total. The available resources
- // remain unchanged.
+ // Here we apply offer operations to the allocated and total
+ // resources in the allocator and each of the sorters. The available
+ // resources remain unchanged.
- // Update the allocated resources.
+ // Update the per-slave allocation.
+ Try<Resources> updatedSlaveAllocation =
+ slaves[slaveId].allocated.apply(operations);
+
+ CHECK_SOME(updatedSlaveAllocation);
+
+ slaves[slaveId].allocated = updatedSlaveAllocation.get();
+
+ // Update the total resources.
+ Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+ CHECK_SOME(updatedTotal);
+
+ slaves[slaveId].total = updatedTotal.get();
+
+ // Update the total and allocated resources in each sorter.
CHECK(frameworkSorters.contains(role));
const Owned<Sorter>& frameworkSorter = frameworkSorters[role];
@@ -623,18 +637,35 @@ void HierarchicalAllocatorProcess::updateAllocation(
CHECK_SOME(updatedFrameworkAllocation);
+ // Update the total and allocated resources in the framework sorter
+ // for the current role.
+ frameworkSorter->remove(slaveId, frameworkAllocation);
+ frameworkSorter->add(slaveId, updatedFrameworkAllocation.get());
+
frameworkSorter->update(
frameworkId.value(),
slaveId,
frameworkAllocation,
updatedFrameworkAllocation.get());
+ // Update the total and allocated resources in the role sorter.
+ roleSorter->remove(slaveId, frameworkAllocation);
+ roleSorter->add(slaveId, updatedFrameworkAllocation.get());
+
roleSorter->update(
role,
slaveId,
frameworkAllocation,
updatedFrameworkAllocation.get());
+ // Update the total and allocated resources in the quota role
+ // sorter. Note that we always update the quota role sorter's total
+ // resources; we only update its allocated resources if this role
+ // has quota set.
+ quotaRoleSorter->remove(slaveId, frameworkAllocation.nonRevocable());
+ quotaRoleSorter->add(
+ slaveId, updatedFrameworkAllocation.get().nonRevocable());
+
if (quotas.contains(role)) {
// See comment at `quotaRoleSorter` declaration regarding non-revocable.
quotaRoleSorter->update(
@@ -644,19 +675,6 @@ void HierarchicalAllocatorProcess::updateAllocation(
updatedFrameworkAllocation.get().nonRevocable());
}
- Try<Resources> updatedSlaveAllocation =
- slaves[slaveId].allocated.apply(operations);
-
- CHECK_SOME(updatedSlaveAllocation);
-
- slaves[slaveId].allocated = updatedSlaveAllocation.get();
-
- // Update the total resources.
- Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
- CHECK_SOME(updatedTotal);
-
- slaves[slaveId].total = updatedTotal.get();
-
LOG(INFO) << "Updated allocation of framework " << frameworkId
<< " on slave " << slaveId
<< " from " << frameworkAllocation
http://git-wip-us.apache.org/repos/asf/mesos/blob/75230e04/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index a8d83c1..0db6b02 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -142,11 +142,6 @@ void DRFSorter::update(
const Resources newAllocationQuantity =
newAllocation.createStrippedScalarQuantity();
- CHECK(total_.scalarQuantities.contains(oldAllocationQuantity));
-
- total_.scalarQuantities -= oldAllocationQuantity;
- total_.scalarQuantities += newAllocationQuantity;
-
CHECK(allocations[name].resources[slaveId].contains(oldAllocation));
CHECK(allocations[name].scalarQuantities.contains(oldAllocationQuantity));
http://git-wip-us.apache.org/repos/asf/mesos/blob/75230e04/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 2a74acb..5922578 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1649,6 +1649,142 @@ TEST_F(PersistentVolumeEndpointsTest, EndpointCreateThenOfferRemove)
}
+// This test checks that a combination of HTTP endpoint reservations and
+// unreservations, framework reservations, and slave removals works
+// correctly. See MESOS-5698 for context.
+TEST_F(PersistentVolumeEndpointsTest, ReserveAndSlaveRemoval)
+{
+ TestAllocator<> allocator;
+
+ EXPECT_CALL(allocator, initialize(_, _, _, _));
+
+ Try<PID<Master>> master = StartMaster(&allocator);
+ ASSERT_SOME(master);
+
+ Future<SlaveID> slave1Id;
+ EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
+ .WillOnce(DoAll(InvokeAddSlave(&allocator),
+ FutureArg<0>(&slave1Id)));
+
+ slave::Flags slave1Flags = CreateSlaveFlags();
+ slave1Flags.resources = "cpus:4";
+ Try<PID<Slave>> slave1 = StartSlave(slave1Flags);
+
+ ASSERT_SOME(slave1);
+ AWAIT_READY(slave1Id);
+
+ Future<SlaveID> slave2Id;
+ EXPECT_CALL(allocator, addSlave(_, _, _, _, _))
+ .WillOnce(DoAll(InvokeAddSlave(&allocator),
+ FutureArg<0>(&slave2Id)));
+
+ // Each slave needs its own flags to ensure work_dirs are unique.
+ slave::Flags slave2Flags = CreateSlaveFlags();
+ slave2Flags.resources = "cpus:3";
+ Try<PID<Slave>> slave2 = StartSlave(slave2Flags);
+
+ ASSERT_SOME(slave2);
+ AWAIT_READY(slave2Id);
+
+ FrameworkInfo frameworkInfo = createFrameworkInfo();
+
+ // Reserve all CPUs on `slave1` via the `reserve` HTTP endpoint.
+ Resources slave1Unreserved = Resources::parse("cpus:4").get();
+ Resources slave1Reserved = slave1Unreserved.flatten(
+ frameworkInfo.role(),
+ createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+ Future<Response> response = process::http::post(
+ master.get(),
+ "reserve",
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+ createRequestBody(slave1Id.get(), "resources", slave1Reserved));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(allocator, addFramework(_, _, _));
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer>> offers;
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(2u, offers.get().size());
+
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(),
+ master.get(),
+ slave2.get());
+
+ // Accept the `slave2` offer with a RESERVE operation to reserve all its CPUs.
+ Resources slave2Unreserved = Resources::parse("cpus:3").get();
+ Resources slave2Reserved = slave2Unreserved.flatten(
+ frameworkInfo.role(),
+ createReservationInfo(DEFAULT_CREDENTIAL.principal()));
+
+ for (size_t i = 0; i < offers.get().size(); i++) {
+ Offer offer = offers.get()[i];
+ SlaveID offeredSlaveId = offer.slave_id();
+
+ ASSERT_TRUE(offeredSlaveId == slave1Id.get() ||
+ offeredSlaveId == slave2Id.get());
+
+ if (offeredSlaveId == slave2Id.get()) {
+ driver.acceptOffers({offer.id()}, {RESERVE(slave2Reserved)});
+ break;
+ }
+ }
+
+ AWAIT_READY(checkpointResources);
+ EXPECT_EQ(Resources(checkpointResources.get().resources()),
+ slave2Reserved);
+
+ // Shut down `slave2` with an explicit shutdown message.
+ Future<Nothing> removeSlave;
+ EXPECT_CALL(allocator, removeSlave(_))
+ .WillOnce(DoAll(InvokeRemoveSlave(&allocator),
+ FutureSatisfy(&removeSlave)));
+
+ EXPECT_CALL(sched, offerRescinded(_, _));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(_, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ this->Stop(slave2.get(), true);
+
+ AWAIT_READY(removeSlave);
+
+ AWAIT_READY(slaveLost);
+
+ response = process::http::post(
+ master.get(),
+ "unreserve",
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+ createRequestBody(slave1Id.get(), "resources", slave1Reserved));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ driver.stop();
+ driver.join();
+
+ EXPECT_CALL(allocator, removeSlave(_));
+
+ Shutdown();
+}
+
+
// This tests that dynamic reservations and persistent volumes are
// reflected in the "/slaves" master endpoint.
TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)