You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2015/06/04 02:45:05 UTC

mesos git commit: Updated slave to query resource estimator whenever it wants to forward an update.

Repository: mesos
Updated Branches:
  refs/heads/master 4b214747c -> 100009853


Updated slave to query resource estimator whenever it wants to forward an update.

Review: https://reviews.apache.org/r/35038


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10000985
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10000985
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10000985

Branch: refs/heads/master
Commit: 100009853959c5b348491c32a7fb0d1913e9e084
Parents: 4b21474
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Jun 3 16:18:45 2015 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Jun 3 17:44:16 2015 -0700

----------------------------------------------------------------------
 include/mesos/slave/resource_estimator.hpp |  9 +--
 src/slave/slave.cpp                        | 78 ++++++++++++++-----------
 src/slave/slave.hpp                        | 14 ++---
 src/tests/oversubscription_tests.cpp       | 22 ++-----
 4 files changed, 56 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 45e0b19..7f78fd8 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -58,12 +58,9 @@ public:
   // Returns the current estimation about the *maximum* amount of
   // resources that can be oversubscribed on the slave. A new
   // estimation will invalidate all the previously returned
-  // estimations. The slave will be calling this method continuously
-  // to keep track of the most up-to-date estimation and periodically
-  // forward it to the master. As a result, to avoid overwhelming the
-  // slave, it is recommended that the resource estimator should
-  // return an estimation only if the current estimation is
-  // significantly different from the previous one.
+  // estimations. The slave will be calling this method periodically
+  // and forwarding the result to the master. As a result, the estimator
+  // should respond with an estimate every time this method is called.
   virtual process::Future<Resources> oversubscribable() = 0;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e264fb9..30e0d8b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3993,15 +3993,12 @@ void Slave::__recover(const Future<Nothing>& future)
   if (flags.recover == "reconnect") {
     state = DISCONNECTED;
 
-    // Start to get estimations from the resource estimator and
-    // forward the estimations to the master.
-    resourceEstimator->oversubscribable()
-      .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1))
-      .onAny(defer(self(), &Self::forwardOversubscribedResources));
-
     // Start detecting masters.
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
+
+    // Forward oversubscribed resources.
+    forwardOversubscribed();
   } else {
     // Slave started in cleanup mode.
     CHECK_EQ("cleanup", flags.recover);
@@ -4088,27 +4085,40 @@ Future<Nothing> Slave::garbageCollect(const string& path)
 }
 
 
-void Slave::updateOversubscribableResources(const Future<Resources>& future)
+void Slave::forwardOversubscribed()
 {
-  if (!future.isReady()) {
-    LOG(ERROR) << "Failed to estimate oversubscribable resources: "
-               << (future.isFailed() ? future.failure() : "discarded");
-  } else {
-    LOG(INFO) << "Received a new estimation of the oversubscribable "
-              << "resources " << future.get();
-
-    oversubscribableResources = future.get();
-  }
+  LOG(INFO) << "Querying resource estimator for oversubscribable resources";
 
   resourceEstimator->oversubscribable()
-    .onAny(defer(self(), &Self::updateOversubscribableResources, lambda::_1));
+    .onAny(defer(self(), &Self::_forwardOversubscribed, lambda::_1));
 }
 
 
-void Slave::forwardOversubscribedResources()
+void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 {
+  if (!oversubscribable.isReady()) {
+    LOG(ERROR) << "Failed to get oversubscribable resources: "
+               << (oversubscribable.isFailed()
+                   ? oversubscribable.failure() : "future discarded");
+
+    delay(flags.oversubscribed_resources_interval,
+          self(),
+          &Self::forwardOversubscribed);
+
+    return;
+  }
+
+  LOG(INFO) << "Received oversubscribable resources " << oversubscribable.get()
+            << " from the resource estimator";
+
   if (state != RUNNING) {
-    delay(Seconds(1), self(), &Self::forwardOversubscribedResources);
+    LOG(INFO) << "No master detected. Re-querying resource estimator after "
+              << flags.oversubscribed_resources_interval;
+
+    delay(flags.oversubscribed_resources_interval,
+          self(),
+          &Self::forwardOversubscribed);
+
     return;
   }
 
@@ -4126,29 +4136,27 @@ void Slave::forwardOversubscribedResources()
   }
 
   // Add oversubscribable resources to the total.
