You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/02/01 20:18:37 UTC

[1/6] mesos git commit: Added a simple AllocatorBacklog benchmark.

Repository: mesos
Updated Branches:
  refs/heads/master e69160ad2 -> e05a44540


Added a simple AllocatorBacklog benchmark.

Review: https://reviews.apache.org/r/55874


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e05a4454
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e05a4454
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e05a4454

Branch: refs/heads/master
Commit: e05a445407024cfcff8eee7d03400fac3762280e
Parents: f6aaddb
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Mon Jan 23 18:00:11 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Feb 1 12:16:18 2017 -0800

----------------------------------------------------------------------
 src/tests/hierarchical_allocator_tests.cpp | 101 ++++++++++++++++++++++++
 1 file changed, 101 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e05a4454/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index 4a90651..3ecedb8 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -4314,6 +4314,107 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, Metrics)
        << " and " << frameworkCount << " frameworks" << endl;
 }
 
+
+// This test uses `reviveOffers` to add allocation-triggering events
+// to the allocator queue in order to measure the impact of allocation
+// batching (MESOS-6904).
+TEST_P(HierarchicalAllocator_BENCHMARK_Test, AllocatorBacklog)
+{
+  size_t agentCount = std::tr1::get<0>(GetParam());
+  size_t frameworkCount = std::tr1::get<1>(GetParam());
+
+  // Pause the clock because we want to manually drive the allocations.
+  Clock::pause();
+
+  cout << "Using " << agentCount << " agents and "
+       << frameworkCount << " frameworks" << endl;
+
+  master::Flags flags;
+  initialize(flags);
+
+  // 1. Add frameworks.
+  vector<FrameworkInfo> frameworks;
+  frameworks.reserve(frameworkCount);
+
+  for (size_t i = 0; i < frameworkCount; i++) {
+    frameworks.push_back(createFrameworkInfo("*"));
+  }
+
+  Stopwatch watch;
+  watch.start();
+
+  for (size_t i = 0; i < frameworkCount; i++) {
+    allocator->addFramework(frameworks.at(i).id(), frameworks.at(i), {}, true);
+  }
+
+  // Wait for all the `addFramework` operations to be processed.
+  Clock::settle();
+
+  watch.stop();
+
+  const string metric = "allocator/mesos/allocation_runs";
+  JSON::Object metrics = Metrics();
+  ASSERT_EQ(1, metrics.values.count(metric));
+  int runs1 = metrics.values[metric].as<JSON::Number>().as<int>();
+
+  cout << "Added " << frameworkCount << " frameworks in "
+       << watch.elapsed() << " with " << runs1
+       << " allocation runs" << endl;
+
+  // 2. Add agents.
+  vector<SlaveInfo> agents;
+  agents.reserve(agentCount);
+
+  const Resources agentResources = Resources::parse(
+      "cpus:24;mem:4096;disk:4096;ports:[31000-32000]").get();
+
+  for (size_t i = 0; i < agentCount; i++) {
+    agents.push_back(createSlaveInfo(agentResources));
+  }
+
+  watch.start();
+
+  for (size_t i = 0; i < agentCount; i++) {
+    allocator->addSlave(
+        agents.at(i).id(), agents.at(i), None(), agents.at(i).resources(), {});
+  }
+
+  // Wait for all the `addSlave` operations to be processed.
+  Clock::settle();
+
+  watch.stop();
+
+  metrics = Metrics();
+  ASSERT_EQ(1, metrics.values.count(metric));
+  int runs2 = metrics.values[metric].as<JSON::Number>().as<int>();
+
+  cout << "Added " << agentCount << " agents in "
+       << watch.elapsed() << " with " << runs2 - runs1
+       << " allocation runs" << endl;
+
+  watch.start();
+
+  // 3. Invoke a `reviveOffers` call for each framework to enqueue
+  // events. The allocator doesn't have more resources to allocate
+  // but still incurs the overhead of additional allocation runs.
+  for (size_t i = 0; i < frameworkCount; i++) {
+    allocator->reviveOffers(frameworks.at(i).id());
+  }
+
+  // Wait for all the `reviveOffers` operations to be processed.
+  Clock::settle();
+
+  watch.stop();
+
+  metrics = Metrics();
+  ASSERT_EQ(1, metrics.values.count(metric));
+  int runs3 = metrics.values[metric].as<JSON::Number>().as<int>();
+
+  cout << "Processed " << frameworkCount << " `reviveOffers` calls"
+       << " in " << watch.elapsed() << " with " << runs3 - runs2
+       << " allocation runs" << endl;
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[2/6] mesos git commit: Fixed MasterAllocatorTest/1.RebalancedForUpdatedWeights.

Posted by ya...@apache.org.
Fixed MasterAllocatorTest/1.RebalancedForUpdatedWeights.

- This test is broken by changes introduced in MESOS-6904.

Review: https://reviews.apache.org/r/55852/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/daa15285
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/daa15285
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/daa15285

Branch: refs/heads/master
Commit: daa15285fe7fa829cd5ec19c5bb6728df9701280
Parents: f68ed9b
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Wed Feb 1 11:07:45 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Feb 1 12:16:18 2017 -0800

----------------------------------------------------------------------
 src/tests/master_allocator_tests.cpp | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/daa15285/src/tests/master_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp
index 996762f..d22862d 100644
--- a/src/tests/master_allocator_tests.cpp
+++ b/src/tests/master_allocator_tests.cpp
@@ -34,6 +34,7 @@
 #include <process/gmock.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
+#include <process/queue.hpp>
 
 #include <stout/some.hpp>
 #include <stout/strings.hpp>
@@ -66,6 +67,7 @@ using process::Clock;
 using process::Future;
 using process::Owned;
 using process::PID;
+using process::Queue;
 
 using process::http::OK;
 using process::http::Response;
@@ -1670,13 +1672,17 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
   EXPECT_CALL(sched2, registered(&driver2, _, _))
     .WillOnce(FutureSatisfy(&registered2));
 
-  Future<vector<Offer>> framework2offers;
+  Queue<Offer> framework2offers;
   EXPECT_CALL(sched2, resourceOffers(&driver2, _))
-    .WillOnce(FutureArg<1>(&framework2offers));
+    .WillRepeatedly(EnqueueOffers(&framework2offers));
 
   driver2.start();
   AWAIT_READY(registered2);
 
+  // Settle to make sure the dispatched allocation is executed before
+  // the weights are updated.
+  Clock::settle();
+
   // role1 share = 1 (cpus=6, mem=3072)
   //   framework1 share = 1
   // role2 share = 0
@@ -1722,9 +1728,13 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
   EXPECT_EQ(recoverResources3.get(),
             Resources::parse(agentResources).get());
 
-  // Trigger a batch allocation.
+  // Trigger a batch allocation to make sure all resources are
+  // offered out again.
   Clock::advance(masterFlags.allocation_interval);
 
+  // Settle to make sure all offers are received.
+  Clock::settle();
+
   // role1 share = 0.33 (cpus=2, mem=1024)
   //   framework1 share = 1
   // role2 share = 0.66 (cpus=4, mem=2048)
@@ -1735,10 +1745,13 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights)
   EXPECT_EQ(Resources(framework1offers2.get()[0].resources()),
             Resources::parse(agentResources).get());
 
