You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/12/18 01:57:53 UTC

[2/2] mesos git commit: Updated Allocator to allow transforming allocated resources.

Updated Allocator to allow transforming allocated resources.

Review: https://reviews.apache.org/r/29084


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dacc8829
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dacc8829
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dacc8829

Branch: refs/heads/master
Commit: dacc88292cc13d4b08fe8cda4df71110a96cb12a
Parents: 5a02d5b
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Dec 15 20:35:49 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Wed Dec 17 16:44:50 2014 -0800

----------------------------------------------------------------------
 src/master/allocator.hpp                      | 33 +++++++++-
 src/master/hierarchical_allocator_process.hpp | 72 ++++++++++++++++++++
 src/tests/hierarchical_allocator_tests.cpp    | 77 ++++++++++++++++++++++
 src/tests/mesos.hpp                           | 19 ++++++
 4 files changed, 198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
index f4068aa..224569a 100644
--- a/src/master/allocator.hpp
+++ b/src/master/allocator.hpp
@@ -27,6 +27,7 @@
 #include <process/dispatch.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
+#include <process/shared.hpp>
 
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
@@ -49,9 +50,11 @@ class AllocatorProcess; // Forward declaration.
 
 // Basic model of an allocator: resources are allocated to a framework
 // in the form of offers. A framework can refuse some resources in
-// offers and run tasks in others. Resources can be recovered from a
-// framework when tasks finish/fail (or are lost due to a slave
-// failure) or when an offer is rescinded.
+// offers and run tasks in others. Allocated resources can have
+// transformations applied to them in order for frameworks to alter
+// the resource metadata (e.g. persistent disk). Resources can be
+// recovered from a framework when tasks finish/fail (or are lost
+// due to a slave failure) or when an offer is rescinded.
 //
 // NOTE: DO NOT subclass this class when implementing a new allocator.
 // Implement AllocatorProcess (above) instead!
@@ -115,6 +118,11 @@ public:
       const FrameworkID& frameworkId,
       const std::vector<Request>& requests);
 
+  void transformAllocation(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const process::Shared<Resources::Transformation>& transformation);
+
   // Informs the allocator to recover resources that are considered
   // used by the framework.
   void recoverResources(
@@ -190,6 +198,11 @@ public:
       const FrameworkID& frameworkId,
       const std::vector<Request>& requests) = 0;
 
+  virtual void transformAllocation(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const process::Shared<Resources::Transformation>& transformation) = 0;
+
   virtual void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -340,6 +353,20 @@ inline void Allocator::requestResources(
 }
 
 
+inline void Allocator::transformAllocation(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const process::Shared<Resources::Transformation>& transformation)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::transformAllocation,
+      frameworkId,
+      slaveId,
+      transformation);
+}
+
+
 inline void Allocator::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 95fa520..12eb1d1 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -112,6 +112,11 @@ public:
       const FrameworkID& frameworkId,
       const std::vector<Request>& requests);
 
+  void transformAllocation(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const process::Shared<Resources::Transformation>& transformation);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -543,6 +548,73 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources(
 
 template <class RoleSorter, class FrameworkSorter>
 void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::transformAllocation(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const process::Shared<Resources::Transformation>& transformation)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+  CHECK(frameworks.contains(frameworkId));
+
+  // The total resources on the slave are composed of both allocated
+  // and available resources:
+  //
+  //    total = available + allocated
+  //
+  // Here we apply a transformation to the allocated resources,
+  // which in turn leads to a transformation of the total. The
+  // available resources remain unchanged.
+
+  FrameworkSorter* frameworkSorter =
+    frameworkSorters[frameworks[frameworkId].role];
+
+  Resources allocation = frameworkSorter->allocation(frameworkId.value());
+
+  // The role sorter should only contain the unreserved allocation.
+  CHECK_EQ(allocation.unreserved(),
+           roleSorter->allocation(frameworks[frameworkId].role));
+
+  // Update the allocated resources.
+  // TODO(bmahler): Check transformation invariants! Namely,
+  // we don't want the quantity or the static roles of the
+  // allocation to be altered.
+  Try<Resources> transformedAllocation = (*transformation)(allocation);
+
+  CHECK_SOME(transformedAllocation);
+
+  frameworkSorter->transform(
+      frameworkId.value(),
+      allocation,
+      transformedAllocation.get());
+
+  roleSorter->transform(
+      frameworks[frameworkId].role,
+      allocation.unreserved(),
+      transformedAllocation.get().unreserved());
+
+  // Update the total resources.
+  // TODO(bmahler): Check transformation invariants! Namely,
+  // we don't want the quantity or the static roles of the
+  // total to be altered.
+  Try<Resources> transformedTotal = (*transformation)(slaves[slaveId].total);
+
+  CHECK_SOME(transformedTotal);
+
+  slaves[slaveId].total = transformedTotal.get();
+
+  // The available resources should be unaffected.
+  CHECK_EQ(slaves[slaveId].total - transformedAllocation.get(),
+           slaves[slaveId].available);
+
+  LOG(INFO) << "Updated allocation of framework " << frameworkId
+            << " on slave " << slaveId
+            << " from " << allocation << " to " << transformedAllocation.get();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 1f62738..7c05123 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -25,8 +25,10 @@
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gtest.hpp>
+#include <process/shared.hpp>
 #include <process/queue.hpp>
 
+#include <stout/gtest.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/utils.hpp>
@@ -48,6 +50,7 @@ using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
 
 using process::Clock;
 using process::Future;
+using process::Shared;
 
 using std::queue;
 using std::string;
@@ -666,6 +669,80 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
 }
 
 
