You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/05/29 03:02:47 UTC

[3/3] mesos git commit: Implemented 'updateSlave()' call in the master.

Implemented 'updateSlave()' call in the master.

Review: https://reviews.apache.org/r/34736


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fbf5c7e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fbf5c7e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fbf5c7e7

Branch: refs/heads/master
Commit: fbf5c7e703c691f8b8bcf20ea7c324e9987beab1
Parents: 949e6ad
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed May 27 16:07:59 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu May 28 17:11:04 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp                |  53 ++++++++-
 src/tests/oversubscription_tests.cpp | 172 +++++++++++++++++++++++++++++-
 2 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d61b77b..710b814 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3463,11 +3463,52 @@ void Master::updateSlave(
 {
   ++metrics->messages_update_slave;
 
-  LOG(INFO) << "Received update of slave " << slaveId
-            << " with oversubscribed resources " <<  oversubscribedResources;
+  if (slaves.removed.get(slaveId).isSome()) {
+    // If the slave is removed, we have already informed
+    // frameworks that its tasks were LOST, so the slave should
+    // shut down.
+    LOG(WARNING)
+      << "Ignoring update of slave with total oversubscribed resources "
+      << oversubscribedResources << " on removed slave " << slaveId
+      << " ; asking slave to shutdown";
+
+    ShutdownMessage message;
+    message.set_message("Update slave message from unknown slave");
+    reply(message);
+    return;
+  }
+
+  if (!slaves.registered.contains(slaveId)) {
+    LOG(WARNING)
+      << "Ignoring update of slave with total oversubscribed resources "
+      << oversubscribedResources << " on unknown slave " << slaveId;
+    return;
+  }
+
+  Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId));
+
+  LOG(INFO) << "Received update of slave " << *slave << " with total"
+            << " oversubscribed resources " <<  oversubscribedResources;
+
+  // First, rescind any outstanding offers with revocable resources.
+  // NOTE: Need a copy of offers because the offers are removed inside
+  // the loop.
+  foreach (Offer* offer, utils::copy(slave->offers)) {
+    const Resources offered = offer->resources();
+    if (!offered.revocable().empty()) {
+      LOG(INFO) << "Removing offer " << offer->id()
+                << " with revocable resources " << offered
+                << " on slave " << *slave;
+
+      allocator->recoverResources(
+          offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
+      removeOffer(offer, true); // Rescind.
+    }
+  }
 
-  // TODO(vinod): Rescind any oustanding revocable offers from this
-  // slave and update the allocator.
+  // Now, update the allocator with the new estimate.
+  allocator->updateSlave(slaveId, oversubscribedResources);
 }
 
 
@@ -3984,6 +4025,10 @@ void Master::offer(const FrameworkID& frameworkId,
     }
 #endif // WITH_NETWORK_ISOLATOR
 
+    // TODO(vinod): Split regular and revocable resources into
+    // separate offers, so that rescinding offers with revocable
+    // resources does not affect offers with regular resources.
+
     Offer* offer = new Offer();
     offer->mutable_id()->MergeFrom(newOfferId());
     offer->mutable_framework_id()->MergeFrom(framework->id());

http://git-wip-us.apache.org/repos/asf/mesos/blob/fbf5c7e7/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 1dda63e..ea5857c 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -16,15 +16,21 @@
  * limitations under the License.
  */
 
+#include <string>
+#include <vector>
+
 #include <gmock/gmock.h>
 
 #include <mesos/resources.hpp>
 
 #include <process/clock.hpp>
+#include <process/future.hpp>
 #include <process/gtest.hpp>
 
 #include <stout/gtest.hpp>
 
+#include "common/resources_utils.hpp"
+
 #include "master/master.hpp"
 
 #include "messages/messages.hpp"
@@ -41,11 +47,28 @@ using mesos::internal::master::Master;
 
 using mesos::internal::slave::Slave;
 
+using std::string;
+using std::vector;
+
 namespace mesos {
 namespace internal {
 namespace tests {
 
-class OversubscriptionTest : public MesosTest {};
+class OversubscriptionTest : public MesosTest
+{
+protected:
+  // TODO(vinod): Make this a global helper that other tests (e.g.,
+  // hierarchical allocator tests) can use.
+  Resources createRevocableResources(
+      const string& name,
+      const string& value,
+      const string& role = "*")
+  {
+    Resource resource = Resources::parse(name, value, role).get();
+    resource.mutable_revocable();
+    return resource;
+  }
+};
 
 
 // This test verifies that slave will forward the estimation of the
@@ -79,7 +102,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
   ASSERT_FALSE(update.isReady());
 
   // Inject an estimation of oversubscribable resources.
-  Resources resources = Resources::parse("cpus:1;mem:32").get();
+  Resources resources = createRevocableResources("cpus", "1");
   resourceEstimator.estimate(resources);
 
   AWAIT_READY(update);
@@ -97,6 +120,151 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
   Shutdown();
 }
 
+
+// This test verifies that a framework that desires revocable
+// resources gets an offer with revocable resources.
+TEST_F(OversubscriptionTest, RevocableOffer)
+{
+  // Start the master.
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Start the slave with test resource estimator.
+  TestResourceEstimator resourceEstimator;
+  slave::Flags flags = CreateSlaveFlags();
+
+  Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+  ASSERT_SOME(slave);
+
+  // Start the framework which desires revocable resources.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  // Initially the framework will get all regular resources.
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Inject an estimation of oversubscribable resources.
+  Resources resources = createRevocableResources("cpus", "1");
+  resourceEstimator.estimate(resources);
+
+  // Now the framework will get revocable resources.
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that when the master receives a new estimate for
+// oversubscribed resources it rescinds outstanding revocable offers.
+TEST_F(OversubscriptionTest, RescindRevocableOffer)
+{
+  // Start the master.
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Start the slave with test resource estimator.
+  TestResourceEstimator resourceEstimator;
+  slave::Flags flags = CreateSlaveFlags();
+
+  Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
+  ASSERT_SOME(slave);
+
+  // Start the framework which desires revocable resources.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  // Initially the framework will get all regular resources.
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  EXPECT_TRUE(Resources(offers1.get()[0].resources()).revocable().empty());
+
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2));
+
+  // Inject an estimation of oversubscribable resources.
+  Resources resources = createRevocableResources("cpus", "1");
+  resourceEstimator.estimate(resources);
+
+  // Now the framework will get revocable resources.
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  EXPECT_EQ(resources, Resources(offers2.get()[0].resources()));
+
+  Future<OfferID> offerId;
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillOnce(FutureArg<1>(&offerId));
+
+  Future<vector<Offer>> offers3;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers3))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Inject another estimation of oversubscribable resources while the
+  // previous revocable offer is outstanding.
+  Resources resources2 = createRevocableResources("cpus", "2");
+  resourceEstimator.estimate(resources2);
+
+  // Advance the clock for the slave to send the new estimate.
+  Clock::pause();
+  Clock::advance(flags.oversubscribed_resources_interval);
+  Clock::settle();
+
+  // The previous revocable offer should be rescinded.
+  AWAIT_EXPECT_EQ(offers2.get()[0].id(), offerId);
+
+  // Resume the clock for next allocation.
+  Clock::resume();
+
+  // The new offer should include the latest oversubscribed resources.
+  AWAIT_READY(offers3);
+  EXPECT_NE(0u, offers3.get().size());
+  EXPECT_EQ(resources2, Resources(offers3.get()[0].resources()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {