You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/12/18 01:57:52 UTC

[1/2] mesos git commit: Updated Sorter to allow transforming allocated resources.

Repository: mesos
Updated Branches:
  refs/heads/master 7a6d4e652 -> dacc88292


Updated Sorter to allow transforming allocated resources.

Review: https://reviews.apache.org/r/29083


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5a02d5bd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5a02d5bd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5a02d5bd

Branch: refs/heads/master
Commit: 5a02d5bdc75d3b1149dcda519016374be06ec6bd
Parents: 7a6d4e6
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Dec 15 20:28:08 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Wed Dec 17 16:42:01 2014 -0800

----------------------------------------------------------------------
 src/master/drf_sorter.cpp  | 27 +++++++++++++++++++++++++++
 src/master/drf_sorter.hpp  |  4 ++++
 src/master/sorter.hpp      | 15 +++++++++++++++
 src/tests/sorter_tests.cpp | 33 +++++++++++++++++++++++++++++++++
 4 files changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5a02d5bd/src/master/drf_sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.cpp b/src/master/drf_sorter.cpp
index ebc0284..967c8aa 100644
--- a/src/master/drf_sorter.cpp
+++ b/src/master/drf_sorter.cpp
@@ -117,6 +117,33 @@ void DRFSorter::allocated(
 }
 
 
+void DRFSorter::transform(
+    const string& name,
+    const Resources& oldAllocation,
+    const Resources& newAllocation)
+{
+  CHECK(contains(name));
+
+  // TODO(bmahler): Check invariants between old and new allocations.
+  // Namely, the roles and quantities of resources should be the same!
+  // Otherwise, we need to ensure we re-calculate the shares, as
+  // is being currently done, for safety.
+
+  CHECK(resources.contains(oldAllocation));
+
+  resources -= oldAllocation;
+  resources += newAllocation;
+
+  CHECK(allocations[name].contains(oldAllocation));
+
+  allocations[name] -= oldAllocation;
+  allocations[name] += newAllocation;
+
+  // Just assume the total has changed, per the TODO above.
+  dirty = true;
+}
+
+
 Resources DRFSorter::allocation(
     const string& name)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/5a02d5bd/src/master/drf_sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/drf_sorter.hpp b/src/master/drf_sorter.hpp
index c47b56a..e0ec58b 100644
--- a/src/master/drf_sorter.hpp
+++ b/src/master/drf_sorter.hpp
@@ -76,6 +76,10 @@ public:
   virtual void allocated(const std::string& name,
                          const Resources& resources);
 
+  virtual void transform(const std::string& name,
+                         const Resources& oldAllocation,
+                         const Resources& newAllocation);
+
   virtual void unallocated(const std::string& name,
                            const Resources& resources);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/5a02d5bd/src/master/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/sorter.hpp b/src/master/sorter.hpp
index 0150f90..333c293 100644
--- a/src/master/sorter.hpp
+++ b/src/master/sorter.hpp
@@ -32,6 +32,13 @@ namespace allocator {
 // Sorters implement the logic for determining the
 // order in which users or frameworks should receive
 // resource allocations.
+//
+// TODO(bmahler): The total and allocated resources are currently
+// aggregated across slaves, which only works for scalar resources.
+// Also, persistent disks are a bit tricky because there will be
+// duplicated persistence IDs within the resources. Consider storing
+// maps keyed off of the slave ID to fix these issues.
+//
 // TODO(bmahler): Templatize this on Client, so that callers
 // don't need to do string conversion, e.g. FrameworkID, string role,
 // etc.
@@ -57,6 +64,14 @@ public:
   virtual void allocated(const std::string& client,
                          const Resources& resources) = 0;
 
+  // Transforms a portion of the allocation for the client, in
+  // order to augment the resources with additional metadata.
+  // This means that the new allocation must not affect the static
+  // roles, or the overall quantities of resources!
+  virtual void transform(const std::string& client,
+                         const Resources& oldAllocation,
+                         const Resources& newAllocation) = 0;
+
   // Specify that resources have been unallocated from the given client.
   virtual void unallocated(const std::string& client,
                            const Resources& resources) = 0;

http://git-wip-us.apache.org/repos/asf/mesos/blob/5a02d5bd/src/tests/sorter_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sorter_tests.cpp b/src/tests/sorter_tests.cpp
index c2f4aa1..56e5714 100644
--- a/src/tests/sorter_tests.cpp
+++ b/src/tests/sorter_tests.cpp
@@ -26,6 +26,8 @@
 
 #include <mesos/resources.hpp>
 
+#include <stout/gtest.hpp>
+
 #include "master/drf_sorter.hpp"
 
 using namespace mesos;
@@ -194,3 +196,34 @@ TEST(SorterTest, SplitResourceShares)
 
   EXPECT_EQ(list<string>({"a", "b"}), sorter.sort());
 }
