You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/06/11 20:36:25 UTC

[6/6] mesos git commit: Implemented the fixed resource estimator.

Implemented the fixed resource estimator.

Review: https://reviews.apache.org/r/35321


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a89ba3f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a89ba3f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a89ba3f1

Branch: refs/heads/master
Commit: a89ba3f1934e02e585ddcbe15029c41baefc5f23
Parents: 9f0bf57
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Jun 10 12:47:30 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Jun 11 11:31:06 2015 -0700

----------------------------------------------------------------------
 src/slave/resource_estimators/fixed.cpp | 30 ++++++++-----
 src/tests/oversubscription_tests.cpp    | 65 +++++++++++++++++++++++++++-
 2 files changed, 84 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a89ba3f1/src/slave/resource_estimators/fixed.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimators/fixed.cpp b/src/slave/resource_estimators/fixed.cpp
index ca16137..305664c 100644
--- a/src/slave/resource_estimators/fixed.cpp
+++ b/src/slave/resource_estimators/fixed.cpp
@@ -20,6 +20,7 @@
 
 #include <mesos/slave/resource_estimator.hpp>
 
+#include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/owned.hpp>
 #include <process/process.hpp>
@@ -40,30 +41,39 @@ class FixedResourceEstimatorProcess
 public:
   FixedResourceEstimatorProcess(
       const lambda::function<Future<ResourceUsage>()>& _usage,
-      const Resources& _resources)
+      const Resources& _totalRevocable)
     : usage(_usage),
-      resources(_resources) {}
+      totalRevocable(_totalRevocable) {}
 
   Future<Resources> oversubscribable()
   {
-    // TODO(jieyu): This is a stub implementation.
-    return resources;
+    return usage().then(defer(self(), &Self::_oversubscribable, lambda::_1));
+  }
+
+  Future<Resources> _oversubscribable(const ResourceUsage& usage)
+  {
+    Resources allocatedRevocable;
+    foreach (const ResourceUsage::Executor& executor, usage.executors()) {
+      allocatedRevocable += Resources(executor.allocated()).revocable();
+    }
+
+    return totalRevocable - allocatedRevocable;
   }
 
 protected:
   const lambda::function<Future<ResourceUsage>()> usage;
-  const Resources resources;
+  const Resources totalRevocable;
 };
 
 
 class FixedResourceEstimator : public ResourceEstimator
 {
 public:
-  FixedResourceEstimator(const Resources& _resources)
-    : resources(_resources)
+  FixedResourceEstimator(const Resources& _totalRevocable)
+    : totalRevocable(_totalRevocable)
   {
     // Mark all resources as revocable.
-    foreach (Resource& resource, resources) {
+    foreach (Resource& resource, totalRevocable) {
       resource.mutable_revocable();
     }
   }
@@ -83,7 +93,7 @@ public:
       return Error("Fixed resource estimator has already been initialized");
     }
 
-    process.reset(new FixedResourceEstimatorProcess(usage, resources));
+    process.reset(new FixedResourceEstimatorProcess(usage, totalRevocable));
     spawn(process.get());
 
     return Nothing();
@@ -101,7 +111,7 @@ public:
   }
 
 private:
-  Resources resources;
+  Resources totalRevocable;
   Owned<FixedResourceEstimatorProcess> process;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a89ba3f1/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 58a20d4..e8ae053 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -425,9 +425,72 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
   AWAIT_READY(update);
 
   Resources resources = update.get().oversubscribed_resources();
-
   EXPECT_SOME_EQ(2.0, resources.cpus());
 
+  Clock::resume();
+
+  // Launch a task that uses revocable resources and verify that the
+  // total oversubscribed resources do not change.
+
+  // We don't expect to receive an UpdateSlaveMessage because the
+  // total oversubscribed resources do not change.
+  EXPECT_NO_FUTURE_PROTOBUFS(UpdateSlaveMessage(), _, _);
+
+  // Start the framework which desires revocable resources.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::REVOCABLE_RESOURCES);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  const Offer offer = offers.get()[0];
+
+  // The offer should contain revocable resources.
+  ASSERT_SOME_EQ(2.0, Resources(offer.resources()).revocable().cpus());
+
+  // Now, launch a task that uses revocable resources.
+  Resources taskResources = createRevocableResources("cpus", "1");
+  taskResources += Resources::parse("mem:32").get();
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      taskResources,
+      "sleep 1000");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Advance the clock for the slave to trigger the calculation of the
+  // total oversubscribed resources. As described above, we don't
+  // expect a new UpdateSlaveMessage to be generated.
+  Clock::pause();
+  Clock::advance(flags.oversubscribed_resources_interval);
+  Clock::settle();
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
   Shutdown();
 }