You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/07/25 00:43:31 UTC

mesos git commit: Added a new API call 'updateAvailable' to the allocator.

Repository: mesos
Updated Branches:
  refs/heads/master cbb209991 -> dc2e130a7


Added a new API call 'updateAvailable' to the allocator.

This is needed to implement the master HTTP endpoints: `/reserve`,
`/unreserve`, `/create` and `/destroy`.

This API is similar to `updateSlave`, which is currently very specific to
oversubscription. I considered consolidating `updateAvailable` and
`updateSlave`, but doing so would require generating offers within the
allocator.

Specifically, `updateAvailable` can fail if there aren't sufficient
available resources to update, whereas `updateSlave` avoids failing by
keeping the allocator in an over-allocated state. For `updateSlave`,
leaving the allocator in an over-allocated state is ok: it does not
modify resources, so `total - allocated` will work out to __empty__.
`updateAvailable` cannot leave the allocator in an over-allocated state,
because it modifies resources, and therefore `total - allocated` is not
guaranteed to yield __empty__.

Review: https://reviews.apache.org/r/35947


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dc2e130a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dc2e130a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dc2e130a

Branch: refs/heads/master
Commit: dc2e130a7faccb6ee28c207c8337cb58dfc3ca5c
Parents: cbb2099
Author: Michael Park <mc...@gmail.com>
Authored: Fri Jul 24 15:43:00 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Jul 24 15:43:01 2015 -0700

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |  6 ++
 src/master/allocator/mesos/allocator.hpp    | 23 +++++++
 src/master/allocator/mesos/hierarchical.hpp | 46 ++++++++++++++
 src/tests/hierarchical_allocator_tests.cpp  | 80 ++++++++++++++++++++++++
 src/tests/mesos.hpp                         | 15 +++++
 5 files changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 22992c0..659f37b 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -27,6 +27,8 @@
 
 #include <mesos/resources.hpp>
 
+#include <process/future.hpp>
+
 #include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
@@ -128,6 +130,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations) = 0;
 
+  virtual process::Future<Nothing> updateAvailable(
+      const SlaveID& slaveId,
+      const std::vector<Offer::Operation>& operations) = 0;
+
   // Informs the Allocator to recover resources that are considered
   // used by the framework.
   virtual void recoverResources(

http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 72470ec..aa55755 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -22,6 +22,7 @@
 #include <mesos/master/allocator.hpp>
 
 #include <process/dispatch.hpp>
+#include <process/future.hpp>
 #include <process/process.hpp>
 
 #include <stout/try.hpp>
@@ -102,6 +103,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations);
 
+  process::Future<Nothing> updateAvailable(
+      const SlaveID& slaveId,
+      const std::vector<Offer::Operation>& operations);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -188,6 +193,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations) = 0;
 
