You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/10/07 19:21:07 UTC

mesos git commit: Fixed a race in the master when updating oversubscribed resources.

Repository: mesos
Updated Branches:
  refs/heads/master 513bdf040 -> bb8b14456


Fixed a race in the master when updating oversubscribed resources.

The reason we need to call `updateSlave` first and then rescind offers
is a race condition: a batch allocation may be triggered between
rescinding the offers and calling `updateSlave`. In that case, the
order is rescind offer -> batch allocation -> update slave. This
order causes problems when the oversubscribed resources are
decreased.

Suppose the oversubscribed resources are decreased from 2 to 1. After
the offers have been rescinded, the batch allocation will allocate the
old 2 oversubscribed resources again, and only then will update slave
set the total oversubscribed resources to 1. This leaves the agent
host overcommitted for some time, because tasks can still use 2
oversubscribed resources rather than 1. Only once the tasks using the
2 oversubscribed resources finish does everything go back to normal.

If we update the slave first and then rescind offers, the order is
update slave -> batch allocation -> rescind offer, which has no
problem when shrinking resources. Suppose the oversubscribed resources
shrink from 2 to 1. Update slave sets the total oversubscribed
resources to 1 directly. The batch allocation then does not allocate
any oversubscribed resources, since more are already allocated than
the new total. Finally, rescind offer rescinds all offers that use
oversubscribed resources. This does not lead the agent host to be
overcommitted.

Review: https://reviews.apache.org/r/52465/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bb8b1445
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bb8b1445
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bb8b1445

