You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/12/18 01:57:53 UTC
[2/2] mesos git commit: Updated Allocator to allow transforming
allocated resources.
Updated Allocator to allow transforming allocated resources.
Review: https://reviews.apache.org/r/29084
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dacc8829
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dacc8829
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dacc8829
Branch: refs/heads/master
Commit: dacc88292cc13d4b08fe8cda4df71110a96cb12a
Parents: 5a02d5b
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Dec 15 20:35:49 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Wed Dec 17 16:44:50 2014 -0800
----------------------------------------------------------------------
src/master/allocator.hpp | 33 +++++++++-
src/master/hierarchical_allocator_process.hpp | 72 ++++++++++++++++++++
src/tests/hierarchical_allocator_tests.cpp | 77 ++++++++++++++++++++++
src/tests/mesos.hpp | 19 ++++++
4 files changed, 198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
index f4068aa..224569a 100644
--- a/src/master/allocator.hpp
+++ b/src/master/allocator.hpp
@@ -27,6 +27,7 @@
#include <process/dispatch.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
+#include <process/shared.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
@@ -49,9 +50,11 @@ class AllocatorProcess; // Forward declaration.
// Basic model of an allocator: resources are allocated to a framework
// in the form of offers. A framework can refuse some resources in
-// offers and run tasks in others. Resources can be recovered from a
-// framework when tasks finish/fail (or are lost due to a slave
-// failure) or when an offer is rescinded.
+// offers and run tasks in others. Allocated resources can have
+// transformations applied to them in order for frameworks to alter
+// the resource metadata (e.g. persistent disk). Resources can be
+// recovered from a framework when tasks finish/fail (or are lost
+// due to a slave failure) or when an offer is rescinded.
//
// NOTE: DO NOT subclass this class when implementing a new allocator.
// Implement AllocatorProcess (above) instead!
@@ -115,6 +118,11 @@ public:
const FrameworkID& frameworkId,
const std::vector<Request>& requests);
+ void transformAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const process::Shared<Resources::Transformation>& transformation);
+
// Informs the allocator to recover resources that are considered
// used by the framework.
void recoverResources(
@@ -190,6 +198,11 @@ public:
const FrameworkID& frameworkId,
const std::vector<Request>& requests) = 0;
+ virtual void transformAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const process::Shared<Resources::Transformation>& transformation) = 0;
+
virtual void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -340,6 +353,20 @@ inline void Allocator::requestResources(
}
+inline void Allocator::transformAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const process::Shared<Resources::Transformation>& transformation)
+{
+ process::dispatch(
+ process,
+ &AllocatorProcess::transformAllocation,
+ frameworkId,
+ slaveId,
+ transformation);
+}
+
+
inline void Allocator::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 95fa520..12eb1d1 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -112,6 +112,11 @@ public:
const FrameworkID& frameworkId,
const std::vector<Request>& requests);
+ void transformAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const process::Shared<Resources::Transformation>& transformation);
+
void recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
@@ -543,6 +548,73 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources(
template <class RoleSorter, class FrameworkSorter>
void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::transformAllocation(
+ const FrameworkID& frameworkId,
+ const SlaveID& slaveId,
+ const process::Shared<Resources::Transformation>& transformation)
+{
+ CHECK(initialized);
+ CHECK(slaves.contains(slaveId));
+ CHECK(frameworks.contains(frameworkId));
+
+ // The total resources on the slave are composed of both allocated
+ // and available resources:
+ //
+ // total = available + allocated
+ //
+ // Here we apply a transformation to the allocated resources,
+ // which in turns leads to a transformation of the total. The
+ // available resources remain unchanged.
+
+ FrameworkSorter* frameworkSorter =
+ frameworkSorters[frameworks[frameworkId].role];
+
+ Resources allocation = frameworkSorter->allocation(frameworkId.value());
+
+ // The role sorter should only contain the unreserved allocation.
+ CHECK_EQ(allocation.unreserved(),
+ roleSorter->allocation(frameworks[frameworkId].role));
+
+ // Update the allocated resources.
+ // TODO(bmahler): Check transformation invariants! Namely,
+ // we don't want the quantity or the static roles of the
+ // allocation to be altered.
+ Try<Resources> transformedAllocation = (*transformation)(allocation);
+
+ CHECK_SOME(transformedAllocation);
+
+ frameworkSorter->transform(
+ frameworkId.value(),
+ allocation,
+ transformedAllocation.get());
+
+ roleSorter->transform(
+ frameworks[frameworkId].role,
+ allocation.unreserved(),
+ transformedAllocation.get().unreserved());
+
+ // Update the total resources.
+ // TODO(bmahler): Check transformation invariants! Namely,
+ // we don't want the quantity or the static roles of the
+ // total to be altered.
+ Try<Resources> transformedTotal = (*transformation)(slaves[slaveId].total);
+
+ CHECK_SOME(transformedTotal);
+
+ slaves[slaveId].total = transformedTotal.get();
+
+ // The available resources should be unaffected.
+ CHECK_EQ(slaves[slaveId].total - transformedAllocation.get(),
+ slaves[slaveId].available);
+
+ LOG(INFO) << "Updated allocation of framework " << frameworkId
+ << " on slave " << slaveId
+ << " from " << allocation << " to " << transformedAllocation.get();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
const FrameworkID& frameworkId,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 1f62738..7c05123 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -25,8 +25,10 @@
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
+#include <process/shared.hpp>
#include <process/queue.hpp>
+#include <stout/gtest.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/utils.hpp>
@@ -48,6 +50,7 @@ using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
using process::Clock;
using process::Future;
+using process::Shared;
using std::queue;
using std::string;
@@ -666,6 +669,80 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
}
+// This test ensures that frameworks can apply resource
+// transformations on their allocations. This allows them
+// to augment the resource metadata (e.g. persistent disk).
+TEST_F(HierarchicalAllocatorTest, TransformAllocation)
+{
+ Clock::pause();
+ initialize(vector<string>{"role1"});
+
+ hashmap<FrameworkID, Resources> EMPTY;
+
+ SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+ allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+ // Initially, all the resources are allocated.
+ FrameworkInfo framework = createFrameworkInfo("role1");
+ allocator->addFramework(framework.id(), framework, Resources());
+
+ Future<Allocation> allocation = queue.get();
+ AWAIT_READY(allocation);
+ EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+ EXPECT_EQ(1u, allocation.get().resources.size());
+ EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+ EXPECT_EQ(slave.resources(), sum(allocation.get().resources.values()));
+
+ // Construct a transformation for the framework's allocation.
+ Resource disk = Resources::parse("disk", "5", "*").get();
+ disk.mutable_disk()->mutable_persistence()->set_id("ID");
+ disk.mutable_disk()->mutable_volume()->set_container_path("data");
+
+ Shared<Resources::Transformation> transformation(
+ new Resources::AcquirePersistentDisk(disk));
+
+ // Ensure the transformation can be applied.
+ Try<Resources> transformed = (*transformation)(
+ sum(allocation.get().resources.values()));
+
+ ASSERT_SOME(transformed);
+
+ // Transform the allocation in the allocator.
+ allocator->transformAllocation(
+ framework.id(),
+ slave.id(),
+ transformation);
+
+ // Now recover the resources, and expect the next allocation
+ // to contain the disk transformation!
+ allocator->recoverResources(
+ framework.id(),
+ slave.id(),
+ transformed.get(),
+ None());
+
+ Clock::advance(flags.allocation_interval);
+
+ allocation = queue.get();
+ AWAIT_READY(allocation);
+ EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+ EXPECT_EQ(1u, allocation.get().resources.size());
+ EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+
+ // The allocation should be the slave's resources with the
+ // disk transformation applied.
+ transformed = (*transformation)(slave.resources());
+
+ ASSERT_SOME(transformed);
+
+ EXPECT_NE(Resources(slave.resources()),
+ sum(allocation.get().resources.values()));
+
+ EXPECT_EQ(transformed.get(),
+ sum(allocation.get().resources.values()));
+}
+
+
// Checks that a slave that is not whitelisted will not have its
// resources get offered, and that if the whitelist is updated so
// that it is whitelisted, its resources will then be offered.
http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index bb24222..f93c3f1 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -719,6 +719,9 @@ public:
ON_CALL(*this, requestResources(_, _))
.WillByDefault(InvokeResourcesRequested(this));
+ ON_CALL(*this, transformAllocation(_, _, _))
+ .WillByDefault(InvokeTransformAllocation(this));
+
ON_CALL(*this, recoverResources(_, _, _, _))
.WillByDefault(InvokeResourcesRecovered(this));
@@ -775,6 +778,11 @@ public:
const FrameworkID&,
const std::vector<Request>&));
+ MOCK_METHOD3(transformAllocation, void(
+ const FrameworkID&,
+ const SlaveID&,
+ const process::Shared<Resources::Transformation>&));
+
MOCK_METHOD4(recoverResources, void(
const FrameworkID&,
const SlaveID&,
@@ -903,6 +911,17 @@ ACTION_P(InvokeResourcesRequested, allocator)
}
+ACTION_P(InvokeTransformAllocation, allocator)
+{
+ process::dispatch(
+ allocator->real,
+ &master::allocator::AllocatorProcess::transformAllocation,
+ arg0,
+ arg1,
+ arg2);
+}
+
+
ACTION_P(InvokeResourcesRecovered, allocator)
{
process::dispatch(