-  AWAIT_READY(framework2offers);
-  ASSERT_EQ(2u, framework2offers.get().size());
+  ASSERT_EQ(2u, framework2offers.size());
   for (int i = 0; i < 2; i++) {
-    EXPECT_EQ(Resources(framework2offers.get()[i].resources()),
+    Future<Offer> offer = framework2offers.get();
+
+    // All offers for framework2 are enqueued by now.
+    AWAIT_READY(offer);
+    EXPECT_EQ(Resources(offer->resources()),
               Resources::parse(agentResources).get());
   }
 


[4/6] mesos git commit: Dispatch filter expiration twice.

Posted by ya...@apache.org.
Dispatch filter expiration twice.

- With an asynchronous `batch()` allocation,
  this ensures that filters do not expire
  before the next allocation.

Review: https://reviews.apache.org/r/52534/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2f170f30
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2f170f30
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2f170f30

Branch: refs/heads/master
Commit: 2f170f302fe94c47c7eeec574041dff4e7057020
Parents: 69875e5
Author: Jacob Janco <jj...@gmail.com>
Authored: Wed Feb 1 10:43:46 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Feb 1 12:16:18 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp | 36 ++++++++++++++++++------
 src/master/allocator/mesos/hierarchical.hpp |  5 ++++
 2 files changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2f170f30/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 5ff8567..ffa0708 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -931,7 +931,7 @@ void HierarchicalAllocatorProcess::updateInverseOffer(
       .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
 
     // We need to disambiguate the function call to pick the correct
-    // expire() overload.
+    // `expire()` overload.
     void (Self::*expireInverseOffer)(
              const FrameworkID&,
              const SlaveID&,
@@ -1055,22 +1055,26 @@ void HierarchicalAllocatorProcess::recoverResources(
     OfferFilter* offerFilter = new RefusedOfferFilter(resources);
     frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
 
-    // We need to disambiguate the function call to pick the correct
-    // expire() overload.
-    void (Self::*expireOffer)(
-              const FrameworkID&,
-              const SlaveID&,
-              OfferFilter*) = &Self::expire;
-
     // Expire the filter after both an `allocationInterval` and the
     // `timeout` have elapsed. This ensures that the filter does not
     // expire before we perform the next allocation for this agent,
     // see MESOS-4302 for more information.
     //
+    // Because the next batched allocation goes through a dispatch
+    // after `allocationInterval`, we do the same for `expire()`
+    // (with a helper `_expire()`) to achieve the above.
+    //
     // TODO(alexr): If we allocated upon resource recovery
     // (MESOS-3078), we would not need to increase the timeout here.
     timeout = std::max(allocationInterval, timeout.get());
 
+    // We need to disambiguate the function call to pick the correct
+    // `expire()` overload.
+    void (Self::*expireOffer)(
+              const FrameworkID&,
+              const SlaveID&,
+              OfferFilter*) = &Self::expire;
+
     delay(timeout.get(),
           self(),
           expireOffer,
@@ -1789,7 +1793,7 @@ void HierarchicalAllocatorProcess::deallocate()
 }
 
 
-void HierarchicalAllocatorProcess::expire(
+void HierarchicalAllocatorProcess::_expire(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
     OfferFilter* offerFilter)
@@ -1815,6 +1819,20 @@ void HierarchicalAllocatorProcess::expire(
 void HierarchicalAllocatorProcess::expire(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
+    OfferFilter* offerFilter)
+{
+  dispatch(
+      self(),
+      &Self::_expire,
+      frameworkId,
+      slaveId,
+      offerFilter);
+}
+
+
+void HierarchicalAllocatorProcess::expire(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
     InverseOfferFilter* inverseOfferFilter)
 {
   // The filter might have already been removed (e.g., if the

http://git-wip-us.apache.org/repos/asf/mesos/blob/2f170f30/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 124628c..339b3d2 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -239,6 +239,11 @@ protected:
       const SlaveID& slaveId,
       OfferFilter* offerFilter);
 
+  void _expire(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      OfferFilter* offerFilter);
+
   // Remove an inverse offer filter for the specified framework.
   void expire(
       const FrameworkID& frameworkId,


[3/6] mesos git commit: Perform batching of allocations to reduce allocator queue backlogging.

Posted by ya...@apache.org.
Perform batching of allocations to reduce allocator queue backlogging.

- New allocation runs are now dispatched only if there is none pending
  in the queue. Otherwise allocation candidates are accumulated into
  the pending allocation run.
- Allocation candidates are cleared when the enqueued allocation
  run is processed.

Review: https://reviews.apache.org/r/51027/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69875e52
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69875e52
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69875e52

Branch: refs/heads/master
Commit: 69875e52ba6bf70e4415c212d2caeb61e5bafaf3
Parents: e69160a
Author: Jacob Janco <jj...@gmail.com>
Authored: Wed Feb 1 10:37:03 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Feb 1 12:16:18 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp | 104 +++++++++++++----------
 src/master/allocator/mesos/hierarchical.hpp |  32 +++++--
 2 files changed, 82 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/69875e52/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index f471b68..5ff8567 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -24,8 +24,9 @@
 #include <mesos/resources.hpp>
 #include <mesos/type_utils.hpp>
 
-#include <process/event.hpp>
 #include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/event.hpp>
 #include <process/id.hpp>
 #include <process/timeout.hpp>
 
@@ -501,6 +502,7 @@ void HierarchicalAllocatorProcess::removeSlave(
   quotaRoleSorter->remove(slaveId, slaves[slaveId].total.nonRevocable());
 
   slaves.erase(slaveId);
+  allocationCandidates.erase(slaveId);
 
   // Note that we DO NOT actually delete any filters associated with
   // this slave, that will occur when the delayed
@@ -1165,8 +1167,6 @@ void HierarchicalAllocatorProcess::setQuota(
   LOG(INFO) << "Set quota " << quota.info.guarantee() << " for role '" << role
             << "'";
 
-  // Trigger the allocation explicitly in order to promptly react to the
-  // operator's request.
   allocate();
 }
 
@@ -1190,8 +1190,6 @@ void HierarchicalAllocatorProcess::removeQuota(
 
   metrics.removeQuota(role);
 
-  // Trigger the allocation explicitly in order to promptly react to the
-  // operator's request.
   allocate();
 }
 
@@ -1223,9 +1221,8 @@ void HierarchicalAllocatorProcess::updateWeights(
     }
   }
 
-  // If at least one of the updated roles has registered frameworks,
-  // then trigger the allocation explicitly in order to promptly
-  // react to the operator's request.
+  // If at least one of the updated roles has registered
+  // frameworks, then trigger the allocation.
   if (rebalance) {
     allocate();
   }
@@ -1254,62 +1251,83 @@ void HierarchicalAllocatorProcess::resume()
 
 void HierarchicalAllocatorProcess::batch()
 {
-  allocate();
-  delay(allocationInterval, self(), &Self::batch);
+  auto pid = self();
+
+  allocate()
+    .onAny([pid, this]() {
+      delay(allocationInterval, pid, &Self::batch);
+    });
 }
 
 
-void HierarchicalAllocatorProcess::allocate()
+Future<Nothing> HierarchicalAllocatorProcess::allocate()
+{
+  return allocate(slaves.keys());
+}
+
+
+Future<Nothing> HierarchicalAllocatorProcess::allocate(
+    const SlaveID& slaveId)
+{
+  hashset<SlaveID> slaves({slaveId});
+  return allocate(slaves);
+}
+
+
+Future<Nothing> HierarchicalAllocatorProcess::allocate(
+    const hashset<SlaveID>& slaveIds)
 {
   if (paused) {
     VLOG(1) << "Skipped allocation because the allocator is paused";
 
-    return;
+    return Nothing();
   }
 
-  Stopwatch stopwatch;
-  stopwatch.start();
-
-  metrics.allocation_run.start();
-
-  allocate(slaves.keys());
+  allocationCandidates |= slaveIds;
 
-  metrics.allocation_run.stop();
+  if (allocation.isNone() || !allocation->isPending()) {
+    allocation = dispatch(self(), &Self::_allocate);
+  }
 
-  VLOG(1) << "Performed allocation for " << slaves.size() << " agents in "
-            << stopwatch.elapsed();
+  return allocation.get();
 }
 
 
-void HierarchicalAllocatorProcess::allocate(
-    const SlaveID& slaveId)
-{
+Nothing HierarchicalAllocatorProcess::_allocate() {
   if (paused) {
     VLOG(1) << "Skipped allocation because the allocator is paused";
 
-    return;
+    return Nothing();
   }
 
+  ++metrics.allocation_runs;
+
   Stopwatch stopwatch;
   stopwatch.start();
   metrics.allocation_run.start();
 
-  hashset<SlaveID> slaves({slaveId});
-  allocate(slaves);
+  __allocate();
+
+  // NOTE: For now, we implement maintenance inverse offers within the
+  // allocator. We leverage the existing timer/cycle of offers to also do any
+  // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
+  deallocate();
 
   metrics.allocation_run.stop();
 
-  VLOG(1) << "Performed allocation for agent " << slaveId << " in "
-          << stopwatch.elapsed();
+  VLOG(1) << "Performed allocation for " << allocationCandidates.size()
+          << " agents in " << stopwatch.elapsed();
+
+  // Clear the candidates on completion of the allocation run.
+  allocationCandidates.clear();
+
+  return Nothing();
 }
 
 
 // TODO(alexr): Consider factoring out the quota allocation logic.
-void HierarchicalAllocatorProcess::allocate(
-    const hashset<SlaveID>& slaveIds_)
+void HierarchicalAllocatorProcess::__allocate()
 {
-  ++metrics.allocation_runs;
-
   // Compute the offerable resources, per framework:
   //   (1) For reserved resources on the slave, allocate these to a
   //       framework having the corresponding role.
@@ -1317,16 +1335,16 @@ void HierarchicalAllocatorProcess::allocate(
   //       to a framework of any role.
   hashmap<FrameworkID, hashmap<SlaveID, Resources>> offerable;
 
-  // NOTE: This function can operate on a small subset of slaves, we have to
-  // make sure that we don't assume cluster knowledge when summing resources
-  // from that set.
+  // NOTE: This function can operate on a small subset of
+  // `allocationCandidates`, we have to make sure that we don't
+  // assume cluster knowledge when summing resources from that set.
 
   vector<SlaveID> slaveIds;
-  slaveIds.reserve(slaveIds_.size());
+  slaveIds.reserve(allocationCandidates.size());
 
   // Filter out non-whitelisted and deactivated slaves in order not to send
   // offers for them.
-  foreach (const SlaveID& slaveId, slaveIds_) {
+  foreach (const SlaveID& slaveId, allocationCandidates) {
     if (isWhitelisted(slaveId) && slaves[slaveId].activated) {
       slaveIds.push_back(slaveId);
     }
@@ -1679,16 +1697,10 @@ void HierarchicalAllocatorProcess::allocate(
       offerCallback(frameworkId, offerable[frameworkId]);
     }
   }
-
-  // NOTE: For now, we implement maintenance inverse offers within the
-  // allocator. We leverage the existing timer/cycle of offers to also do any
-  // "deallocation" (inverse offers) necessary to satisfy maintenance needs.
-  deallocate(slaveIds_);
 }
 
 
-void HierarchicalAllocatorProcess::deallocate(
-    const hashset<SlaveID>& slaveIds_)
+void HierarchicalAllocatorProcess::deallocate()
 {
   // If no frameworks are currently registered, no work to do.
   if (activeRoles.empty()) {
@@ -1712,7 +1724,7 @@ void HierarchicalAllocatorProcess::deallocate(
   // responded yet.
 
   foreachvalue (const Owned<Sorter>& frameworkSorter, frameworkSorters) {
-    foreach (const SlaveID& slaveId, slaveIds_) {
+    foreach (const SlaveID& slaveId, allocationCandidates) {
       CHECK(slaves.contains(slaveId));
 
       if (slaves[slaveId].maintenance.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/69875e52/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 2cda3e3..124628c 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -214,17 +214,24 @@ protected:
   // Callback for doing batch allocations.
   void batch();
 
-  // Allocate any allocatable resources.
-  void allocate();
+  // Allocate any allocatable resources from all known agents.
+  process::Future<Nothing> allocate();
 
-  // Allocate resources just from the specified slave.
-  void allocate(const SlaveID& slaveId);
+  // Allocate resources from the specified agent.
+  process::Future<Nothing> allocate(const SlaveID& slaveId);
 
-  // Allocate resources from the specified slaves.
-  void allocate(const hashset<SlaveID>& slaveIds);
+  // Allocate resources from the specified agents. The allocation
+  // is deferred and batched with other allocation requests.
+  process::Future<Nothing> allocate(const hashset<SlaveID>& slaveIds);
 
-  // Send inverse offers from the specified slaves.
-  void deallocate(const hashset<SlaveID>& slaveIds);
+  // Method that performs allocation work.
+  Nothing _allocate();
+
+  // Helper for `_allocate()` that allocates resources for offers.
+  void __allocate();
+
+  // Helper for `_allocate()` that deallocates resources for inverse offers.
+  void deallocate();
 
   // Remove an offer filter for the specified framework.
   void expire(
@@ -385,6 +392,15 @@ protected:
 
   hashmap<SlaveID, Slave> slaves;
 
+  // A set of agents that are kept as allocation candidates. Events
+  // may add or remove candidates to the set. When an allocation is
+  // processed, the set of candidates is cleared.
+  hashset<SlaveID> allocationCandidates;
+
+  // Future for the dispatched allocation that becomes
+  // ready after the allocation run is complete.
+  Option<process::Future<Nothing>> allocation;
+
   // Number of registered frameworks for each role. When a role's active
   // count drops to zero, it is removed from this map; the role is also
   // removed from `roleSorter` and its `frameworkSorter` is deleted.


[5/6] mesos git commit: Fixed OversubscriptionTest.RescindRevocableOfferWithIncreasedRevocable.

Posted by ya...@apache.org.
Fixed OversubscriptionTest.RescindRevocableOfferWithIncreasedRevocable.

- This test is broken by changes introduced in MESOS-6904.

Review: https://reviews.apache.org/r/55893/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f6aaddb9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f6aaddb9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f6aaddb9

Branch: refs/heads/master
Commit: f6aaddb922b5265ede77afd83fef941da77ee0c4
Parents: daa1528
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Wed Feb 1 11:08:27 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Feb 1 12:16:18 2017 -0800

----------------------------------------------------------------------
 src/tests/oversubscription_tests.cpp | 67 ++++++++++++++-----------------
 1 file changed, 31 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f6aaddb9/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 22ae069..167beaf 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -441,9 +441,7 @@ TEST_F(OversubscriptionTest, RevocableOffer)
 // This test verifies that when the master receives a new estimate for
 // oversubscribed resources it rescinds outstanding revocable offers.
 // In this test the oversubscribed resources are increased, so the master
-// will send out two offers, the first one is the increased oversubscribed
-// resources and the second one is the oversubscribed resources from the
-// rescind offered resources.
+// will send out new offers with increased revocable resources.
 TEST_F(OversubscriptionTest, RescindRevocableOfferWithIncreasedRevocable)
 {
   // Pause the clock because we want to manually drive the allocations.
@@ -484,40 +482,36 @@ TEST_F(OversubscriptionTest, RescindRevocableOfferWithIncreasedRevocable)
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
-  Future<vector<Offer>> offers1;
+  Queue<Offer> offers;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers1));
+    .WillRepeatedly(EnqueueOffers(&offers));
 
   driver.start();
 
   // Initially the framework will get all regular resources.
   Clock::advance(agentFlags.registration_backoff_factor);
   Clock::advance(masterFlags.allocation_interval);
-  AWAIT_READY(offers1);
-  EXPECT_NE(0u, offers1->size());
-  EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+  Clock::settle();
 
-  Future<vector<Offer>> offers2;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers2));
+  EXPECT_EQ(1u, offers.size());
+  EXPECT_TRUE(Resources(offers.get()->resources()).revocable().empty());
 
   // Inject an estimation of oversubscribable resources.
   Resources resources1 = createRevocableResources("cpus", "1");
   estimations.put(resources1);
 
   // Now the framework will get revocable resources.
-  AWAIT_READY(offers2);
-  EXPECT_NE(0u, offers2->size());
-  EXPECT_EQ(resources1, Resources(offers2.get()[0].resources()));
+  Clock::settle();
+
+  EXPECT_EQ(1u, offers.size());
+  Future<Offer> offer = offers.get();
+  AWAIT_READY(offer);
+  EXPECT_EQ(resources1, Resources(offer->resources()));
 
   Future<OfferID> offerId;
   EXPECT_CALL(sched, offerRescinded(&driver, _))
     .WillOnce(FutureArg<1>(&offerId));
 
-  Future<vector<Offer>> offers3;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers3));
-
   // Inject another estimation of increased oversubscribable resources
   // while the previous revocable offer is outstanding.
   Resources resources2 = createRevocableResources("cpus", "3");
@@ -528,27 +522,28 @@ TEST_F(OversubscriptionTest, RescindRevocableOfferWithIncreasedRevocable)
   Clock::settle();
 
   // The previous revocable offer should be rescinded.
-  AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId);
-
-  // The new offer should be the increased oversubscribed resources.
-  AWAIT_READY(offers3);
-  EXPECT_NE(0u, offers3->size());
-  EXPECT_EQ(createRevocableResources("cpus", "2"),
-            Resources(offers3.get()[0].resources()));
-
-  Future<vector<Offer>> offers4;
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers4));
-
-  // Advance the clock to trigger a batch allocation, this will
-  // allocate the oversubscribed resources that were rescinded.
+  AWAIT_EXPECT_EQ(offer->id(), offerId);
+
+  // The allocation run triggered by the agent resource update may or
+  // may not take into account the rescinded offer due to a race
+  // between the dispatched allocation and the recovery of the resources
+  // from the rescinded offer. Therefore we advance the clock after these
+  // resources are recovered to trigger a batch allocation to make sure
+  // all resources are allocated.
+  Clock::settle();
   Clock::advance(masterFlags.allocation_interval);
   Clock::settle();
 
-  // The new offer should be the old oversubscribed resources.
-  AWAIT_READY(offers4);
-  EXPECT_NE(0u, offers4->size());
-  EXPECT_EQ(resources1, Resources(offers4.get()[0].resources()));
+  ASSERT_GT(offers.size(), 0u);
+
+  // Sum all offers made after the latest estimate (`get()` dequeues).
+  Resources resources3;
+  while (offers.size() > 0) {
+    resources3 += offers.get()->resources();
+  }
+
+  // The offered resources should match the resource estimate.
+  EXPECT_EQ(resources2, resources3);
 
   driver.stop();
   driver.join();


[6/6] mesos git commit: Fix tests with rapidly triggered allocations.

Posted by ya...@apache.org.
Fix tests with rapidly triggered allocations.

- Per MESOS-6904, if cluster events with the possibility
  of triggering an allocation occur rapidly and test
  assertions depend on gleaning information from assumed
  order, it is likely they will fail due to lack of parity
  between event and actual allocation. This patch adjusts
  these expectations.

Review: https://reviews.apache.org/r/51028/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f68ed9b1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f68ed9b1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f68ed9b1

Branch: refs/heads/master
Commit: f68ed9b189fd1412eca8010435f4ac17d0cfab40
Parents: 2f170f3
Author: Jacob Janco <jj...@gmail.com>
Authored: Wed Feb 1 11:05:15 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Wed Feb 1 12:16:18 2017 -0800

----------------------------------------------------------------------
 src/tests/hierarchical_allocator_tests.cpp | 42 ++++++++++++++++++++-----
 1 file changed, 34 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f68ed9b1/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index e04d199..4a90651 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -2776,6 +2776,9 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers)
   // framework's redundant REVIVE calls.
   allocator->reviveOffers(framework.id());
 
+  // Settle to ensure that the dispatched allocation is executed.
+  Clock::settle();
+
   // Nothing is allocated because of no additional resources.
   allocation = allocations.get();
   EXPECT_TRUE(allocation.isPending());
@@ -2923,14 +2926,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, AllocationRunsMetric)
 
   SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0");
   allocator->addSlave(agent.id(), agent, None(), agent.resources(), {});
+
+  // Wait for the allocation triggered from `addSlave()` to complete.
+  // Otherwise `addFramework()` below may not trigger a new allocation
+  // because the allocator batches them.
+  Clock::settle();
+
   ++allocations; // Adding an agent triggers allocations.
 
   FrameworkInfo framework = createFrameworkInfo("role");
   allocator->addFramework(framework.id(), framework, {}, true);
-  ++allocations; // Adding a framework triggers allocations.
 
   Clock::settle();
 
+  ++allocations; // Adding a framework triggers allocations.
+
   expected.values = { {"allocator/mesos/allocation_runs", allocations} };
 
   metrics = Metrics();
@@ -2981,10 +2991,16 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   SlaveInfo agent = createSlaveInfo("cpus:2;mem:1024;disk:0");
   allocator->addSlave(agent.id(), agent, None(), agent.resources(), {});
 
+  // Due to the batching of allocation work, wait for the `allocate()`
+  // call and subsequent work triggered by `addSlave()` to complete.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
   FrameworkInfo framework = createFrameworkInfo("role1");
   allocator->addFramework(framework.id(), framework, {}, true);
 
-  // Wait for the allocation to complete.
+  // Wait for the allocation triggered by `addFramework()` to complete.
   AWAIT_READY(allocations.get());
 
   // Ensure the timer has been stopped so that
@@ -3327,6 +3343,15 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight)
   FrameworkInfo framework1 = createFrameworkInfo("role1");
   allocator->addFramework(framework1.id(), framework1, {}, true);
 
+  // Wait for the allocation triggered from `addFramework(framework1)`
+  // to complete. Otherwise due to a race between `addFramework(framework2)`
+  // and the next allocation (because it's run asynchronously), framework2
+  // may or may not be allocated resources. For simplicity here we give
+  // all resources to framework1 as all we wanted to achieve in this step
+  // is to recover all resources to set up the allocator for the next batch
+  // allocation.
+  Clock::settle();
+
   // Framework2 registers with 'role2' which also uses the default weight.
   // It will not get any offers due to all resources having outstanding offers
   // to framework1 when it registered.
@@ -3734,10 +3759,12 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
 
   watch.stop();
 
-  ASSERT_EQ(slaveCount, offerCallbacks.load());
+  cout << "Added " << slaveCount << " agents in " << watch.elapsed()
+       << "; performed " << offerCallbacks.load() << " allocations" << endl;
 
-  cout << "Added " << slaveCount << " agents"
-       << " in " << watch.elapsed() << endl;
+  // Reset `offerCallbacks` to 0 to record allocations
+  // for the `updateSlave` operations.
+  offerCallbacks = 0;
 
   // Oversubscribed resources on each slave.
   Resource oversubscribed = Resources::parse("cpus", "10", "*").get();
@@ -3754,9 +3781,8 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, AddAndUpdateSlave)
 
   watch.stop();
 
-  ASSERT_EQ(slaveCount * 2, offerCallbacks.load());
-
-  cout << "Updated " << slaveCount << " agents in " << watch.elapsed() << endl;
+  cout << "Updated " << slaveCount << " agents" << " in " << watch.elapsed()
+       << " performing " << offerCallbacks.load() << " allocations" << endl;
 }