You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by me...@apache.org on 2016/02/19 10:16:34 UTC

[1/2] mesos git commit: Extended allocator interface to support dynamic weights.

Repository: mesos
Updated Branches:
  refs/heads/master 83bf5671e -> 46cce8e35


Extended allocator interface to support dynamic weights.

Review: https://reviews.apache.org/r/41597/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/815a596f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/815a596f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/815a596f

Branch: refs/heads/master
Commit: 815a596f77ef3113ccb1b2d3d87ac6a3ff3b9e16
Parents: 83bf567
Author: Yongqiao Wang <yq...@cn.ibm.com>
Authored: Thu Feb 18 23:50:07 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Thu Feb 18 23:50:07 2016 -0800

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |  7 ++++++
 include/mesos/mesos.proto                   | 12 +++++++++
 include/mesos/v1/mesos.proto                | 12 +++++++++
 src/master/allocator/mesos/allocator.hpp    | 17 +++++++++++++
 src/master/allocator/mesos/hierarchical.cpp | 31 ++++++++++++++++++++++++
 src/master/allocator/mesos/hierarchical.hpp |  3 +++
 src/master/allocator/sorter/drf/sorter.cpp  |  7 ++++++
 src/master/allocator/sorter/drf/sorter.hpp  |  2 ++
 src/master/allocator/sorter/sorter.hpp      |  3 +++
 src/tests/allocator.hpp                     | 14 +++++++++++
 10 files changed, 108 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index e163669..a4743c5 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -379,6 +379,13 @@ public:
    */
   virtual void removeQuota(
       const std::string& role) = 0;
+
+  /**
+   * Updates the weight of each provided role.
+   * Subsequent allocation calculations will use these updated weights.
+   */
+  virtual void updateWeights(
+      const std::vector<WeightInfo>& weightInfos) = 0;
 };
 
 } // namespace allocator {

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 8047946..636550f 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -1764,3 +1764,15 @@ message DiscoveryInfo {
   optional Ports ports = 6;
   optional Labels labels = 7;
 }
+
+
+/**
+ * Named WeightInfo to indicate resource allocation
+ * priority between the different roles.
+ */
+message WeightInfo {
+  required double weight = 1;
+
+  // Related role name.
+  optional string role = 2;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index d909e60..1d5af88 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -1665,3 +1665,15 @@ message DiscoveryInfo {
   optional Ports ports = 6;
   optional Labels labels = 7;
 }
+
+
+/**
+ * Named WeightInfo to indicate resource allocation
+ * priority between the different roles.
+ */
+message WeightInfo {
+  required double weight = 1;
+
+  // Related role name.
+  optional string role = 2;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 581eaad..64bce0f 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -149,6 +149,9 @@ public:
   void removeQuota(
       const std::string& role);
 
+  void updateWeights(
+      const std::vector<WeightInfo>& weightInfos);
+
 private:
   MesosAllocator();
   MesosAllocator(const MesosAllocator&); // Not copyable.
@@ -272,6 +275,9 @@ public:
 
   virtual void removeQuota(
       const std::string& role) = 0;
+
+  virtual void updateWeights(
+      const std::vector<WeightInfo>& weightInfos) = 0;
 };
 
 
@@ -620,6 +626,17 @@ inline void MesosAllocator<AllocatorProcess>::removeQuota(
       role);
 }
 
+
+template <typename AllocatorProcess>
+inline void MesosAllocator<AllocatorProcess>::updateWeights(
+    const std::vector<WeightInfo>& weightInfos)
+{
+  process::dispatch(
+      process,
+      &MesosAllocatorProcess::updateWeights,
+      weightInfos);
+}
+
 } // namespace allocator {
 } // namespace master {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index a9d2c23..5ef29f2 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -1053,6 +1053,37 @@ void HierarchicalAllocatorProcess::removeQuota(
 }
 
 