+  virtual process::Future<Nothing> updateAvailable(
+      const SlaveID& slaveId,
+      const std::vector<Offer::Operation>& operations) = 0;
+
   virtual void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -406,6 +415,20 @@ inline void MesosAllocator<AllocatorProcess>::updateAllocation(
 
 
 template <typename AllocatorProcess>
+inline process::Future<Nothing>
+MesosAllocator<AllocatorProcess>::updateAvailable(
+    const SlaveID& slaveId,
+    const std::vector<Offer::Operation>& operations)
+{
+  // Forward the request to the allocator process. The returned future
+  // is the one produced by the process's 'updateAvailable', so any
+  // failure it reports propagates to the caller.
+  return process::dispatch(
+      process, &MesosAllocatorProcess::updateAvailable, slaveId, operations);
+}
+
+
+template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::recoverResources(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 3264d14..eaf9c6a 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -27,6 +27,7 @@
 
 #include <process/event.hpp>
 #include <process/delay.hpp>
+#include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/metrics/gauge.hpp>
 #include <process/metrics/metrics.hpp>
@@ -139,6 +140,10 @@ public:
       const SlaveID& slaveId,
       const std::vector<Offer::Operation>& operations);
 
+  process::Future<Nothing> updateAvailable(
+      const SlaveID& slaveId,
+      const std::vector<Offer::Operation>& operations);
+
   void recoverResources(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
@@ -716,6 +721,47 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAllocation(
 
 
 template <class RoleSorter, class FrameworkSorter>
+process::Future<Nothing>
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable(
+    const SlaveID& slaveId,
+    const std::vector<Offer::Operation>& operations)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  Slave& slave = slaves[slaveId];
+  Resources available = slave.total - slave.allocated;
+
+  // Validate against the available resources first, so that a failure
+  // leaves the allocator's state untouched. This can fail because a
+  // call to 'allocate' could have been enqueued by the allocator itself
+  // just before the master's 'updateAvailable' request arrives.
+  //
+  //   Master -------R------------
+  //                  \----+
+  //                       |
+  //   Allocator --A-----A-U---A--
+  //                \___/ \___/
+  //
+  //   where A = allocate, R = reserve, U = updateAvailable
+  Try<Resources> updatedAvailable = available.apply(operations);
+  if (updatedAvailable.isError()) {
+    return process::Failure(updatedAvailable.error());
+  }
+
+  // Update the total resources, which must succeed given the above.
+  Try<Resources> updatedTotal = slave.total.apply(operations);
+  CHECK_SOME(updatedTotal);
+  slave.total = updatedTotal.get();
+
+  // Now, update the total unreserved resources in the role sorter.
+  roleSorter->update(slaveId, slave.total.unreserved());
+
+  return Nothing();
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
 void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
     const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 3258840..c92d47a 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -782,6 +782,86 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation)
 }
 
 
+// This test ensures that a call to 'updateAvailable' succeeds when the
+// allocator has sufficient available resources.
+TEST_F(HierarchicalAllocatorTest, UpdateAvailableSuccess)
+{
+  initialize(vector<string>{"role1"});
+
+  hashmap<FrameworkID, Resources> EMPTY;
+
+  SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+  // Construct a RESERVE operation for part of the available resources.
+  Resources unreserved = Resources::parse("cpus:25;mem:50").get();
+  Resources dynamicallyReserved =
+    unreserved.flatten("role1", createReservationInfo("ops"));
+
+  Offer::Operation reserve = RESERVE(dynamicallyReserved);
+
+  // Apply the operation to the slave's available resources.
+  Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve});
+  AWAIT_EXPECT_READY(update);
+
+  // Expect to receive the updated available resources.
+  FrameworkInfo framework = createFrameworkInfo("role1");
+  allocator->addFramework(
+      framework.id(), framework, hashmap<SlaveID, Resources>());
+
+  Future<Allocation> allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+  EXPECT_EQ(1u, allocation.get().resources.size());
+  EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+
+  // The allocation should be the slave's resources with the offer
+  // operation applied.
+  Try<Resources> updated = Resources(slave.resources()).apply(reserve);
+  ASSERT_SOME(updated);
+
+  EXPECT_NE(Resources(slave.resources()),
+            Resources::sum(allocation.get().resources));
+
+  EXPECT_EQ(updated.get(), Resources::sum(allocation.get().resources));
+}
+
+
+// This test ensures that a call to 'updateAvailable' fails when the
+// allocator has insufficient available resources.
+TEST_F(HierarchicalAllocatorTest, UpdateAvailableFail)
+{
+  initialize(vector<string>{"role1"});
+
+  hashmap<FrameworkID, Resources> EMPTY;
+
+  SlaveInfo slave = createSlaveInfo("cpus:100;mem:100;disk:100");
+  allocator->addSlave(slave.id(), slave, slave.resources(), EMPTY);
+
+  // Expect to receive all of the available resources.
+  FrameworkInfo framework = createFrameworkInfo("role1");
+  allocator->addFramework(
+      framework.id(), framework, hashmap<SlaveID, Resources>());
+
+  Future<Allocation> allocation = queue.get();
+  AWAIT_READY(allocation);
+  EXPECT_EQ(framework.id(), allocation.get().frameworkId);
+  EXPECT_EQ(1u, allocation.get().resources.size());
+  EXPECT_TRUE(allocation.get().resources.contains(slave.id()));
+  EXPECT_EQ(slave.resources(), Resources::sum(allocation.get().resources));
+
+  // Construct a RESERVE operation for part of the slave's resources.
+  Resources unreserved = Resources::parse("cpus:25;mem:50").get();
+  Resources dynamicallyReserved =
+    unreserved.flatten("role1", createReservationInfo("ops"));
+
+  Offer::Operation reserve = RESERVE(dynamicallyReserved);
+
+  // All resources are now allocated, so 'updateAvailable' must fail.
+  Future<Nothing> update = allocator->updateAvailable(slave.id(), {reserve});
+  AWAIT_EXPECT_FAILED(update);
+}
+
 // This test ensures that when oversubscribed resources are updated
 // subsequent allocations properly account for that.
 TEST_F(HierarchicalAllocatorTest, UpdateSlave)

http://git-wip-us.apache.org/repos/asf/mesos/blob/dc2e130a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 69134e1..8a76b4f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1036,6 +1036,12 @@ ACTION_P(InvokeUpdateAllocation, allocator)
 }


+ACTION_P(InvokeUpdateAvailable, allocator)
+{
+  return allocator->real->updateAvailable(arg0, arg1);
+}
+
+
 ACTION_P(InvokeRecoverResources, allocator)
 {
   allocator->real->recoverResources(arg0, arg1, arg2, arg3);
@@ -1154,6 +1160,11 @@ public:
     EXPECT_CALL(*this, updateAllocation(_, _, _))
       .WillRepeatedly(DoDefault());

+    ON_CALL(*this, updateAvailable(_, _))
+      .WillByDefault(InvokeUpdateAvailable(this));
+    EXPECT_CALL(*this, updateAvailable(_, _))
+      .WillRepeatedly(DoDefault());
+
     ON_CALL(*this, recoverResources(_, _, _, _))
       .WillByDefault(InvokeRecoverResources(this));
     EXPECT_CALL(*this, recoverResources(_, _, _, _))
@@ -1223,6 +1234,10 @@ public:
       const SlaveID&,
       const std::vector<Offer::Operation>&));

+  MOCK_METHOD2(updateAvailable, process::Future<Nothing>(
+      const SlaveID&,
+      const std::vector<Offer::Operation>&));
+
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,
       const SlaveID&,