+
+
+TEST(SorterTest, Transform)
+{
+  DRFSorter sorter;
+
+  sorter.add("a");
+  sorter.add("b");
+
+  sorter.add(Resources::parse("cpus:10;mem:10;disk:10").get());
+
+  sorter.allocated("a", Resources::parse("cpus:10;mem:10;disk:10").get());
+
+  // Construct a transformation.
+  Resource disk = Resources::parse("disk", "5", "*").get();
+  disk.mutable_disk()->mutable_persistence()->set_id("ID");
+  disk.mutable_disk()->mutable_volume()->set_container_path("data");
+
+  Resources::AcquirePersistentDisk transformation(disk);
+
+  // Compute the updated allocation.
+  Resources allocation = sorter.allocation("a");
+  Try<Resources> newAllocation = transformation(allocation);
+
+  ASSERT_SOME(newAllocation);
+
+  // Transform the resources for the client.
+  sorter.transform("a", allocation, newAllocation.get());
+
+  EXPECT_EQ(newAllocation.get(), sorter.allocation("a"));
+}


[2/2] mesos git commit: Updated Allocator to allow transforming allocated resources.

Posted by bm...@apache.org.
Updated Allocator to allow transforming allocated resources.

Review: https://reviews.apache.org/r/29084


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dacc8829
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dacc8829
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dacc8829

Branch: refs/heads/master
Commit: dacc88292cc13d4b08fe8cda4df71110a96cb12a
Parents: 5a02d5b
Author: Benjamin Mahler <be...@gmail.com>
Authored: Mon Dec 15 20:35:49 2014 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Wed Dec 17 16:44:50 2014 -0800

----------------------------------------------------------------------
 src/master/allocator.hpp                      | 33 +++++++++-
 src/master/hierarchical_allocator_process.hpp | 72 ++++++++++++++++++++
 src/tests/hierarchical_allocator_tests.cpp    | 77 ++++++++++++++++++++++
 src/tests/mesos.hpp                           | 19 ++++++
 4 files changed, 198 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp
index f4068aa..224569a 100644
--- a/src/master/allocator.hpp
+++ b/src/master/allocator.hpp
@@ -27,6 +27,7 @@
 #include <process/dispatch.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
+#include <process/shared.hpp>
 
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
@@ -49,9 +50,11 @@ class AllocatorProcess; // Forward declaration.
 
 // Basic model of an allocator: resources are allocated to a framework
 // in the form of offers. A framework can refuse some resources in
-// offers and run tasks in others. Resources can be recovered from a
-// framework when tasks finish/fail (or are lost due to a slave
-// failure) or when an offer is rescinded.
+// offers and run tasks in others. Allocated resources can have
+// transformations applied to them in order for frameworks to alter
+// the resource metadata (e.g. persistent disk). Resources can be
+// recovered from a framework when tasks finish/fail (or are lost
+// due to a slave failure) or when an offer is rescinded.
 //
 // NOTE: DO NOT subclass this class when implementing a new allocator.
 // Implement AllocatorProcess (above) instead!
