You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/22 01:01:49 UTC

mesos git commit: Used the pull model to get estimations from resource estimator.

Repository: mesos
Updated Branches:
  refs/heads/master f15d37ce5 -> e530b4e43


Used the pull model to get estimations from resource estimator.

Review: https://reviews.apache.org/r/34559


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e530b4e4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e530b4e4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e530b4e4

Branch: refs/heads/master
Commit: e530b4e435d0d9a135701a229f376724fe90f10a
Parents: f15d37c
Author: Jie Yu <yu...@gmail.com>
Authored: Thu May 21 12:04:32 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu May 21 16:01:36 2015 -0700

----------------------------------------------------------------------
 include/mesos/slave/resource_estimator.hpp | 25 ++++++++-------
 src/slave/resource_estimator.cpp           | 41 ++++++++++++++-----------
 src/slave/resource_estimator.hpp           |  4 +--
 src/slave/slave.cpp                        | 27 ++++++++++------
 src/slave/slave.hpp                        |  2 +-
 src/tests/mesos.hpp                        | 14 ++++++---
 6 files changed, 67 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index d64c698..e7f5dec 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -25,7 +25,6 @@
 
 #include <process/future.hpp>
 
-#include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
@@ -46,17 +45,21 @@ public:
 
   virtual ~ResourceEstimator() {}
 
-  // Initializes this resource estimator. It registers a callback with
-  // the resource estimator. The callback allows the resource
-  // estimator to tell the slave about the current estimation of the
-  // *maximum* amount of resources that can be oversubscribed on the
-  // slave. A new estimation will invalidate all the previously
-  // returned estimations. The slave will keep track of the most
-  // recent estimation and periodically send it to the master.
-  //
+  // Initializes this resource estimator. This method needs to be
+  // called before any other member method is called.
   // TODO(jieyu): Pass ResourceMonitor* once it's exposed.
-  virtual Try<Nothing> initialize(
-      const lambda::function<void(const Resources&)>& oversubscribe) = 0;
+  virtual Try<Nothing> initialize() = 0;
+
+  // Returns the current estimation about the *maximum* amount of
+  // resources that can be oversubscribed on the slave. A new
+  // estimation will invalidate all the previously returned
+  // estimations. The slave will be calling this method continuously
+  // to keep track of the most up-to-date estimation and periodically
+  // forward it to the master. As a result, to avoid overwhelming the
+  // slave, it is recommended that the resource estimator should
+  // return an estimation only if the current estimation is
+  // significantly different from the previous one.
+  virtual process::Future<Resources> oversubscribable() = 0;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
index 7b7b499..a67640c 100644
--- a/src/slave/resource_estimator.cpp
+++ b/src/slave/resource_estimator.cpp
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-#include <process/delay.hpp>
 #include <process/dispatch.hpp>
 #include <process/process.hpp>
 
@@ -53,25 +52,20 @@ class NoopResourceEstimatorProcess :
   public Process<NoopResourceEstimatorProcess>
 {
 public:
-  NoopResourceEstimatorProcess(
-      const lambda::function<void(const Resources&)>& _oversubscribe)
-    : oversubscribe(_oversubscribe) {}
+  NoopResourceEstimatorProcess() : sent(false) {}
 
-protected:
-  virtual void initialize()
+  Future<Resources> oversubscribable()
   {
-    notify();
-  }
-
-  // Periodically notify the slave about oversubscribable resources.
-  void notify()
-  {
-    oversubscribe(Resources());
+    if (!sent) {
+      sent = true;
+      return Resources();
+    }
 
-    delay(Seconds(1), self(), &Self::notify);
+    return Future<Resources>();
   }
 
-  const lambda::function<void(const Resources&)> oversubscribe;
+private:
+  bool sent;
 };
 
 
@@ -84,19 +78,30 @@ NoopResourceEstimator::~NoopResourceEstimator()
 }
 
 
-Try<Nothing> NoopResourceEstimator::initialize(
-    const lambda::function<void(const Resources&)>& oversubscribe)
+Try<Nothing> NoopResourceEstimator::initialize()
 {
   if (process.get() != NULL) {
     return Error("Noop resource estimator has already been initialized");
   }
 
-  process.reset(new NoopResourceEstimatorProcess(oversubscribe));
+  process.reset(new NoopResourceEstimatorProcess());
   spawn(process.get());
 
   return Nothing();
 }
 
