You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/07/25 00:43:31 UTC
mesos git commit: Added a new API call 'updateAvailable' to the
allocator.
Repository: mesos
Updated Branches:
refs/heads/master cbb209991 -> dc2e130a7
Added a new API call 'updateAvailable' to the allocator.
Needed to implement the master HTTP endpoints: `/reserve`, `/unreserve`,
`/create` and `/destroy`.
This API is similar to `updateSlave`, which is currently very specific to
oversubscription. I considered consolidating `updateAvailable` and
`updateSlave`, but doing so would require offers to be generated within
the allocator.
Specifically, `updateAvailable` could fail if there aren't sufficient
available resources to update, whereas `updateSlave` avoids failing by
keeping the allocator in an over-allocated state. For `updateSlave`,
leaving the allocator in an over-allocated state is OK because it does
not modify resources, and therefore `total - allocated` will work out
to __empty__. `updateAvailable` cannot leave the allocator in an
over-allocated state because it modifies resources, and therefore
`total - allocated` is not guaranteed to yield __empty__.
Review: https://reviews.apache.org/r/35947
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc2e130a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc2e130a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc2e130a
Branch: refs/heads/master
Commit: dc2e130a7faccb6ee28c207c8337cb58dfc3ca5c
Parents: cbb2099
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jul 24 15:43:00 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Jul 24 15:43:01 2015 -0700
----------------------------------------------------------------------
include/mesos/master/allocator.hpp | 6 ++
src/master/allocator/mesos/allocator.hpp | 23 +++++++
src/master/allocator/mesos/hierarchical.hpp | 46 ++++++++++++++
src/tests/hierarchical_allocator_tests.cpp | 80 ++++++++++++++++++++++++
src/tests/mesos.hpp | 15 +++++
5 files changed, 170 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 22992c0..659f37b 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -27,6 +27,8 @@
#include <mesos/resources.hpp>
+#include <process/future.hpp>
+
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
@@ -128,6 +130,10 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations) = 0;
+ virtual process::Future<Nothing> updateAvailable(
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations) = 0;
+
// Informs the Allocator to recover resources that are considered
// used by the framework.
virtual void recoverResources(
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 72470ec..aa55755 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -22,6 +22,7 @@
#include <mesos/master/allocator.hpp>
#include <process/dispatch.hpp>
+#include <process/future.hpp>
#include <process/process.hpp>
#include <stout/try.hpp>
@@ -102,6 +103,10 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations);
+ process::Future<Nothing> updateAvailable(
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations);
+
void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -188,6 +193,10 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations) = 0;
+ virtual process::Future<Nothing> updateAvailable(
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations) = 0;
+
virtual void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -406,6 +415,20 @@ inline void MesosAllocator<AllocatorProcess>::updateAllocation(
template <typename AllocatorProcess>
+inline process::Future<Nothing>
+MesosAllocator<AllocatorProcess>::updateAvailable(
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations)
+{ // Dispatches 'updateAvailable' to 'process' and returns the resulting future.
+ return process::dispatch(
+ process,
+ &MesosAllocatorProcess::updateAvailable,
+ slaveId,
+ operations);
+}
+
+
+template <typename AllocatorProcess>
inline void MesosAllocator<AllocatorProcess>::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 3264d14..eaf9c6a 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -27,6 +27,7 @@
#include <process/event.hpp>
#include <process/delay.hpp>
+#include <process/future.hpp>
#include <process/id.hpp>
#include <process/metrics/gauge.hpp>
#include <process/metrics/metrics.hpp>
@@ -139,6 +140,10 @@ public:
const SlaveID& slaveId,
const std::vector<Offer::Operation>& operations);
+ process::Future<Nothing> updateAvailable(
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations);
+
void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -716,6 +721,47 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
template <class RoleSorter, class FrameworkSorter>
+process::Future<Nothing>
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable(
+ const SlaveID& slaveId,
+ const std::vector<Offer::Operation>& operations)
+{
+ CHECK(initialized);
+ CHECK(slaves.contains(slaveId));
+
+ Resources available = slaves[slaveId].total - slaves[slaveId].allocated;
+
+ // This 'apply' can legitimately fail (and fails the returned future):
+ // the allocator may have enqueued its own 'allocate' just before the
+ // master's 'updateAvailable' request arrives at the allocator, so the
+ // resources the operations need may no longer be available.
+ //
+ // Master -------R------------
+ // \___
+ // \
+ // Allocator --A-----A-U---A--
+ // \___/ \___/
+ //
+ // where A = allocate, R = reserve, U = updateAvailable
+ Try<Resources> updatedAvailable = available.apply(operations);
+ if (updatedAvailable.isError()) {
+ return process::Failure(updatedAvailable.error());
+ }
+
+ // Apply the same operations to the total; CHECKed since 'available' accepted them.
+ Try<Resources> updatedTotal = slaves[slaveId].total.apply(operations);
+ CHECK_SOME(updatedTotal);
+
+ slaves[slaveId].total = updatedTotal.get();
+
+ // Hand the unreserved part of the slave's new total to the role sorter.
+ roleSorter->update(slaveId, slaves[slaveId].total.unreserved());
+
+ return Nothing();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
const FrameworkID& frameworkId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 3258840..c92d47a 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -782,6 +782,86 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
}
+// This test ensures that a call to 'updateAvailable' succeeds when the
+// allocator has sufficient available resources (nothing is allocated yet).
+TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess)
+{
+ initialize(vector<string>{"role1"});
+
+ hashmap<FrameworkID, Resources> EMPTY;
+
+ SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+ allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+ // Construct a RESERVE operation that reserves resources for 'role1'.
+ Resources unreserved = Resources::parse("cpus:25;mem:50").get();
+ Resources dynamicallyReserved =
+ unreserved.flatten("role1", createReservationInfo("ops"));
+
+ Offer::Operation reserve = RESERVE(dynamicallyReserved);
+
+ // Apply the reservation to the slave's available resources.
+ Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve});
+ AWAIT_EXPECT_READY(update);
+
+ // Add a 'role1' framework and expect it to receive the updated resources.
+ FrameworkInfo framework = createFrameworkInfo("role1");
+ allocator->addFramework(
+ framework.id(), framework, hashmap<SlaveID, Resources>());
+
+ Future<Allocation> allocation = queue.get();
+ AWAIT_READY(allocation);
+ EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+ EXPECT_EQ(1u, allocation.get().resources.size());
+ EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+
+ // The allocation should be the slave's resources with the reserve
+ // operation applied, and so must differ from the original resources.
+ Try<Resources> updated = Resources(slave.resources()).apply(reserve);
+ ASSERT_SOME(updated);
+
+ EXPECT_NE(Resources(slave.resources()),
+ Resources::sum(allocation.get().resources));
+
+ EXPECT_EQ(updated.get(), Resources::sum(allocation.get().resources));
+}
+
+
+// This test ensures that a call to 'updateAvailable' fails when the
+// allocator has insufficient available resources.
+TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
+{
+ initialize(vector<string>{"role1"});
+
+ hashmap<FrameworkID, Resources> EMPTY;
+
+ SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+ allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+ // Expect the framework to receive all of the slave's resources.
+ FrameworkInfo framework = createFrameworkInfo("role1");
+ allocator->addFramework(
+ framework.id(), framework, hashmap<SlaveID, Resources>());
+
+ Future<Allocation> allocation = queue.get();
+ AWAIT_READY(allocation);
+ EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+ EXPECT_EQ(1u, allocation.get().resources.size());
+ EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+ EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
+
+ // Construct a RESERVE operation that reserves resources for 'role1'.
+ Resources unreserved = Resources::parse("cpus:25;mem:50").get();
+ Resources dynamicallyReserved =
+ unreserved.flatten("role1", createReservationInfo("ops"));
+
+ Offer::Operation reserve = RESERVE(dynamicallyReserved);
+
+ // Attempt the reservation; everything is already allocated, so it must fail.
+ Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve});
+ AWAIT_EXPECT_FAILED(update);
+}
+
// This test ensures that when oversubscribed resources are updated
// subsequent allocations properly account for that.
TEST_F(HierarchicalAllocatorTest, UpdateSlave)
http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 69134e1..8a76b4f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1036,6 +1036,12 @@ ACTION_P(InvokeUpdateAllocation, allocator)
}
+ACTION_P(InvokeUpdateResources, allocator)
+{ // NOTE(review): named 'InvokeUpdateResources' but forwards to 'updateAvailable'.
+ return allocator->real->updateAvailable(arg0, arg1); // Delegates to 'allocator->real'.
+}
+
+
ACTION_P(InvokeRecoverResources, allocator)
{
allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1154,6 +1160,11 @@ public:
EXPECT_CALL(*this, updateAllocation(_, _, _))
.WillRepeatedly(DoDefault());
+ ON_CALL(*this, updateAvailable(_, _))
+ .WillByDefault(InvokeUpdateResources(this));
+ EXPECT_CALL(*this, updateAvailable(_, _))
+ .WillRepeatedly(DoDefault());
+
ON_CALL(*this, recoverResources(_, _, _, _))
.WillByDefault(InvokeRecoverResources(this));
EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1223,6 +1234,10 @@ public:
const SlaveID&,
const std::vector<Offer::Operation>&));
+ MOCK_METHOD2(updateAvailable, process::Future<Nothing>(
+ const SlaveID&,
+ const std::vector<Offer::Operation>&));
+
MOCK_METHOD4(recoverResources, void(
const FrameworkID&,
const SlaveID&,