+void HierarchicalAllocatorProcess::updateWeights(
+    const vector<WeightInfo>& weightInfos)
+{
+  CHECK(initialized);
+
+  bool rebalance = false;
+
+  // Update the weight for each specified role.
+  foreach (const WeightInfo& weightInfo, weightInfos) {
+    CHECK(weightInfo.has_role());
+    weights[weightInfo.role()] = weightInfo.weight();
+
+    if (quotas.contains(weightInfo.role())) {
+      quotaRoleSorter->update(weightInfo.role(), weightInfo.weight());
+    }
+
+    if (roleSorter->contains(weightInfo.role())) {
+      rebalance = true;
+      roleSorter->update(weightInfo.role(), weightInfo.weight());
+    }
+  }
+
+  // If at least one of the updated roles has registered frameworks,
+  // then trigger the allocation explicitly in order to promptly
+  // react to the operator's request.
+  if (rebalance) {
+    allocate();
+  }
+}
+
+
 void HierarchicalAllocatorProcess::pause()
 {
   if (!paused) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 20d7ceb..0d39d3f 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -187,6 +187,9 @@ public:
   void removeQuota(
       const std::string& role);
 
+  void updateWeights(
+      const std::vector<WeightInfo>& weightInfos);
+
 protected:
   // Useful typedefs for dispatch/delay/defer to self()/this.
   typedef HierarchicalAllocatorProcess Self;

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/sorter/drf/sorter.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.cpp b/src/master/allocator/sorter/drf/sorter.cpp
index 18797e4..9e863dd 100644
--- a/src/master/allocator/sorter/drf/sorter.cpp
+++ b/src/master/allocator/sorter/drf/sorter.cpp
@@ -49,6 +49,13 @@ void DRFSorter::add(const string& name, double weight)
 }
 
 
+void DRFSorter::update(const string& name, double weight)
+{
+  CHECK(weights.contains(name));
+  weights[name] = weight;
+}
+
+
 void DRFSorter::remove(const string& name)
 {
   set<Client, DRFComparator>::iterator it = find(name);

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/sorter/drf/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/drf/sorter.hpp b/src/master/allocator/sorter/drf/sorter.hpp
index 4669149..46b2a9c 100644
--- a/src/master/allocator/sorter/drf/sorter.hpp
+++ b/src/master/allocator/sorter/drf/sorter.hpp
@@ -65,6 +65,8 @@ public:
 
   virtual void add(const std::string& name, double weight = 1);
 
+  virtual void update(const std::string& name, double weight);
+
   virtual void remove(const std::string& name);
 
   virtual void activate(const std::string& name);

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/master/allocator/sorter/sorter.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/sorter/sorter.hpp b/src/master/allocator/sorter/sorter.hpp
index a0a779b..ba91a38 100644
--- a/src/master/allocator/sorter/sorter.hpp
+++ b/src/master/allocator/sorter/sorter.hpp
@@ -44,6 +44,9 @@ public:
   // may be a user or a framework.
   virtual void add(const std::string& client, double weight = 1) = 0;
 
+  // Update weight of a client.
+  virtual void update(const std::string& client, double weight) = 0;
+
   // Removes a client.
   virtual void remove(const std::string& client) = 0;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/815a596f/src/tests/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp
index 206e9ac..4081193 100644
--- a/src/tests/allocator.hpp
+++ b/src/tests/allocator.hpp
@@ -196,6 +196,12 @@ ACTION_P(InvokeRemoveQuota, allocator)
 }
 
 
+ACTION_P(InvokeUpdateWeights, allocator)
+{
+  allocator->real->updateWeights(arg0);
+}
+
+
 template <typename T = master::allocator::HierarchicalDRFAllocator>
 mesos::master::allocator::Allocator* createAllocator()
 {
@@ -342,6 +348,11 @@ public:
       .WillByDefault(InvokeRemoveQuota(this));
     EXPECT_CALL(*this, removeQuota(_))
       .WillRepeatedly(DoDefault());
+
+    ON_CALL(*this, updateWeights(_))
+      .WillByDefault(InvokeUpdateWeights(this));
+    EXPECT_CALL(*this, updateWeights(_))
+      .WillRepeatedly(DoDefault());
   }
 
   virtual ~TestAllocator() {}
@@ -449,6 +460,9 @@ public:
   MOCK_METHOD1(removeQuota, void(
       const std::string&));
 
+  MOCK_METHOD1(updateWeights, void(
+      const std::vector<WeightInfo>&));
+
   process::Owned<mesos::master::allocator::Allocator> real;
 };
 


[2/2] mesos git commit: Test cases for dynamic weights + allocation behaviour.

Posted by me...@apache.org.
Test cases for dynamic weights + allocation behaviour.

Review: https://reviews.apache.org/r/41672/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/46cce8e3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/46cce8e3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/46cce8e3

Branch: refs/heads/master
Commit: 46cce8e3523eaa58ea8b28d35508bf1b3d09e5a4
Parents: 815a596
Author: Yongqiao Wang <yq...@cn.ibm.com>
Authored: Thu Feb 18 23:51:10 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Thu Feb 18 23:51:10 2016 -0800