-  oversubscribed += oversubscribableResources;
+  oversubscribed += oversubscribable.get();
 
-  if (oversubscribed == oversubscribedResources) {
-    VLOG(1) << "Not forwarding total oversubscribed resources because the"
-            << " previous estimate " << oversubscribed << " hasn't changed";
-    return;
-  }
+  // Only forward the estimate if it's different from the previous
+  // estimate.
+  if (oversubscribed != oversubscribedResources) {
+    LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
 
-  LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed;
+    UpdateSlaveMessage message;
+    message.mutable_slave_id()->CopyFrom(info.id());
+    message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
 
-  UpdateSlaveMessage message;
-  message.mutable_slave_id()->CopyFrom(info.id());
-  message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
+    CHECK_SOME(master);
+    send(master.get(), message);
 
-  CHECK_SOME(master);
-  send(master.get(), message);
+    // Update the estimate.
+    oversubscribedResources = oversubscribed;
+  }
 
   delay(flags.oversubscribed_resources_interval,
         self(),
-        &Self::forwardOversubscribedResources);
-
-  // Update the estimate.
-  oversubscribedResources = oversubscribed;
+        &Self::forwardOversubscribed);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 245ea06..37e85af 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -433,8 +433,10 @@ private:
       const FrameworkID& frameworkId,
       const Executor* executor);
 
-  void updateOversubscribableResources(const Future<Resources>& future);
-  void forwardOversubscribedResources();
+  // Forwards the current total of oversubscribed resources.
+  void forwardOversubscribed();
+  void _forwardOversubscribed(
+      const process::Future<Resources>& oversubscribable);
 
   const Flags flags;
 
@@ -508,12 +510,8 @@ private:
 
   mesos::slave::ResourceEstimator* resourceEstimator;
 
-  // The most recent estimation about the maximum amount of resources
-  // that can be oversubscribed on the slave.
-  Resources oversubscribableResources;
-
-  // The total amount of oversubscribed (allocated and
-  // oversubscribable) resources.
+  // The most recent estimate of the total amount of oversubscribed
+  // (allocated and oversubscribable) resources.
   Resources oversubscribedResources;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/10000985/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index b1a10a9..43a13ee 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -90,13 +90,8 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
   EXPECT_CALL(resourceEstimator, initialize(_));
 
   Queue<Resources> estimations;
-  // We expect 2 calls:
-  // - First for some slack estimation.
-  // - Second ensures that slave will wait
-  //   asynchronously for next estimation.
   EXPECT_CALL(resourceEstimator, oversubscribable())
-    .Times(2)
-    .WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
+    .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
 
   slave::Flags flags = CreateSlaveFlags();
   Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
@@ -150,13 +145,8 @@ TEST_F(OversubscriptionTest, RevocableOffer)
   EXPECT_CALL(resourceEstimator, initialize(_));
 
   Queue<Resources> estimations;
-  // We expect 2 calls:
-  // - First for some slack estimation.
-  // - Second ensures that slave will wait
-  //   asynchronously for next estimation.
   EXPECT_CALL(resourceEstimator, oversubscribable())
-    .Times(2)
-    .WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
+    .WillOnce(Invoke(&estimations, &Queue<Resources>::get));
 
   slave::Flags flags = CreateSlaveFlags();
 
@@ -220,13 +210,9 @@ TEST_F(OversubscriptionTest, RescindRevocableOffer)
   EXPECT_CALL(resourceEstimator, initialize(_));
 
   Queue<Resources> estimations;
-  // We expect 3 calls:
-  // - First for some slack estimation.
-  // - Second for extended slack resources.
-  // - Third ensures that slave will wait
-  //   asynchronously for next estimation.
+  // We expect 2 calls for 2 estimations.
   EXPECT_CALL(resourceEstimator, oversubscribable())
-    .Times(3)
+    .Times(2)
     .WillRepeatedly(Invoke(&estimations, &Queue<Resources>::get));
 
   slave::Flags flags = CreateSlaveFlags();