You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/06/15 23:49:28 UTC
[2/4] mesos git commit: Send oversubscribable resources during
(re-)registration.
Send oversubscribable resources during (re-)registration.
Review: https://reviews.apache.org/r/35411
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/979a2c5e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/979a2c5e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/979a2c5e
Branch: refs/heads/master
Commit: 979a2c5e03a4da69d158391a734b71e9264ebad7
Parents: 6904504
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Jun 12 16:44:50 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Jun 15 14:27:03 2015 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 107 +++++++++++++++++-------------
src/slave/slave.hpp | 2 +-
src/tests/oversubscription_tests.cpp | 60 +++++++++++++++++
3 files changed, 121 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/979a2c5e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a4b1e3d..3614330 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -899,6 +899,19 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
LOG(FATAL) << "Unexpected slave state " << state;
break;
}
+
+ // Send the latest estimate for oversubscribed resources.
+ if (oversubscribedResources.isSome()) {
+ LOG(INFO) << "Forwarding total oversubscribed resources "
+ << oversubscribedResources.get();
+
+ UpdateSlaveMessage message;
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_oversubscribed_resources()->CopyFrom(
+ oversubscribedResources.get());
+
+ send(master.get(), message);
+ }
}
@@ -945,6 +958,19 @@ void Slave::reregistered(
return;
}
+ // Send the latest estimate for oversubscribed resources.
+ if (oversubscribedResources.isSome()) {
+ LOG(INFO) << "Forwarding total oversubscribed resources "
+ << oversubscribedResources.get();
+
+ UpdateSlaveMessage message;
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_oversubscribed_resources()->CopyFrom(
+ oversubscribedResources.get());
+
+ send(master.get(), message);
+ }
+
// Reconcile any tasks per the master's request.
foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
Framework* framework = getFramework(reconcile.framework_id());
@@ -4056,55 +4082,40 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
LOG(ERROR) << "Failed to get oversubscribable resources: "
<< (oversubscribable.isFailed()
? oversubscribable.failure() : "future discarded");
-
- delay(flags.oversubscribed_resources_interval,
- self(),
- &Self::forwardOversubscribed);
-
- return;
- }
-
- LOG(INFO) << "Received oversubscribable resources " << oversubscribable.get()
- << " from the resource estimator";
-
- if (state != RUNNING) {
- LOG(INFO) << "No master detected. Re-querying resource estimator after "
- << flags.oversubscribed_resources_interval;
-
- delay(flags.oversubscribed_resources_interval,
- self(),
- &Self::forwardOversubscribed);
-
- return;
- }
-
- // Calculate the latest allocation of oversubscribed resources.
- // Note that this allocation value might be different from the
- // master's view because new task/executor might be in flight from
- // the master or pending on the slave etc. This is ok because the
- // allocator only considers the slave's view of allocation when
- // calculating the available oversubscribed resources to offer.
- Resources oversubscribed;
- foreachvalue (Framework* framework, frameworks) {
- foreachvalue (Executor* executor, framework->executors) {
- oversubscribed += executor->resources.revocable();
+ } else {
+ LOG(INFO) << "Received oversubscribable resources "
+ << oversubscribable.get() << " from the resource estimator";
+
+ // Calculate the latest allocation of oversubscribed resources.
+ // Note that this allocation value might be different from the
+ // master's view because new task/executor might be in flight from
+ // the master or pending on the slave etc. This is ok because the
+ // allocator only considers the slave's view of allocation when
+ // calculating the available oversubscribed resources to offer.
+ Resources oversubscribed;
+ foreachvalue (Framework* framework, frameworks) {
+ foreachvalue (Executor* executor, framework->executors) {
+ oversubscribed += executor->resources.revocable();
+ }
}
- }
- // Add oversubscribable resources to the total.
- oversubscribed += oversubscribable.get();
+ // Add oversubscribable resources to the total.
+ oversubscribed += oversubscribable.get();
- // Only forward the estimate if it's different from the previous
- // estimate.
- if (oversubscribed != oversubscribedResources) {
- LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
+ // Only forward the estimate if it's different from the previous
+ // estimate. We also send this whenever we get (re-)registered
+ // (i.e. whenever we transition into the RUNNING state).
+ if (state == RUNNING && oversubscribedResources != oversubscribed) {
+ LOG(INFO) << "Forwarding total oversubscribed resources "
+ << oversubscribed;
- UpdateSlaveMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
+ UpdateSlaveMessage message;
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
- CHECK_SOME(master);
- send(master.get(), message);
+ CHECK_SOME(master);
+ send(master.get(), message);
+ }
// Update the estimate.
oversubscribedResources = oversubscribed;
@@ -4366,9 +4377,11 @@ double Slave::_resources_revocable_total(const string& name)
{
double total = 0.0;
- foreach (const Resource& resource, oversubscribedResources) {
- if (resource.name() == name && resource.type() == Value::SCALAR) {
- total += resource.scalar().value();
+ if (oversubscribedResources.isSome()) {
+ foreach (const Resource& resource, oversubscribedResources.get()) {
+ if (resource.name() == name && resource.type() == Value::SCALAR) {
+ total += resource.scalar().value();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/979a2c5e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0df1b55..dbed46d 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -529,7 +529,7 @@ private:
// The most recent estimate of the total amount of oversubscribed
// (allocated and oversubscribable) resources.
- Resources oversubscribedResources;
+ Option<Resources> oversubscribedResources;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/979a2c5e/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index fe6a848..3481ad2 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -686,6 +686,66 @@ TEST_F(OversubscriptionTest, QoSFetchResourceUsageFromMonitor)
}
+// Ensures the slave forwards its estimate whenever it receives
+// a registered or re-registered message from the master, even
+// if the total oversubscribable resources do not change.
+TEST_F(OversubscriptionTest, Reregistration)
+{
+ loadFixedResourceEstimatorModule("cpus(*):2");
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.resource_estimator = FIXED_RESOURCE_ESTIMATOR_NAME;
+
+ Future<Nothing> slaveRecover = FUTURE_DISPATCH(_, &Slave::recover);
+
+ StandaloneMasterDetector detector;
+
+ Try<PID<Slave>> slave = StartSlave(&detector, flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRecover);
+
+ // Advance the clock for the slave to compute an estimate.
+ Clock::pause();
+ Clock::advance(flags.oversubscribed_resources_interval);
+ Clock::settle();
+
+ // Start a master; we expect the slave to send the update
+ // message after registering!
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Future<SlaveRegisteredMessage> slaveRegistered =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Future<UpdateSlaveMessage> update =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ detector.appoint(master.get());
+
+ AWAIT_READY(slaveRegistered);
+ AWAIT_READY(update);
+
+ Resources resources = update.get().oversubscribed_resources();
+ EXPECT_SOME_EQ(2.0, resources.cpus());
+
+ // Trigger a re-registration and expect another update message.
+ Future<SlaveReregisteredMessage> slaveReregistered =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ update = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ detector.appoint(master.get());
+
+ AWAIT_READY(slaveReregistered);
+ AWAIT_READY(update);
+
+ // Need to shut down explicitly because the slave holds
+ // a pointer to the detector on our test stack!
+ Shutdown();
+}
+
+
// Tests interactions between QoS Controller and slave. The
// TestQoSController's correction queue is filled and a mocked slave
// is checked for receiving the given correction.