@@ -115,6 +118,11 @@ public:
       const FrameworkID& frameworkId,
       const std::vector<Request>& requests);
 
+  void transformAllocation(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const process::Shared<Resources::Transformation>& transformation);
+
   // Informs the allocator to recover resources that are considered
   // used by the framework.
   void recoverResources(
@@ -190,6 +198,11 @@ public:
       const FrameworkID& frameworkId,
       const std::vector<Request>& requests) = 0;
 
+  virtual void transformAllocation(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const process::Shared<Resources::Transformation>& transformation) = 0;
+
   virtual void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -340,6 +353,20 @@ inline void Allocator::requestResources(
 }
 
 
+inline void Allocator::transformAllocation(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const process::Shared<Resources::Transformation>& transformation)
+{
+  process::dispatch(
+      process,
+      &AllocatorProcess::transformAllocation,
+      frameworkId,
+      slaveId,
+      transformation);
+}
+
+
 inline void Allocator::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 95fa520..12eb1d1 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -112,6 +112,11 @@ public:
       const FrameworkID& frameworkId,
       const std::vector<Request>& requests);
 
+  void transformAllocation(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      const process::Shared<Resources::Transformation>& transformation);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -543,6 +548,73 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::requestResources(
 
 template <class RoleSorter, class FrameworkSorter>
 void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::transformAllocation(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const process::Shared<Resources::Transformation>& transformation)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+  CHECK(frameworks.contains(frameworkId));
+
+  // The total resources on the slave are composed of both allocated
+  // and available resources:
+  //
+  //    total = available + allocated
+  //
+  // Here we apply a transformation to the allocated resources,
+  // which in turn leads to a transformation of the total. The
+  // available resources remain unchanged.
+
+  FrameworkSorter* frameworkSorter =
+    frameworkSorters[frameworks[frameworkId].role];
+
+  Resources allocation = frameworkSorter->allocation(frameworkId.value());
+
+  // The role sorter should only contain the unreserved allocation.
+  CHECK_EQ(allocation.unreserved(),
+           roleSorter->allocation(frameworks[frameworkId].role));
+
+  // Update the allocated resources.
+  // TODO(bmahler): Check transformation invariants! Namely,
+  // we don't want the quantity or the static roles of the
+  // allocation to be altered.
+  Try<Resources> transformedAllocation = (*transformation)(allocation);
+
+  CHECK_SOME(transformedAllocation);
+
+  frameworkSorter->transform(
+      frameworkId.value(),
+      allocation,
+      transformedAllocation.get());
+
+  roleSorter->transform(
+      frameworks[frameworkId].role,
+      allocation.unreserved(),
+      transformedAllocation.get().unreserved());
+
+  // Update the total resources.
+  // TODO(bmahler): Check transformation invariants! Namely,
+  // we don't want the quantity or the static roles of the
+  // total to be altered.
+  Try<Resources> transformedTotal = (*transformation)(slaves[slaveId].total);
+
+  CHECK_SOME(transformedTotal);
+
+  slaves[slaveId].total = transformedTotal.get();
+
+  // The available resources should be unaffected.
+  CHECK_EQ(slaves[slaveId].total - transformedAllocation.get(),
+           slaves[slaveId].available);
+
+  LOG(INFO) << "Updated allocation of framework " << frameworkId
+            << " on slave " << slaveId
+            << " from " << allocation << " to " << transformedAllocation.get();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
+void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 1f62738..7c05123 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -25,8 +25,10 @@
 #include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gtest.hpp>
+#include <process/shared.hpp>
 #include <process/queue.hpp>
 
+#include <stout/gtest.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/utils.hpp>
@@ -48,6 +50,7 @@ using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
 
 using process::Clock;
 using process::Future;
+using process::Shared;
 
 using std::queue;
 using std::string;
@@ -666,6 +669,80 @@ TEST_F(HierarchicalAllocatorTest, Allocatable)
 }
 
 
