You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/06/15 23:49:28 UTC

[2/4] mesos git commit: Send oversubscribable resources during (re-)registration.

Send oversubscribable resources during (re-)registration.

Review: https://reviews.apache.org/r/35411


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/979a2c5e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/979a2c5e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/979a2c5e

Branch: refs/heads/master
Commit: 979a2c5e03a4da69d158391a734b71e9264ebad7
Parents: 6904504
Author: Benjamin Mahler <be...@gmail.com>
Authored: Fri Jun 12 16:44:50 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Mon Jun 15 14:27:03 2015 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                  | 107 +++++++++++++++++-------------
 src/slave/slave.hpp                  |   2 +-
 src/tests/oversubscription_tests.cpp |  60 +++++++++++++++++
 3 files changed, 121 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/979a2c5e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a4b1e3d..3614330 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -899,6 +899,19 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
       LOG(FATAL) << "Unexpected slave state " << state;
       break;
   }
+
+  // Send the latest estimate for oversubscribed resources.
+  if (oversubscribedResources.isSome()) {
+    LOG(INFO) << "Forwarding total oversubscribed resources "
+              << oversubscribedResources.get();
+
+    UpdateSlaveMessage message;
+    message.mutable_slave_id()->CopyFrom(info.id());
+    message.mutable_oversubscribed_resources()->CopyFrom(
+        oversubscribedResources.get());
+
+    send(master.get(), message);
+  }
 }
 
 
@@ -945,6 +958,19 @@ void Slave::reregistered(
       return;
   }
 
+  // Send the latest estimate for oversubscribed resources.
+  if (oversubscribedResources.isSome()) {
+    LOG(INFO) << "Forwarding total oversubscribed resources "
+              << oversubscribedResources.get();
+
+    UpdateSlaveMessage message;
+    message.mutable_slave_id()->CopyFrom(info.id());
+    message.mutable_oversubscribed_resources()->CopyFrom(
+        oversubscribedResources.get());
+
+    send(master.get(), message);
+  }
+
   // Reconcile any tasks per the master's request.
   foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
     Framework* framework = getFramework(reconcile.framework_id());
@@ -4056,55 +4082,40 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
     LOG(ERROR) << "Failed to get oversubscribable resources: "
                << (oversubscribable.isFailed()
                    ? oversubscribable.failure() : "future discarded");
-
-    delay(flags.oversubscribed_resources_interval,
-          self(),
-          &Self::forwardOversubscribed);
-
-    return;
-  }
-
-  LOG(INFO) << "Received oversubscribable resources " << oversubscribable.get()
-            << " from the resource estimator";
-
-  if (state != RUNNING) {
-    LOG(INFO) << "No master detected. Re-querying resource estimator after "
-              << flags.oversubscribed_resources_interval;
-
-    delay(flags.oversubscribed_resources_interval,
-          self(),
-          &Self::forwardOversubscribed);
-
-    return;
-  }
-
-  // Calculate the latest allocation of oversubscribed resources.
-  // Note that this allocation value might be different from the
-  // master's view because new task/executor might be in flight from
-  // the master or pending on the slave etc. This is ok because the
-  // allocator only considers the slave's view of allocation when
-  // calculating the available oversubscribed resources to offer.
-  Resources oversubscribed;
-  foreachvalue (Framework* framework, frameworks) {
-    foreachvalue (Executor* executor, framework->executors) {
-      oversubscribed += executor->resources.revocable();
+  } else {
+    LOG(INFO) << "Received oversubscribable resources "
+              << oversubscribable.get() << " from the resource estimator";
+
+    // Calculate the latest allocation of oversubscribed resources.
+    // Note that this allocation value might be different from the
+    // master's view because new task/executor might be in flight from
+    // the master or pending on the slave etc. This is ok because the
+    // allocator only considers the slave's view of allocation when
+    // calculating the available oversubscribed resources to offer.
+    Resources oversubscribed;
+    foreachvalue (Framework* framework, frameworks) {
+      foreachvalue (Executor* executor, framework->executors) {
+        oversubscribed += executor->resources.revocable();
+      }
     }
-  }
 
-  // Add oversubscribable resources to the total.
-  oversubscribed += oversubscribable.get();
+    // Add oversubscribable resources to the total.
+    oversubscribed += oversubscribable.get();
 
-  // Only forward the estimate if it's different from the previous
-  // estimate.
-  if (oversubscribed != oversubscribedResources) {
-    LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
+    // Only forward the estimate if it's different from the previous
+    // estimate. We also send this whenever we get (re-)registered
+    // (i.e. whenever we transition into the RUNNING state).
+    if (state == RUNNING && oversubscribedResources != oversubscribed) {
+      LOG(INFO) << "Forwarding total oversubscribed resources "
+                << oversubscribed;
 
-    UpdateSlaveMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
-    message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
+      UpdateSlaveMessage message;
+      message.mutable_slave_id()->CopyFrom(info.id());
+      message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
 
-    CHECK_SOME(master);
-    send(master.get(), message);
+      CHECK_SOME(master);
+      send(master.get(), message);
+    }
 
     // Update the estimate.
     oversubscribedResources = oversubscribed;
@@ -4366,9 +4377,11 @@ double Slave::_resources_revocable_total(const string& name)
 {
   double total = 0.0;
 
-  foreach (const Resource& resource, oversubscribedResources) {
-    if (resource.name() == name && resource.type() == Value::SCALAR) {
-      total += resource.scalar().value();
+  if (oversubscribedResources.isSome()) {
+    foreach (const Resource& resource, oversubscribedResources.get()) {
+      if (resource.name() == name && resource.type() == Value::SCALAR) {
+        total += resource.scalar().value();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/979a2c5e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0df1b55..dbed46d 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -529,7 +529,7 @@ private:
 
   // The most recent estimate of the total amount of oversubscribed
   // (allocated and oversubscribable) resources.
-  Resources oversubscribedResources;
+  Option<Resources> oversubscribedResources;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/979a2c5e/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index fe6a848..3481ad2 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -686,6 +686,66 @@ TEST_F(OversubscriptionTest, QoSFetchResourceUsageFromMonitor)
 }
 
 
+// Ensures the slave forwards its estimate whenever it receives
+// a registered or re-registered message from the master, even
+// if the total oversubscribable resources do not change.
+TEST_F(OversubscriptionTest, Reregistration)
+{
+  loadFixedResourceEstimatorModule("cpus(*):2");
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resource_estimator = FIXED_RESOURCE_ESTIMATOR_NAME;
+
+  Future<Nothing> slaveRecover = FUTURE_DISPATCH(_, &Slave::recover);
+
+  StandaloneMasterDetector detector;
+
+  Try<PID<Slave>> slave = StartSlave(&detector, flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRecover);
+
+  // Advance the clock for the slave to compute an estimate.
+  Clock::pause();
+  Clock::advance(flags.oversubscribed_resources_interval);
+  Clock::settle();
+
+  // Start a master; we expect the slave to send the update
+  // message after registering!
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegistered =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Future<UpdateSlaveMessage> update =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  detector.appoint(master.get());
+
+  AWAIT_READY(slaveRegistered);
+  AWAIT_READY(update);
+
+  Resources resources = update.get().oversubscribed_resources();
+  EXPECT_SOME_EQ(2.0, resources.cpus());
+
+  // Trigger a re-registration and expect another update message.
+  Future<SlaveReregisteredMessage> slaveReregistered =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  update = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  detector.appoint(master.get());
+
+  AWAIT_READY(slaveReregistered);
+  AWAIT_READY(update);
+
+  // Need to shut down explicitly because the slave holds
+  // a pointer to the detector on our test stack!
+  Shutdown();
+}
+
+
 // Tests interactions between QoS Controller and slave. The
 // TestQoSController's correction queue is filled and a mocked slave
 // is checked for receiving the given correction.