----------------------------------------------------------------------
 src/tests/hierarchical_allocator_tests.cpp | 206 ++++++++++++++++++++++++
 1 file changed, 206 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/46cce8e3/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 990f372..5f771f0 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -219,6 +219,15 @@ protected:
     return resource;
   }
 
+  static WeightInfo createWeightInfo(const string& role, double weight)
+  {
+    WeightInfo weightInfo;
+    weightInfo.set_role(role);
+    weightInfo.set_weight(weight);
+
+    return weightInfo;
+  }
+
 protected:
   master::Flags flags;
 
@@ -2594,6 +2603,203 @@ TEST_F(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers)
   Clock::resume();
 }
 
+
+// This test ensures that resource allocation is done per role's weight.
+// This is done by having six slaves and three frameworks and making sure each
+// framework gets the number of resources by their role's weight.
+TEST_F(HierarchicalAllocatorTest, UpdateWeight)
+{
+  // Pausing the clock is not necessary, but ensures that the test
+  // doesn't rely on the periodic allocation in the allocator, which
+  // would slow down the test.
+  Clock::pause();
+
+  initialize();
+
+  // Register six slaves with the same resources (cpus:2;mem:1024).
+  vector<SlaveInfo> slaves;
+  const string SINGLE_RESOURCE = "cpus:2;mem:1024";
+  const string DOUBLE_RESOURCES = "cpus:4;mem:2048";
+  const string TRIPLE_RESOURCES = "cpus:6;mem:3072";
+  const string FOURFOLD_RESOURCES = "cpus:8;mem:4096";
+  const string TOTAL_RESOURCES = "cpus:12;mem:6144";
+  for (unsigned i = 0; i < 6; i++) {
+    slaves.push_back(createSlaveInfo(SINGLE_RESOURCE));
+  }
+
+  foreach (const SlaveInfo& slave, slaves) {
+    allocator->addSlave(
+        slave.id(),
+        slave,
+        None(),
+        slave.resources(),
+        hashmap<FrameworkID, Resources>());
+  }
+
+  // Framework1 registers with 'role1' which uses the default
+  // weight (1.0), and all resources will be offered to this framework.
+  FrameworkInfo framework1 = createFrameworkInfo("role1");
+  allocator->addFramework(
+      framework1.id(), framework1, hashmap<SlaveID, Resources>());
+
+  // Framework2 registers with 'role2' which also uses the
+  // default weight (1.0).
+  FrameworkInfo framework2 = createFrameworkInfo("role2");
+  allocator->addFramework(
+      framework2.id(), framework2, hashmap<SlaveID, Resources>());
+
+  // Framework1 gets one allocation with all resources, and Framework2
+  // does not get any offers due to all resources having outstanding
+  // offers to framework1 when it registered.
+  // Recover all resources owned by framework1 so they can be offered
+  // again next time.
+  Future<Allocation> allocation = allocations.get();
+  AWAIT_READY(allocation);
+  ASSERT_EQ(allocation.get().frameworkId, framework1.id());
+  ASSERT_EQ(6u, allocation.get().resources.size());
+  EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(),
+            Resources::sum(allocation.get().resources));
+  foreachpair (const SlaveID& slaveId,
+               const Resources& resources,
+               allocation.get().resources) {
+    allocator->recoverResources(
+        allocation.get().frameworkId,
+        slaveId,
+        resources,
+        None());
+  }
+
+  // Because each framework's role has a weight of 1.0 by default, test to
+  // ensure that all resources are offered equally between both frameworks.
+  hashmap<FrameworkID, size_t> counts;
+  Clock::advance(flags.allocation_interval);
+  Resources totalAllocatedResources1;
+  for (unsigned i = 0; i < 2; i++) {
+    Future<Allocation> allocation = allocations.get();
+    AWAIT_READY(allocation);
+    counts[allocation.get().frameworkId]++;
+    totalAllocatedResources1 += Resources::sum(allocation.get().resources);
+
+    // Each framework will get one allocation with three slaves.
+    ASSERT_EQ(3u, allocation.get().resources.size());
+    EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
+              Resources::sum(allocation.get().resources));
+
+    // Recover the offered resources so they can be offered again next time.
+    foreachpair (const SlaveID& slaveId,
+                 const Resources& resources,
+                 allocation.get().resources) {
+      allocator->recoverResources(
+          allocation.get().frameworkId,
+          slaveId,
+          resources,
+          None());
+    }
+  }
+
+  // Check to ensure that these two allocations sum to the total resources,
+  // this check can ensure there are only two allocations in this case.
+  EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources1);
+  EXPECT_EQ(1u, counts[framework1.id()]);
+  EXPECT_EQ(1u, counts[framework2.id()]);
+
+  // Update the weight of framework2's role to 2.0, then their
+  // weights should be 1:2.
+  vector<WeightInfo> weightInfos1;
+  weightInfos1.push_back(createWeightInfo(framework2.role(), 2.0));
+  allocator->updateWeights(weightInfos1);
+
+  // Now that the frameworks's weights are 1:2, test to ensure that all
+  // resources are offered with a ratio of 1:2 between both frameworks.
+  counts.clear();
+  Resources totalAllocatedResources2;
+  Clock::advance(flags.allocation_interval);
+  for (unsigned i = 0; i < 2; i++) {
+    Future<Allocation> allocation = allocations.get();
+    AWAIT_READY(allocation);
+    counts[allocation.get().frameworkId]++;
+    totalAllocatedResources2 += Resources::sum(allocation.get().resources);
+
+    // Framework1 should get one allocation with two slaves.
+    if (allocation.get().frameworkId == framework1.id()) {
+      ASSERT_EQ(2u, allocation.get().resources.size());
+      EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
+                Resources::sum(allocation.get().resources));
+    } else {
+      // Framework2 should get one allocation with four slaves.
+      ASSERT_EQ(allocation.get().frameworkId, framework2.id());
+      ASSERT_EQ(4u, allocation.get().resources.size());
+      EXPECT_EQ(Resources::parse(FOURFOLD_RESOURCES).get(),
+                Resources::sum(allocation.get().resources));
+    }
+
+    // Recover the allocated resources so they can be offered again next time.
+    foreachpair (const SlaveID& slaveId,
+                 const Resources& resources,
+                 allocation.get().resources) {
+      allocator->recoverResources(
+          allocation.get().frameworkId,
+          slaveId,
+          resources,
+          None());
+    }
+  }
+  // Check to ensure that these two allocations sum to the total resources,
+  // this check can ensure there are only two allocations in this case.
+  EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources2);
+  EXPECT_EQ(1u, counts[framework1.id()]);
+  EXPECT_EQ(1u, counts[framework2.id()]);
+
+  // Add a new role with a weight of 3.0.
+  vector<WeightInfo> weightInfos2;
+  weightInfos2.push_back(createWeightInfo("role3", 3.0));
+  allocator->updateWeights(weightInfos2);
+
+  // Framework3 registers with 'role3'.
+  FrameworkInfo framework3 = createFrameworkInfo("role3");
+  allocator->addFramework(
+      framework3.id(), framework3, hashmap<SlaveID, Resources>());
+
+  // Currently, there are three frameworks and six slaves in this cluster,
+  // and the weight ratio of these frameworks is 1:2:3, therefore frameworks
+  // will get the proper resource ratio of 1:2:3.
+  counts.clear();
+  Resources totalAllocatedResources3;
+  for (unsigned i = 0; i < 3; i++) {
+    Future<Allocation> allocation = allocations.get();
+    AWAIT_READY(allocation);
+    counts[allocation.get().frameworkId]++;
+    totalAllocatedResources3 += Resources::sum(allocation.get().resources);
+
+    // Framework1 should get one allocation with one slave.
+    if (allocation.get().frameworkId == framework1.id()) {
+      ASSERT_EQ(1u, allocation.get().resources.size());
+      EXPECT_EQ(Resources::parse(SINGLE_RESOURCE).get(),
+                Resources::sum(allocation.get().resources));
+    } else if (allocation.get().frameworkId == framework2.id()) {
+      // Framework2 should get one allocation with two slaves.
+      ASSERT_EQ(2u, allocation.get().resources.size());
+      EXPECT_EQ(Resources::parse(DOUBLE_RESOURCES).get(),
+                Resources::sum(allocation.get().resources));
+    } else {
+      // Framework3 should get one allocation with three slaves.
+      ASSERT_EQ(allocation.get().frameworkId, framework3.id());
+      ASSERT_EQ(3u, allocation.get().resources.size());
+      EXPECT_EQ(Resources::parse(TRIPLE_RESOURCES).get(),
+                Resources::sum(allocation.get().resources));
+    }
+  }
+
+  // Check to ensure that these three allocations sum to the total resources,
+  // this check can ensure there are only three allocations in this case.
+  EXPECT_EQ(Resources::parse(TOTAL_RESOURCES).get(), totalAllocatedResources3);
+  EXPECT_EQ(1u, counts[framework1.id()]);
+  EXPECT_EQ(1u, counts[framework2.id()]);
+  EXPECT_EQ(1u, counts[framework3.id()]);
+
+  Clock::resume();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {