You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/18 21:25:09 UTC

mesos git commit: Changed to use a push model for the resource estimator.

Repository: mesos
Updated Branches:
  refs/heads/master 2c320eee9 -> ac7fb6324


Changed to use a push model for the resource estimator.

Review: https://reviews.apache.org/r/34299


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac7fb632
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac7fb632
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac7fb632

Branch: refs/heads/master
Commit: ac7fb6324db60a3cfd417ad87ca4bb9a68457688
Parents: 2c320ee
Author: Jie Yu <yu...@gmail.com>
Authored: Fri May 15 16:35:32 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon May 18 12:24:53 2015 -0700

----------------------------------------------------------------------
 include/mesos/slave/resource_estimator.hpp | 27 +++++------
 src/messages/messages.proto                |  4 +-
 src/slave/constants.cpp                    |  6 ---
 src/slave/constants.hpp                    |  5 ---
 src/slave/flags.cpp                        |  7 +++
 src/slave/flags.hpp                        |  1 +
 src/slave/resource_estimator.cpp           | 37 ++++++++-------
 src/slave/resource_estimator.hpp           |  4 +-
 src/slave/slave.cpp                        | 60 ++++++++++++-------------
 src/slave/slave.hpp                        | 10 +++--
 src/tests/mesos.hpp                        | 31 ++++++-------
 src/tests/oversubscription_tests.cpp       | 40 +++++++++++++----
 12 files changed, 125 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 3639615..d64c698 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -25,7 +25,7 @@
 
 #include <process/future.hpp>
 
-#include <stout/none.hpp>
+#include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
@@ -46,22 +46,17 @@ public:
 
   virtual ~ResourceEstimator() {}
 
-  // Initializes this resource estimator. This method needs to be
-  // called before any other member method is called.
+  // Initializes this resource estimator. It registers a callback with
+  // the resource estimator. The callback allows the resource
+  // estimator to tell the slave about the current estimation of the
+  // *maximum* amount of resources that can be oversubscribed on the
+  // slave. A new estimation will invalidate all the previously
+  // returned estimations. The slave will keep track of the most
+  // recent estimation and periodically send it to the master.
+  //
   // TODO(jieyu): Pass ResourceMonitor* once it's exposed.
-  virtual Try<Nothing> initialize() = 0;
-
-  // Returns the current estimation about the *maximum* amount of
-  // resources that can be oversubscribed on the slave. A new
-  // estimation will invalidate all the previously returned
-  // estimations. The slave will be calling this method continuously
-  // to get the most up-to-date estimation and forward them to the
-  // master. As a result, it is up to the resource estimator to
-  // control the speed of sending estimations to the master. To avoid
-  // overwhelming the master, it is recommended that the resource
-  // estimator should return an estimation only if the current
-  // estimation is significantly different from the previous one.
-  virtual process::Future<Resources> oversubscribed() = 0;
+  virtual Try<Nothing> initialize(
+      const lambda::function<void(const Resources&)>& oversubscribe) = 0;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 19e2444..c946754 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -334,8 +334,8 @@ message CheckpointResourcesMessage {
 
 
 // This message is sent by the slave to the master to inform the
-// master about the currently available oversubscribed resources.
-message UpdateOversubscribedResourcesMessage {
+// master about the currently oversubscribable resources.
+message OversubscribeResourcesMessage {
   required SlaveID slave_id = 1;
   repeated Resource resources = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 07f699a..2a99b11 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -57,12 +57,6 @@ Duration MASTER_PING_TIMEOUT()
   return master::SLAVE_PING_TIMEOUT * master::MAX_SLAVE_PING_TIMEOUTS;
 }
 
-
-Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN()
-{
-  return Seconds(5);
-}
-
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index df02043..fd1c1ab 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -104,11 +104,6 @@ extern const std::string DEFAULT_AUTHENTICATEE;
 // trigger a re-detection of the master to cause a re-registration.
 Duration MASTER_PING_TIMEOUT();
 
-
-// To avoid overwhelming the master, we enforce a minimal delay
-// between two subsequent UpdateOversubscribedResourcesMessages.
-Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN();
-
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index da30973..b5e2518 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -440,4 +440,11 @@ mesos::internal::slave::Flags::Flags()
   add(&Flags::resource_estimator,
       "resource_estimator",
       "The name of the resource estimator to use for oversubscription.");
+
+  add(&Flags::oversubscribe_resources_interval,
+      "oversubscribe_resources_interval",
+      "The slave periodically updates the master with the current estimation\n"
+      "about the maximum amount of resources that can be oversubscribed. The\n"
+      "interval between updates is controlled by this flag.",
+      Seconds(15));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index ca7cc13..5c57478 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -98,6 +98,7 @@ public:
   std::string authenticatee;
   Option<std::string> hooks;
   Option<std::string> resource_estimator;
+  Duration oversubscribe_resources_interval;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
index 13d706c..7b7b499 100644
--- a/src/slave/resource_estimator.cpp
+++ b/src/slave/resource_estimator.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include <process/delay.hpp>
 #include <process/dispatch.hpp>
 #include <process/process.hpp>
 
@@ -52,10 +53,25 @@ class NoopResourceEstimatorProcess :
   public Process<NoopResourceEstimatorProcess>
 {
 public:
-  Future<Resources> oversubscribed()
+  NoopResourceEstimatorProcess(
+      const lambda::function<void(const Resources&)>& _oversubscribe)
+    : oversubscribe(_oversubscribe) {}
+
+protected:
+  virtual void initialize()
+  {
+    notify();
+  }
+
+  // Periodically notify the slave about oversubscribable resources.
+  void notify()
   {
-    return Resources();
+    oversubscribe(Resources());
+
+    delay(Seconds(1), self(), &Self::notify);
   }
+
+  const lambda::function<void(const Resources&)> oversubscribe;
 };
 
 
@@ -68,30 +84,19 @@ NoopResourceEstimator::~NoopResourceEstimator()
 }
 
 
-Try<Nothing> NoopResourceEstimator::initialize()
+Try<Nothing> NoopResourceEstimator::initialize(
+    const lambda::function<void(const Resources&)>& oversubscribe)
 {
   if (process.get() != NULL) {
     return Error("Noop resource estimator has already been initialized");
   }
 
-  process.reset(new NoopResourceEstimatorProcess());
+  process.reset(new NoopResourceEstimatorProcess(oversubscribe));
   spawn(process.get());
 
   return Nothing();
 }
 
-
-Future<Resources> NoopResourceEstimator::oversubscribed()
-{
-  if (process.get() == NULL) {
-    return Failure("Noop resource estimator is not initialized");
-  }
-
-  return dispatch(
-      process.get(),
-      &NoopResourceEstimatorProcess::oversubscribed);
-}
-
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp
index bdf62ba..5a6367c 100644
--- a/src/slave/resource_estimator.hpp
+++ b/src/slave/resource_estimator.hpp
@@ -39,8 +39,8 @@ class NoopResourceEstimator : public mesos::slave::ResourceEstimator
 public:
   virtual ~NoopResourceEstimator();
 
-  virtual Try<Nothing> initialize();
-  virtual process::Future<Resources> oversubscribed();
+  virtual Try<Nothing> initialize(
+      const lambda::function<void(const Resources&)>& oversubscribe);
 
 protected:
   process::Owned<NoopResourceEstimatorProcess> process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 132f83e..8e88482 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -324,7 +324,9 @@ void Slave::initialize()
   }
 
   // TODO(jieyu): Pass ResourceMonitor* to 'initialize'.