+
+Future<Resources> NoopResourceEstimator::oversubscribable()
+{
+  if (process.get() == NULL) {
+    return Failure("Noop resource estimator is not initialized");
+  }
+
+  return dispatch(
+      process.get(),
+      &NoopResourceEstimatorProcess::oversubscribable);
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp
index 5a6367c..717804d 100644
--- a/src/slave/resource_estimator.hpp
+++ b/src/slave/resource_estimator.hpp
@@ -39,8 +39,8 @@ class NoopResourceEstimator : public mesos::slave::ResourceEstimator
 public:
   virtual ~NoopResourceEstimator();
 
-  virtual Try<Nothing> initialize(
-      const lambda::function<void(const Resources&)>& oversubscribe);
+  virtual Try<Nothing> initialize();
+  virtual process::Future<Resources> oversubscribable();
 
 protected:
   process::Owned<NoopResourceEstimatorProcess> process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8e88482..b4d2029 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -324,9 +324,7 @@ void Slave::initialize()
   }
 
   // TODO(jieyu): Pass ResourceMonitor* to 'initialize'.
-  Try<Nothing> initialize = resourceEstimator->initialize(
-      defer(self(), &Self::updateOversubscribableResources, lambda::_1));
-
+  Try<Nothing> initialize = resourceEstimator->initialize();
   if (initialize.isError()) {
     EXIT(1) << "Failed to initialize the resource estimator: "
             << initialize.error();
@@ -3980,8 +3978,11 @@ void Slave::__recover(const Future<Nothing>& future)
   if (flags.recover == "reconnect") {
     state = DISCONNECTED;
 
-    // Start to send updates about oversubscribable resources.
-    forwardOversubscribableResources();
+    // Start to get estimations from the resource estimator and
+    // forward the estimations to the master.
+    resourceEstimator->oversubscribable()
+      .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1))
+      .onAny(defer(self(), &Self::forwardOversubscribableResources));
 
     // Start detecting masters.
     detection = detector->detect()
@@ -4072,12 +4073,20 @@ Future<Nothing> Slave::garbageCollect(const string& path)
 }
 
 
-void Slave::updateOversubscribableResources(const Resources& resources)
+void Slave::updateOversubscribableResources(const Future<Resources>& future)
 {
-  LOG(INFO) << "Received a new estimation of the oversubscribable "
-            << "resources " << resources;
+  if (!future.isReady()) {
+    LOG(ERROR) << "Failed to estimate oversubscribable resources: "
+               << (future.isFailed() ? future.failure() : "discarded");
+  } else {
+    LOG(INFO) << "Received a new estimation of the oversubscribable "
+              << "resources " << future.get();
+
+    oversubscribableResources = future.get();
+  }
 
-  oversubscribableResources = resources;
+  resourceEstimator->oversubscribable()
+    .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d82b10c..0207eaf 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -433,7 +433,7 @@ private:
       const FrameworkID& frameworkId,
       const Executor* executor);
 
-  void updateOversubscribableResources(const Resources& resources);
+  void updateOversubscribableResources(const Future<Resources>& future);
   void forwardOversubscribableResources();
 
   const Flags flags;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index a60df75..924b0ff 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -38,6 +38,7 @@
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
+#include <process/queue.hpp>
 
 #include <stout/bytes.hpp>
 #include <stout/foreach.hpp>
@@ -704,20 +705,23 @@ public:
 class TestResourceEstimator : public mesos::slave::ResourceEstimator
 {
 public:
-  virtual Try<Nothing> initialize(
-      const lambda::function<void(const Resources&)>& _oversubscribe)
+  virtual Try<Nothing> initialize()
   {
-    oversubscribe = _oversubscribe;
     return Nothing();
   }
 
+  virtual process::Future<Resources> oversubscribable()
+  {
+    return queue.get();
+  }
+
   void estimate(const Resources& resources)
   {
-    oversubscribe(resources);
+    queue.put(resources);
   }
 
 private:
-  lambda::function<void(const Resources&)> oversubscribe;
+  process::Queue<Resources> queue;
 };