You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/22 01:01:49 UTC
mesos git commit: Used the pull model to get estimations from
resource estimator.
Repository: mesos
Updated Branches:
refs/heads/master f15d37ce5 -> e530b4e43
Used the pull model to get estimations from resource estimator.
Review: https://reviews.apache.org/r/34559
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e530b4e4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e530b4e4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e530b4e4
Branch: refs/heads/master
Commit: e530b4e435d0d9a135701a229f376724fe90f10a
Parents: f15d37c
Author: Jie Yu <yu...@gmail.com>
Authored: Thu May 21 12:04:32 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu May 21 16:01:36 2015 -0700
----------------------------------------------------------------------
include/mesos/slave/resource_estimator.hpp | 25 ++++++++-------
src/slave/resource_estimator.cpp | 41 ++++++++++++++-----------
src/slave/resource_estimator.hpp | 4 +--
src/slave/slave.cpp | 27 ++++++++++------
src/slave/slave.hpp | 2 +-
src/tests/mesos.hpp | 14 ++++++---
6 files changed, 67 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index d64c698..e7f5dec 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -25,7 +25,6 @@
#include <process/future.hpp>
-#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>
@@ -46,17 +45,21 @@ public:
virtual ~ResourceEstimator() {}
- // Initializes this resource estimator. It registers a callback with
- // the resource estimator. The callback allows the resource
- // estimator to tell the slave about the current estimation of the
- // *maximum* amount of resources that can be oversubscribed on the
- // slave. A new estimation will invalidate all the previously
- // returned estimations. The slave will keep track of the most
- // recent estimation and periodically send it to the master.
- //
+ // Initializes this resource estimator. This method needs to be
+ // called before any other member method is called.
// TODO(jieyu): Pass ResourceMonitor* once it's exposed.
- virtual Try<Nothing> initialize(
- const lambda::function<void(const Resources&)>& oversubscribe) = 0;
+ virtual Try<Nothing> initialize() = 0;
+
+ // Returns the current estimation about the *maximum* amount of
+ // resources that can be oversubscribed on the slave. A new
+ // estimation will invalidate all the previously returned
+ // estimations. The slave will be calling this method continuously
+ // to keep track of the most up-to-date estimation and periodically
+ // forward it to the master. As a result, to avoid overwhelming the
+ // slave, it is recommended that the resource estimator should
+ // return an estimation only if the current estimation is
+ // significantly different from the previous one.
+ virtual process::Future<Resources> oversubscribable() = 0;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
index 7b7b499..a67640c 100644
--- a/src/slave/resource_estimator.cpp
+++ b/src/slave/resource_estimator.cpp
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/process.hpp>
@@ -53,25 +52,20 @@ class NoopResourceEstimatorProcess :
public Process<NoopResourceEstimatorProcess>
{
public:
- NoopResourceEstimatorProcess(
- const lambda::function<void(const Resources&)>& _oversubscribe)
- : oversubscribe(_oversubscribe) {}
+ NoopResourceEstimatorProcess() : sent(false) {}
-protected:
- virtual void initialize()
+ Future<Resources> oversubscribable()
{
- notify();
- }
-
- // Periodically notify the slave about oversubscribable resources.
- void notify()
- {
- oversubscribe(Resources());
+ if (!sent) {
+ sent = true;
+ return Resources();
+ }
- delay(Seconds(1), self(), &Self::notify);
+ return Future<Resources>();
}
- const lambda::function<void(const Resources&)> oversubscribe;
+private:
+ bool sent;
};
@@ -84,19 +78,30 @@ NoopResourceEstimator::~NoopResourceEstimator()
}
-Try<Nothing> NoopResourceEstimator::initialize(
- const lambda::function<void(const Resources&)>& oversubscribe)
+Try<Nothing> NoopResourceEstimator::initialize()
{
if (process.get() != NULL) {
return Error("Noop resource estimator has already been initialized");
}
- process.reset(new NoopResourceEstimatorProcess(oversubscribe));
+ process.reset(new NoopResourceEstimatorProcess());
spawn(process.get());
return Nothing();
}
+
+Future<Resources> NoopResourceEstimator::oversubscribable()
+{
+ if (process.get() == NULL) {
+ return Failure("Noop resource estimator is not initialized");
+ }
+
+ return dispatch(
+ process.get(),
+ &NoopResourceEstimatorProcess::oversubscribable);
+}
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp
index 5a6367c..717804d 100644
--- a/src/slave/resource_estimator.hpp
+++ b/src/slave/resource_estimator.hpp
@@ -39,8 +39,8 @@ class NoopResourceEstimator : public mesos::slave::ResourceEstimator
public:
virtual ~NoopResourceEstimator();
- virtual Try<Nothing> initialize(
- const lambda::function<void(const Resources&)>& oversubscribe);
+ virtual Try<Nothing> initialize();
+ virtual process::Future<Resources> oversubscribable();
protected:
process::Owned<NoopResourceEstimatorProcess> process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8e88482..b4d2029 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -324,9 +324,7 @@ void Slave::initialize()
}
// TODO(jieyu): Pass ResourceMonitor* to 'initialize'.
- Try<Nothing> initialize = resourceEstimator->initialize(
- defer(self(), &Self::updateOversubscribableResources, lambda::_1));
-
+ Try<Nothing> initialize = resourceEstimator->initialize();
if (initialize.isError()) {
EXIT(1) << "Failed to initialize the resource estimator: "
<< initialize.error();
@@ -3980,8 +3978,11 @@ void Slave::__recover(const Future<Nothing>& future)
if (flags.recover == "reconnect") {
state = DISCONNECTED;
- // Start to send updates about oversubscribable resources.
- forwardOversubscribableResources();
+ // Start to get estimations from the resource estimator and
+ // forward the estimations to the master.
+ resourceEstimator->oversubscribable()
+ .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1))
+ .onAny(defer(self(), &Self::forwardOversubscribableResources));
// Start detecting masters.
detection = detector->detect()
@@ -4072,12 +4073,20 @@ Future<Nothing> Slave::garbageCollect(const string& path)
}
-void Slave::updateOversubscribableResources(const Resources& resources)
+void Slave::updateOversubscribableResources(const Future<Resources>& future)
{
- LOG(INFO) << "Received a new estimation of the oversubscribable "
- << "resources " << resources;
+ if (!future.isReady()) {
+ LOG(ERROR) << "Failed to estimate oversubscribable resources: "
+ << (future.isFailed() ? future.failure() : "discarded");
+ } else {
+ LOG(INFO) << "Received a new estimation of the oversubscribable "
+ << "resources " << future.get();
+
+ oversubscribableResources = future.get();
+ }
- oversubscribableResources = resources;
+ resourceEstimator->oversubscribable()
+ .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d82b10c..0207eaf 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -433,7 +433,7 @@ private:
const FrameworkID& frameworkId,
const Executor* executor);
- void updateOversubscribableResources(const Resources& resources);
+ void updateOversubscribableResources(const Future<Resources>& future);
void forwardOversubscribableResources();
const Flags flags;
http://git-wip-us.apache.org/repos/asf/mesos/blob/e530b4e4/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index a60df75..924b0ff 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -38,6 +38,7 @@
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
+#include <process/queue.hpp>
#include <stout/bytes.hpp>
#include <stout/foreach.hpp>
@@ -704,20 +705,23 @@ public:
class TestResourceEstimator : public mesos::slave::ResourceEstimator
{
public:
- virtual Try<Nothing> initialize(
- const lambda::function<void(const Resources&)>& _oversubscribe)
+ virtual Try<Nothing> initialize()
{
- oversubscribe = _oversubscribe;
return Nothing();
}
+ virtual process::Future<Resources> oversubscribable()
+ {
+ return queue.get();
+ }
+
void estimate(const Resources& resources)
{
- oversubscribe(resources);
+ queue.put(resources);
}
private:
- lambda::function<void(const Resources&)> oversubscribe;
+ process::Queue<Resources> queue;
};