-  Try<Nothing> initialize = resourceEstimator->initialize();
+  Try<Nothing> initialize = resourceEstimator->initialize(
+      defer(self(), &Self::updateOversubscribableResources, lambda::_1));
+
   if (initialize.isError()) {
     EXIT(1) << "Failed to initialize the resource estimator: "
             << initialize.error();
@@ -3978,8 +3980,8 @@ void Slave::__recover(const Future<Nothing>& future)
   if (flags.recover == "reconnect") {
     state = DISCONNECTED;
 
-    // Start to detect available oversubscribed resources.
-    updateOversubscribedResources();
+    // Start to send updates about oversubscribable resources.
+    forwardOversubscribableResources();
 
     // Start detecting masters.
     detection = detector->detect()
@@ -4070,45 +4072,43 @@ Future<Nothing> Slave::garbageCollect(const string& path)
 }
 
 
-void Slave::updateOversubscribedResources()
+void Slave::updateOversubscribableResources(const Resources& resources)
 {
-  // TODO(jieyu): Consider switching to a push model in which the
-  // slave registers a callback with the resource estimator, and the
-  // resource estimator invokes the callback whenever a new estimation
-  // is ready (similar to the allocator/master interface).
+  LOG(INFO) << "Received a new estimation of the oversubscribable "
+            << "resources " << resources;
+
+  oversubscribableResources = resources;
+}
 
+
+void Slave::forwardOversubscribableResources()
+{
   if (state != RUNNING) {
-    delay(Seconds(1), self(), &Self::updateOversubscribedResources);
+    delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
     return;
   }
 
-  resourceEstimator->oversubscribed()
-    .onAny(defer(self(), &Slave::_updateOversubscribedResources, lambda::_1));
-}
-
+  // We only forward updates after the first estimation is received.
+  if (oversubscribableResources.isNone()) {
+    delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
+    return;
+  }
 
-void Slave::_updateOversubscribedResources(const Future<Resources>& future)
-{
-  if (!future.isReady()) {
-    LOG(ERROR) << "Failed to estimate oversubscribed resources: "
-               << (future.isFailed() ? future.failure() : "discarded");
-  } else if (state == RUNNING) {
-    CHECK_SOME(master);
+  CHECK_SOME(master);
+  CHECK_SOME(oversubscribableResources);
 
-    LOG(INFO) << "Updating available oversubscribed resources to "
-              << future.get();
+  LOG(INFO) << "Forwarding oversubscribable resources "
+            << oversubscribableResources.get();
 
-    UpdateOversubscribedResourcesMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
-    message.mutable_resources()->CopyFrom(future.get());
+  OversubscribeResourcesMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.mutable_resources()->CopyFrom(oversubscribableResources.get());
 
-    send(master.get(), message);
-  }
+  send(master.get(), message);
 
-  // TODO(jieyu): Consider making the interval configurable.
-  delay(UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN(),
+  delay(flags.oversubscribe_resources_interval,
         self(),
-        &Self::updateOversubscribedResources);
+        &Self::forwardOversubscribableResources);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b62ed7b..d82b10c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -433,10 +433,8 @@ private:
       const FrameworkID& frameworkId,
       const Executor* executor);
 
-  // Polls oversubscribed resources estimations from resources
-  // estimator and forwards estimations to the master.
-  void updateOversubscribedResources();
-  void _updateOversubscribedResources(const process::Future<Resources>& future);
+  void updateOversubscribableResources(const Resources& resources);
+  void forwardOversubscribableResources();
 
   const Flags flags;
 
@@ -509,6 +507,10 @@ private:
   Duration executorDirectoryMaxAllowedAge;
 
   mesos::slave::ResourceEstimator* resourceEstimator;
+
+  // The most recent estimation about the maximum amount of resources
+  // that can be oversubscribed on the slave.
+  Option<Resources> oversubscribableResources;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index df8cd20..a60df75 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -30,6 +30,8 @@
 
 #include <mesos/master/allocator.hpp>
 
+#include <mesos/slave/resource_estimator.hpp>
+
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -699,28 +701,23 @@ public:
 };
 
 