Branch: refs/heads/master
Commit: bb8b14456d3bbe8126a3078e7577c0995b35fe45
Parents: 513bdf0
Author: Guangya Liu <gy...@gmail.com>
Authored: Fri Oct 7 12:16:54 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri Oct 7 12:20:52 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp                |  20 ++--
 src/tests/oversubscription_tests.cpp | 158 ++++++++++++++++++++++++++----
 2 files changed, 153 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8b1445/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c7e74df..8837f3d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5542,7 +5542,19 @@ void Master::updateSlave(
   LOG(INFO) << "Received update of agent " << *slave << " with total"
             << " oversubscribed resources " <<  oversubscribedResources;
 
-  // First, rescind any outstanding offers with revocable resources.
+  // NOTE: We must *first* update the agent's resources before we
+  // recover the resources. If we recovered the resources first,
+  // an allocation could trigger between recovering resources and
+  // updating the agent in the allocator. This would lead us to
+  // re-send out the stale oversubscribed resources!
+
+  slave->totalResources =
+    slave->totalResources.nonRevocable() + oversubscribedResources.revocable();
+
+  // First update the agent's resources in the allocator.
+  allocator->updateSlave(slaveId, oversubscribedResources);
+
+  // Then rescind any outstanding offers with revocable resources.
   // NOTE: Need a copy of offers because the offers are removed inside the loop.
   foreach (Offer* offer, utils::copy(slave->offers)) {
     const Resources& offered = offer->resources();
@@ -5560,12 +5572,6 @@ void Master::updateSlave(
 
   // NOTE: We don't need to rescind inverse offers here as they are unrelated to
   // oversubscription.
-
-  slave->totalResources =
-    slave->totalResources.nonRevocable() + oversubscribedResources.revocable();
-
-  // Now, update the allocator with the new estimate.
-  allocator->updateSlave(slaveId, oversubscribedResources);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bb8b1445/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 3dd34ea..b356fb6 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -440,10 +440,18 @@ TEST_F(OversubscriptionTest, RevocableOffer)
 
 // This test verifies that when the master receives a new estimate for
 // oversubscribed resources it rescinds outstanding revocable offers.
-TEST_F(OversubscriptionTest, RescindRevocableOffer)
+// In this test the oversubscribed resources are increased, so the master
+// will send out two offers: the first one contains the newly added
+// oversubscribed resources and the second one contains the oversubscribed
+// resources recovered from the rescinded offer.
+TEST_F(OversubscriptionTest, RescindRevocableOfferWithIncreasedRevocable)
 {
+  // Pause the clock because we want to manually drive the allocations.
+  Clock::pause();
+
   // Start the master.
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   // Start the slave with test resource estimator.
@@ -457,12 +465,12 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
     .Times(2)
     .WillRepeatedly(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
 
-  slave::Flags flags = CreateSlaveFlags();
+  slave::Flags agentFlags = CreateSlaveFlags();
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
   Try<Owned<cluster::Slave>> slave =
-    StartSlave(detector.get(), &resourceEstimator, flags);
+    StartSlave(detector.get(), &resourceEstimator, agentFlags);
   ASSERT_SOME(slave);
 
   // Start the framework which desires revocable resources.
@@ -484,7 +492,7 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
 
   // Initially the framework will get all regular resources.
   AWAIT_READY(offers1);
-  EXPECT_NE(0u, offers1.get().size());
+  EXPECT_NE(0u, offers1->size());
   EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
 
   Future<vector<Offer>> offers2;
@@ -492,13 +500,13 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
     .WillOnce(FutureArg<1>(&offers2));
 
   // Inject an estimation of oversubscribable resources.
-  Resources resources = createRevocableResources("cpus", "1");
-  estimations.put(resources);
+  Resources resources1 = createRevocableResources("cpus", "1");
+  estimations.put(resources1);
 
   // Now the framework will get revocable resources.
   AWAIT_READY(offers2);
-  EXPECT_NE(0u, offers2.get().size());
-  EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+  EXPECT_NE(0u, offers2->size());
+  EXPECT_EQ(resources1, Resources(offers2.get()[0].resources()));
 
   Future<OfferID> offerId;
   EXPECT_CALL(sched, offerRescinded(&driver, _))
@@ -506,28 +514,142 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
 
   Future<vector<Offer>> offers3;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers3))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+    .WillOnce(FutureArg<1>(&offers3));
 
-  // Inject another estimation of oversubscribable resources while the
-  // previous revocable offer is outstanding.
-  Resources resources2 = createRevocableResources("cpus", "2");
+  // Inject another estimation of increased oversubscribable resources
+  // while the previous revocable offer is outstanding.
+  Resources resources2 = createRevocableResources("cpus", "3");
   estimations.put(resources2);
 
   // Advance the clock for the slave to send the new estimate.
+  Clock::advance(agentFlags.oversubscribed_resources_interval);
+  Clock::settle();
+
+  // The previous revocable offer should be rescinded.
+  AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId);
+
+  // The new offer should be the increased oversubscribed resources.
+  AWAIT_READY(offers3);
+  EXPECT_NE(0u, offers3->size());
+  EXPECT_EQ(createRevocableResources("cpus", "2"),
+            Resources(offers3.get()[0].resources()));
+
+  Future<vector<Offer>> offers4;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers4));
+
+  // Advance the clock to trigger a batch allocation, this will
+  // allocate the oversubscribed resources that were rescinded.
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  // The new offer should be the old oversubscribed resources.
+  AWAIT_READY(offers4);
+  EXPECT_NE(0u, offers4->size());
+  EXPECT_EQ(resources1, Resources(offers4.get()[0].resources()));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that when the master receives a new estimate for
+// oversubscribed resources it rescinds outstanding revocable offers.
+// In this test the oversubscribed resources are decreased, so the
+// master will send out only one offer with the latest oversubscribed
+// resources from the resource estimator.
+TEST_F(OversubscriptionTest, RescindRevocableOfferWithDecreasedRevocable)
+{
+  // Pause the clock because we want to manually drive the allocations.
   Clock::pause();
-  Clock::advance(flags.oversubscribed_resources_interval);
+
+  // Start the master.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Start the slave with test resource estimator.
+  MockResourceEstimator resourceEstimator;
+
+  EXPECT_CALL(resourceEstimator, initialize(_));
+
+  Queue<Resources> estimations;
+  // We expect 2 calls for 2 estimations.
+  EXPECT_CALL(resourceEstimator, oversubscribable())
+    .Times(2)
+    .WillRepeatedly(InvokeWithoutArgs(&estimations, &Queue<Resources>::get));
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &resourceEstimator, agentFlags);
+  ASSERT_SOME(slave);
+
+  // Start the framework which desires revocable resources.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  // Initially the framework will get all regular resources.
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1->size());
+  EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2));
+
+  // Inject an estimation of oversubscribable resources.
+  Resources resources1 = createRevocableResources("cpus", "3");
+  estimations.put(resources1);
+
+  // Now the framework will get revocable resources.
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2->size());
+  EXPECT_EQ(resources1, Resources(offers2.get()[0].resources()));
+
+  Future<OfferID> offerId;
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillOnce(FutureArg<1>(&offerId));
+
+  Future<vector<Offer>> offers3;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers3));
+
+  // Inject another estimation of decreased oversubscribable resources
+  // while the previous revocable offer is outstanding.
+  Resources resources2 = createRevocableResources("cpus", "1");
+  estimations.put(resources2);
+
+  // Advance the clock for the slave to send the new estimate.
+  Clock::advance(agentFlags.oversubscribed_resources_interval);
   Clock::settle();
 
   // The previous revocable offer should be rescinded.
   AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId);
 
-  // Resume the clock for next allocation.
-  Clock::resume();
+  // Advance the clock to trigger a batch allocation, this will
+  // allocate the oversubscribed resources that were rescinded.
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
 
   // The new offer should include the latest oversubscribed resources.
   AWAIT_READY(offers3);
-  EXPECT_NE(0u, offers3.get().size());
+  EXPECT_NE(0u, offers3->size());
   EXPECT_EQ(resources2, Resources(offers3.get()[0].resources()));
 
   driver.stop();