+// This test ensures that frameworks can apply resource
+// transformations on their allocations. This allows them
+// to augment the resource metadata (e.g. persistent disk).
+TEST_F(HierarchicalAllocatorTest, TransformAllocation)
+{
+  Clock::pause();
+  initialize(vector<string>{"role1"});
+
+  hashmap<FrameworkID, Resources> EMPTY;
+
+  SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+  // Initially, all the resources are allocated.
+  FrameworkInfo framework = createFrameworkInfo("role1");
+  allocator->addFramework(framework.id(), framework, Resources());
+
+  Future<Allocation> allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+  EXPECT_EQ(1u, allocation.get().resources.size());
+  EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+  EXPECT_EQ(slave.resources(), sum(allocation.get().resources.values()));
+
+  // Construct a transformation for the framework's allocation.
+  Resource disk = Resources::parse("disk", "5", "*").get();
+  disk.mutable_disk()->mutable_persistence()->set_id("ID");
+  disk.mutable_disk()->mutable_volume()->set_container_path("data");
+
+  Shared<Resources::Transformation> transformation(
+      new Resources::AcquirePersistentDisk(disk));
+
+  // Ensure the transformation can be applied.
+  Try<Resources> transformed = (*transformation)(
+      sum(allocation.get().resources.values()));
+
+  ASSERT_SOME(transformed);
+
+  // Transform the allocation in the allocator.
+  allocator->transformAllocation(
+      framework.id(),
+      slave.id(),
+      transformation);
+
+  // Now recover the resources, and expect the next allocation
+  // to contain the disk transformation!
+  allocator->recoverResources(
+      framework.id(),
+      slave.id(),
+      transformed.get(),
+      None());
+
+  Clock::advance(flags.allocation_interval);
+
+  allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+  EXPECT_EQ(1u, allocation.get().resources.size());
+  EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+
+  // The allocation should be the slave's resources with the
+  // disk transformation applied.
+  transformed = (*transformation)(slave.resources());
+
+  ASSERT_SOME(transformed);
+
+  EXPECT_NE(Resources(slave.resources()),
+            sum(allocation.get().resources.values()));
+
+  EXPECT_EQ(transformed.get(),
+            sum(allocation.get().resources.values()));
+}
+
+
 // Checks that a slave that is not whitelisted will not have its
 // resources get offered, and that if the whitelist is updated so
 // that it is whitelisted, its resources will then be offered.

http://git-wip-us.apache.org/repos/asf/mesos/blob/dacc8829/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index bb24222..f93c3f1 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -719,6 +719,9 @@ public:
     ON_CALL(*this, requestResources(_, _))
       .WillByDefault(InvokeResourcesRequested(this));
 
+    ON_CALL(*this, transformAllocation(_, _, _))
+      .WillByDefault(InvokeTransformAllocation(this));
+
     ON_CALL(*this, recoverResources(_, _, _, _))
       .WillByDefault(InvokeResourcesRecovered(this));
 
@@ -775,6 +778,11 @@ public:
       const FrameworkID&,
       const std::vector<Request>&));
 
+  MOCK_METHOD3(transformAllocation, void(
+      const FrameworkID&,
+      const SlaveID&,
+      const process::Shared<Resources::Transformation>&));
+
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,
       const SlaveID&,
@@ -903,6 +911,17 @@ ACTION_P(InvokeResourcesRequested, allocator)
 }
 
 
+ACTION_P(InvokeTransformAllocation, allocator)
+{
+  process::dispatch(
+      allocator->real,
+      &master::allocator::AllocatorProcess::transformAllocation,
+      arg0,
+      arg1,
+      arg2);
+}
+
+
 ACTION_P(InvokeResourcesRecovered, allocator)
 {
   process::dispatch(