-class MockResourceEstimator : public mesos::slave::ResourceEstimator
+class TestResourceEstimator : public mesos::slave::ResourceEstimator
 {
 public:
-  MockResourceEstimator()
+  virtual Try<Nothing> initialize(
+      const lambda::function<void(const Resources&)>& _oversubscribe)
   {
-    // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
-    // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()'
-    // for more details.
-    EXPECT_CALL(*this, initialize())
-      .WillRepeatedly(Return(Nothing()));
-
-    EXPECT_CALL(*this, oversubscribed())
-      .WillRepeatedly(Return(Resources()));
+    oversubscribe = _oversubscribe;
+    return Nothing();
   }
 
-  MOCK_METHOD0(
-      initialize,
-      Try<Nothing>());
+  void estimate(const Resources& resources)
+  {
+    oversubscribe(resources);
+  }
 
-  MOCK_METHOD0(
-      oversubscribed,
-      process::Future<Resources>());
+private:
+  lambda::function<void(const Resources&)> oversubscribe;
 };
 
 
@@ -787,7 +784,7 @@ public:
 private:
   Files files;
   MockGarbageCollector gc;
-  MockResourceEstimator resourceEstimator;
+  TestResourceEstimator resourceEstimator;
   slave::StatusUpdateManager* statusUpdateManager;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 64c2ede..75c25b0 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -18,6 +18,9 @@
 
 #include <gmock/gmock.h>
 
+#include <mesos/resources.hpp>
+
+#include <process/clock.hpp>
 #include <process/gtest.hpp>
 
 #include <stout/gtest.hpp>
@@ -45,24 +48,43 @@ class OversubscriptionSlaveTest : public MesosTest {};
 
 
 // This test verifies that slave will forward the estimation of the
-// available oversubscribed resources to the master.
-TEST_F(OversubscriptionSlaveTest, UpdateOversubcribedResourcesMessage)
+// oversubscribable resources to the master.
+TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
 {
   Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Future<UpdateOversubscribedResourcesMessage> message =
-    FUTURE_PROTOBUF(UpdateOversubscribedResourcesMessage(), _, _);
+  Future<SlaveRegisteredMessage> slaveRegistered =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  MockResourceEstimator resourceEstimator;
+  TestResourceEstimator resourceEstimator;
 
-  EXPECT_CALL(resourceEstimator, oversubscribed())
-    .WillRepeatedly(Return(Resources()));
+  slave::Flags flags = CreateSlaveFlags();
 
-  Try<PID<Slave>> slave = StartSlave(&resourceEstimator);
+  Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
   ASSERT_SOME(slave);
 
-  AWAIT_READY(message);
+  AWAIT_READY(slaveRegistered);
+
+  Future<OversubscribeResourcesMessage> update =
+    FUTURE_PROTOBUF(OversubscribeResourcesMessage(), _, _);
+
+  Clock::pause();
+
+  Clock::settle();
+  Clock::advance(flags.oversubscribe_resources_interval);
+
+  ASSERT_FALSE(update.isReady());
+
+  // Inject an estimation of oversubscribable resources.
+  Resources resources = Resources::parse("cpus:1;mem:32").get();
+  resourceEstimator.estimate(resources);
+
+  Clock::settle();
+  Clock::advance(flags.oversubscribe_resources_interval);
+
+  AWAIT_READY(update);
+  EXPECT_EQ(Resources(update.get().resources()), resources);
 
   Shutdown();
 }