You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/06/04 02:45:05 UTC
mesos git commit: Updated slave to query resource estimator whenever
it wants to forward an update.
Repository: mesos
Updated Branches:
refs/heads/master 4b214747c -> 100009853
Updated slave to query resource estimator whenever it wants to forward an update.
Review: https://reviews.apache.org/r/35038
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10000985
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10000985
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10000985
Branch: refs/heads/master
Commit: 100009853959c5b348491c32a7fb0d1913e9e084
Parents: 4b21474
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Jun 3 16:18:45 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jun 3 17:44:16 2015 -0700
----------------------------------------------------------------------
include/mesos/slave/resource_estimator.hpp | 9 +--
src/slave/slave.cpp | 78 ++++++++++++++-----------
src/slave/slave.hpp | 14 ++---
src/tests/oversubscription_tests.cpp | 22 ++-----
4 files changed, 56 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 45e0b19..7f78fd8 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -58,12 +58,9 @@ public:
// Returns the current estimation about the *maximum* amount of
// resources that can be oversubscribed on the slave. A new
// estimation will invalidate all the previously returned
- // estimations. The slave will be calling this method continuously
- // to keep track of the most up-to-date estimation and periodically
- // forward it to the master. As a result, to avoid overwhelming the
- // slave, it is recommended that the resource estimator should
- // return an estimation only if the current estimation is
- // significantly different from the previous one.
+ // estimations. The slave will be calling this method periodically
+ // to forward the estimation to the master. As a result, the estimator
+ // should respond with an estimate every time this method is called.
virtual process::Future<Resources> oversubscribable() = 0;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e264fb9..30e0d8b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3993,15 +3993,12 @@ void Slave::__recover(const Future<Nothing>& future)
if (flags.recover == "reconnect") {
state = DISCONNECTED;
- // Start to get estimations from the resource estimator and
- // forward the estimations to the master.
- resourceEstimator->oversubscribable()
- .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1))
- .onAny(defer(self(), &Self::forwardOversubscribedResources));
-
// Start detecting masters.
detection = detector->detect()
.onAny(defer(self(), &Slave::detected, lambda::_1));
+
+ // Forward oversubscribed resources.
+ forwardOversubscribed();
} else {
// Slave started in cleanup mode.
CHECK_EQ("cleanup", flags.recover);
@@ -4088,27 +4085,40 @@ Future<Nothing> Slave::garbageCollect(const string& path)
}
-void Slave::updateOversubscribableResources(const Future<Resources>& future)
+void Slave::forwardOversubscribed()
{
- if (!future.isReady()) {
- LOG(ERROR) << "Failed to estimate oversubscribable resources: "
- << (future.isFailed() ? future.failure() : "discarded");
- } else {
- LOG(INFO) << "Received a new estimation of the oversubscribable "
- << "resources " << future.get();
-
- oversubscribableResources = future.get();
- }
+ LOG(INFO) << "Querying resource estimator for oversubscribable resources";
resourceEstimator->oversubscribable()
- .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1));
+ .onAny(defer(self(), &Self::_forwardOversubscribed, lambda::_1));
}
-void Slave::forwardOversubscribedResources()
+void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
{
+ if (!oversubscribable.isReady()) {
+ LOG(ERROR) << "Failed to get oversubscribable resources: "
+ << (oversubscribable.isFailed()
+ ? oversubscribable.failure() : "future discarded");
+
+ delay(flags.oversubscribed_resources_interval,
+ self(),
+ &Self::forwardOversubscribed);
+
+ return;
+ }
+
+ LOG(INFO) << "Received oversubscribable resources " << oversubscribable.get()
+ << " from the resource estimator";
+
if (state != RUNNING) {
- delay(Seconds(1), self(), &Self::forwardOversubscribedResources);
+ LOG(INFO) << "No master detected. Re-querying resource estimator after "
+ << flags.oversubscribed_resources_interval;
+
+ delay(flags.oversubscribed_resources_interval,
+ self(),
+ &Self::forwardOversubscribed);
+
return;
}
@@ -4126,29 +4136,27 @@ void Slave::forwardOversubscribedResources()
}
// Add oversubscribable resources to the total.
- oversubscribed += oversubscribableResources;
+ oversubscribed += oversubscribable.get();
- if (oversubscribed == oversubscribedResources) {
- VLOG(1) << "Not forwarding total oversubscribed resources because the"
- << " previous estimate " << oversubscribed << " hasn't changed";
- return;
- }
+ // Only forward the estimate if it's different from the previous
+ // estimate.
+ if (oversubscribed != oversubscribedResources) {
+ LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
- LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
+ UpdateSlaveMessage message;
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
+ CHECK_SOME(master);
+ send(master.get(), message);
- CHECK_SOME(master);
- send(master.get(), message);
+ // Update the estimate.
+ oversubscribedResources = oversubscribed;
+ }
delay(flags.oversubscribed_resources_interval,
self(),
- &Self::forwardOversubscribedResources);
-
- // Update the estimate.
- oversubscribedResources = oversubscribed;
+ &Self::forwardOversubscribed);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 245ea06..37e85af 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -433,8 +433,10 @@ private:
const FrameworkID& frameworkId,
const Executor* executor);
- void updateOversubscribableResources(const Future<Resources>& future);
- void forwardOversubscribedResources();
+ // Forwards the current total of oversubscribed resources.
+ void forwardOversubscribed();
+ void _forwardOversubscribed(
+ const process::Future<Resources>& oversubscribable);
const Flags flags;
@@ -508,12 +510,8 @@ private:
mesos::slave::ResourceEstimator* resourceEstimator;
- // The most recent estimation about the maximum amount of resources
- // that can be oversubscribed on the slave.
- Resources oversubscribableResources;
-
- // The total amount of oversubscribed (allocated and
- // oversubscribable) resources.
+ // The most recent estimate of the total amount of oversubscribed
+ // (allocated and oversubscribable) resources.
Resources oversubscribedResources;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index b1a10a9..43a13ee 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -90,13 +90,8 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
EXPECT_CALL(resourceEstimator, initialize(_));
Queue<Resources> estimations;
- // We expect 2 calls:
- // - First for some slack estimation.
- // - Second ensures that slave will wait
- // asynchronously for next estimation.
EXPECT_CALL(resourceEstimator, oversubscribable())
- .Times(2)
- .WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
+ .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
slave::Flags flags = CreateSlaveFlags();
Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
@@ -150,13 +145,8 @@ TEST_F(OversubscriptionTest, RevocableOffer)
EXPECT_CALL(resourceEstimator, initialize(_));
Queue<Resources> estimations;
- // We expect 2 calls:
- // - First for some slack estimation.
- // - Second ensures that slave will wait
- // asynchronously for next estimation.
EXPECT_CALL(resourceEstimator, oversubscribable())
- .Times(2)
- .WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
+ .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
slave::Flags flags = CreateSlaveFlags();
@@ -220,13 +210,9 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
EXPECT_CALL(resourceEstimator, initialize(_));
Queue<Resources> estimations;
- // We expect 3 calls:
- // - First for some slack estimation.
- // - Second for extended slack resources.
- // - Third ensures that slave will wait
- // asynchronously for next estimation.
+ // We expect 2 calls for 2 estimations.
EXPECT_CALL(resourceEstimator, oversubscribable())
- .Times(3)
+ .Times(2)
.WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
slave::Flags flags = CreateSlaveFlags();