+// This test ensures that frameworks can apply resource
+// transformations on their allocations. This allows them
+// to augment the resource metadata (e.g. persistent disk).
+TEST_F(HierarchicalAllocatorTest, TransformAllocation)
+{
+  Clock::pause();
+  initialize(vector<string>{"role1"});
+
+  hashmap<FrameworkID, Resources> EMPTY;
+
+  SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+  // Initially, all the resources are allocated.
+  FrameworkInfo framework = createFrameworkInfo("role1");
+  allocator->addFramework(framework.id(), framework, Resources());
+
+  Future<Allocation> allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+  EXPECT_EQ(1u, allocation.get().resources.size());
+  EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+  EXPECT_EQ(slave.resources(), sum(allocation.get().resources.values()));
+
+  // Construct a transformation for the framework's allocation.
+  Resource disk = Resources::parse("disk", "5", "*").get();
+  disk.mutable_disk()->mutable_persistence()->set_id("ID");
+  disk.mutable_disk()->mutable_volume()->set_container_path("data");
+
+  Shared<Resources::Transformation> transformation(
+      new Resources::AcquirePersistentDisk(disk));
+
+  // Ensure the transformation can be applied.
+  Try<Resources> transformed = (*transformation)(
+      sum(allocation.get().resources.values()));
+
+  ASSERT_SOME(transformed);
+
+  // Transform the allocation in the allocator.
+  allocator->transformAllocation(
+      framework.id(),
+      slave.id(),
+      transformation);
+
+  // Now recover the resources, and expect the next allocation
+  // to contain the disk transformation!
+  allocator->recoverResources(
+      framework.id(),
+      slave.id(),
+      transformed.get(),
+      None());
+
+  Clock::advance(flags.allocation_interval);
+
+  allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+  EXPECT_EQ(1u, allocation.get().resources.size());
+  EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+
+  // The allocation should be the slave's resources with the
+  // disk transformation applied.
+  transformed = (*transformation)(slave.resources());
+
+  ASSERT_SOME(transformed);
+
+  EXPECT_NE(Resources(slave.resources()),
+            sum(allocation.get().resources.values()));
+
+  EXPECT_EQ(transformed.get(),
+            sum(allocation.get().resources.values()));
+}
+
+
 // Checks that a slave that is not whitelisted will not have its
 // resources get offered, and that if the whitelist is updated so
 // that it is whitelisted, its resources will then be offered.

http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index bb24222..f93c3f1 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -719,6 +719,9 @@ public:
     ON_CALL(*this, requestResources(_, _))
       .WillByDefault(InvokeResourcesRequested(this));
 
+    ON_CALL(*this, transformAllocation(_, _, _))
+      .WillByDefault(InvokeTransformAllocation(this));
+
     ON_CALL(*this, recoverResources(_, _, _, _))
       .WillByDefault(InvokeResourcesRecovered(this));
 
@@ -775,6 +778,11 @@ public:
       const FrameworkID&,
       const std::vector<Request>&));
 
+  MOCK_METHOD3(transformAllocation, void(
+      const FrameworkID&,
+      const SlaveID&,
+      const process::Shared<Resources::Transformation>&));
+
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,
       const SlaveID&,
@@ -903,6 +911,17 @@ ACTION_P(InvokeResourcesRequested, allocator)
 }
 
 
+ACTION_P(InvokeTransformAllocation, allocator)
+{
+  process::dispatch(
+      allocator->real,
+      &master::allocator::AllocatorProcess::transformAllocation,
+      arg0,
+      arg1,
+      arg2);
+}
+
+
 ACTION_P(InvokeResourcesRecovered, allocator)
 {
   process::dispatch(