You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/18 21:25:09 UTC
mesos git commit: Changed to use a push model for the resource estimator.
Repository: mesos
Updated Branches:
refs/heads/master 2c320eee9 -> ac7fb6324
Changed to use a push model for the resource estimator.
Review: https://reviews.apache.org/r/34299
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac7fb632
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac7fb632
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac7fb632
Branch: refs/heads/master
Commit: ac7fb6324db60a3cfd417ad87ca4bb9a68457688
Parents: 2c320ee
Author: Jie Yu <yu...@gmail.com>
Authored: Fri May 15 16:35:32 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon May 18 12:24:53 2015 -0700
----------------------------------------------------------------------
include/mesos/slave/resource_estimator.hpp | 27 +++++------
src/messages/messages.proto | 4 +-
src/slave/constants.cpp | 6 ---
src/slave/constants.hpp | 5 ---
src/slave/flags.cpp | 7 +++
src/slave/flags.hpp | 1 +
src/slave/resource_estimator.cpp | 37 ++++++++-------
src/slave/resource_estimator.hpp | 4 +-
src/slave/slave.cpp | 60 ++++++++++++-------------
src/slave/slave.hpp | 10 +++--
src/tests/mesos.hpp | 31 ++++++-------
src/tests/oversubscription_tests.cpp | 40 +++++++++++++----
12 files changed, 125 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/include/mesos/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/slave/resource_estimator.hpp b/include/mesos/slave/resource_estimator.hpp
index 3639615..d64c698 100644
--- a/include/mesos/slave/resource_estimator.hpp
+++ b/include/mesos/slave/resource_estimator.hpp
@@ -25,7 +25,7 @@
#include <process/future.hpp>
-#include <stout/none.hpp>
+#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>
@@ -46,22 +46,17 @@ public:
virtual ~ResourceEstimator() {}
- // Initializes this resource estimator. This method needs to be
- // called before any other member method is called.
+ // Initializes this resource estimator. It registers a callback with
+ // the resource estimator. The callback allows the resource
+ // estimator to tell the slave about the current estimation of the
+ // *maximum* amount of resources that can be oversubscribed on the
+ // slave. A new estimation will invalidate all the previously
+ // returned estimations. The slave will keep track of the most
+ // recent estimation and periodically send it to the master.
+ //
// TODO(jieyu): Pass ResourceMonitor* once it's exposed.
- virtual Try<Nothing> initialize() = 0;
-
- // Returns the current estimation about the *maximum* amount of
- // resources that can be oversubscribed on the slave. A new
- // estimation will invalidate all the previously returned
- // estimations. The slave will be calling this method continuously
- // to get the most up-to-date estimation and forward them to the
- // master. As a result, it is up to the resource estimator to
- // control the speed of sending estimations to the master. To avoid
- // overwhelming the master, it is recommended that the resource
- // estimator should return an estimation only if the current
- // estimation is significantly different from the previous one.
- virtual process::Future<Resources> oversubscribed() = 0;
+ virtual Try<Nothing> initialize(
+ const lambda::function<void(const Resources&)>& oversubscribe) = 0;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 19e2444..c946754 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -334,8 +334,8 @@ message CheckpointResourcesMessage {
// This message is sent by the slave to the master to inform the
-// master about the currently available oversubscribed resources.
-message UpdateOversubscribedResourcesMessage {
+// master about the currently oversubscribable resources.
+message OversubscribeResourcesMessage {
required SlaveID slave_id = 1;
repeated Resource resources = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 07f699a..2a99b11 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -57,12 +57,6 @@ Duration MASTER_PING_TIMEOUT()
return master::SLAVE_PING_TIMEOUT * master::MAX_SLAVE_PING_TIMEOUTS;
}
-
-Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN()
-{
- return Seconds(5);
-}
-
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index df02043..fd1c1ab 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -104,11 +104,6 @@ extern const std::string DEFAULT_AUTHENTICATEE;
// trigger a re-detection of the master to cause a re-registration.
Duration MASTER_PING_TIMEOUT();
-
-// To avoid overwhelming the master, we enforce a minimal delay
-// between two subsequent UpdateOversubscribedResourcesMessages.
-Duration UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN();
-
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index da30973..b5e2518 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -440,4 +440,11 @@ mesos::internal::slave::Flags::Flags()
add(&Flags::resource_estimator,
"resource_estimator",
"The name of the resource estimator to use for oversubscription.");
+
+ add(&Flags::oversubscribe_resources_interval,
+ "oversubscribe_resources_interval",
+ "The slave periodically updates the master with the current estimation\n"
+ "about the maximum amount of resources that can be oversubscribed. The\n"
+ "interval between updates is controlled by this flag.",
+ Seconds(15));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index ca7cc13..5c57478 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -98,6 +98,7 @@ public:
std::string authenticatee;
Option<std::string> hooks;
Option<std::string> resource_estimator;
+ Duration oversubscribe_resources_interval;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/resource_estimator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.cpp b/src/slave/resource_estimator.cpp
index 13d706c..7b7b499 100644
--- a/src/slave/resource_estimator.cpp
+++ b/src/slave/resource_estimator.cpp
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/process.hpp>
@@ -52,10 +53,25 @@ class NoopResourceEstimatorProcess :
public Process<NoopResourceEstimatorProcess>
{
public:
- Future<Resources> oversubscribed()
+ NoopResourceEstimatorProcess(
+ const lambda::function<void(const Resources&)>& _oversubscribe)
+ : oversubscribe(_oversubscribe) {}
+
+protected:
+ virtual void initialize()
+ {
+ notify();
+ }
+
+ // Periodically notify the slave about oversubscribable resources.
+ void notify()
{
- return Resources();
+ oversubscribe(Resources());
+
+ delay(Seconds(1), self(), &Self::notify);
}
+
+ const lambda::function<void(const Resources&)> oversubscribe;
};
@@ -68,30 +84,19 @@ NoopResourceEstimator::~NoopResourceEstimator()
}
-Try<Nothing> NoopResourceEstimator::initialize()
+Try<Nothing> NoopResourceEstimator::initialize(
+ const lambda::function<void(const Resources&)>& oversubscribe)
{
if (process.get() != NULL) {
return Error("Noop resource estimator has already been initialized");
}
- process.reset(new NoopResourceEstimatorProcess());
+ process.reset(new NoopResourceEstimatorProcess(oversubscribe));
spawn(process.get());
return Nothing();
}
-
-Future<Resources> NoopResourceEstimator::oversubscribed()
-{
- if (process.get() == NULL) {
- return Failure("Noop resource estimator is not initialized");
- }
-
- return dispatch(
- process.get(),
- &NoopResourceEstimatorProcess::oversubscribed);
-}
-
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/resource_estimator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/resource_estimator.hpp b/src/slave/resource_estimator.hpp
index bdf62ba..5a6367c 100644
--- a/src/slave/resource_estimator.hpp
+++ b/src/slave/resource_estimator.hpp
@@ -39,8 +39,8 @@ class NoopResourceEstimator : public mesos::slave::ResourceEstimator
public:
virtual ~NoopResourceEstimator();
- virtual Try<Nothing> initialize();
- virtual process::Future<Resources> oversubscribed();
+ virtual Try<Nothing> initialize(
+ const lambda::function<void(const Resources&)>& oversubscribe);
protected:
process::Owned<NoopResourceEstimatorProcess> process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 132f83e..8e88482 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -324,7 +324,9 @@ void Slave::initialize()
}
// TODO(jieyu): Pass ResourceMonitor* to 'initialize'.
- Try<Nothing> initialize = resourceEstimator->initialize();
+ Try<Nothing> initialize = resourceEstimator->initialize(
+ defer(self(), &Self::updateOversubscribableResources, lambda::_1));
+
if (initialize.isError()) {
EXIT(1) << "Failed to initialize the resource estimator: "
<< initialize.error();
@@ -3978,8 +3980,8 @@ void Slave::__recover(const Future<Nothing>& future)
if (flags.recover == "reconnect") {
state = DISCONNECTED;
- // Start to detect available oversubscribed resources.
- updateOversubscribedResources();
+ // Start to send updates about oversubscribable resources.
+ forwardOversubscribableResources();
// Start detecting masters.
detection = detector->detect()
@@ -4070,45 +4072,43 @@ Future<Nothing> Slave::garbageCollect(const string& path)
}
-void Slave::updateOversubscribedResources()
+void Slave::updateOversubscribableResources(const Resources& resources)
{
- // TODO(jieyu): Consider switching to a push model in which the
- // slave registers a callback with the resource estimator, and the
- // resource estimator invokes the callback whenever a new estimation
- // is ready (similar to the allocator/master interface).
+ LOG(INFO) << "Received a new estimation of the oversubscribable "
+ << "resources " << resources;
+
+ oversubscribableResources = resources;
+}
+
+void Slave::forwardOversubscribableResources()
+{
if (state != RUNNING) {
- delay(Seconds(1), self(), &Self::updateOversubscribedResources);
+ delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
return;
}
- resourceEstimator->oversubscribed()
- .onAny(defer(self(), &Slave::_updateOversubscribedResources, lambda::_1));
-}
-
+ // We only forward updates after the first estimation is received.
+ if (oversubscribableResources.isNone()) {
+ delay(Seconds(1), self(), &Self::forwardOversubscribableResources);
+ return;
+ }
-void Slave::_updateOversubscribedResources(const Future<Resources>& future)
-{
- if (!future.isReady()) {
- LOG(ERROR) << "Failed to estimate oversubscribed resources: "
- << (future.isFailed() ? future.failure() : "discarded");
- } else if (state == RUNNING) {
- CHECK_SOME(master);
+ CHECK_SOME(master);
+ CHECK_SOME(oversubscribableResources);
- LOG(INFO) << "Updating available oversubscribed resources to "
- << future.get();
+ LOG(INFO) << "Forwarding oversubscribable resources "
+ << oversubscribableResources.get();
- UpdateOversubscribedResourcesMessage message;
- message.mutable_slave_id()->CopyFrom(info.id());
- message.mutable_resources()->CopyFrom(future.get());
+ OversubscribeResourcesMessage message;
+ message.mutable_slave_id()->CopyFrom(info.id());
+ message.mutable_resources()->CopyFrom(oversubscribableResources.get());
- send(master.get(), message);
- }
+ send(master.get(), message);
- // TODO(jieyu): Consider making the interval configurable.
- delay(UPDATE_OVERSUBSCRIBED_RESOURCES_INTERVAL_MIN(),
+ delay(flags.oversubscribe_resources_interval,
self(),
- &Self::updateOversubscribedResources);
+ &Self::forwardOversubscribableResources);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index b62ed7b..d82b10c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -433,10 +433,8 @@ private:
const FrameworkID& frameworkId,
const Executor* executor);
- // Polls oversubscribed resources estimations from resources
- // estimator and forwards estimations to the master.
- void updateOversubscribedResources();
- void _updateOversubscribedResources(const process::Future<Resources>& future);
+ void updateOversubscribableResources(const Resources& resources);
+ void forwardOversubscribableResources();
const Flags flags;
@@ -509,6 +507,10 @@ private:
Duration executorDirectoryMaxAllowedAge;
mesos::slave::ResourceEstimator* resourceEstimator;
+
+ // The most recent estimation about the maximum amount of resources
+ // that can be oversubscribed on the slave.
+ Option<Resources> oversubscribableResources;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index df8cd20..a60df75 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -30,6 +30,8 @@
#include <mesos/master/allocator.hpp>
+#include <mesos/slave/resource_estimator.hpp>
+
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
@@ -699,28 +701,23 @@ public:
};
-class MockResourceEstimator : public mesos::slave::ResourceEstimator
+class TestResourceEstimator : public mesos::slave::ResourceEstimator
{
public:
- MockResourceEstimator()
+ virtual Try<Nothing> initialize(
+ const lambda::function<void(const Resources&)>& _oversubscribe)
{
- // NOTE: We use 'EXPECT_CALL' and 'WillRepeatedly' here instead of
- // 'ON_CALL' and 'WillByDefault'. See 'TestContainerizer::SetUp()'
- // for more details.
- EXPECT_CALL(*this, initialize())
- .WillRepeatedly(Return(Nothing()));
-
- EXPECT_CALL(*this, oversubscribed())
- .WillRepeatedly(Return(Resources()));
+ oversubscribe = _oversubscribe;
+ return Nothing();
}
- MOCK_METHOD0(
- initialize,
- Try<Nothing>());
+ void estimate(const Resources& resources)
+ {
+ oversubscribe(resources);
+ }
- MOCK_METHOD0(
- oversubscribed,
- process::Future<Resources>());
+private:
+ lambda::function<void(const Resources&)> oversubscribe;
};
@@ -787,7 +784,7 @@ public:
private:
Files files;
MockGarbageCollector gc;
- MockResourceEstimator resourceEstimator;
+ TestResourceEstimator resourceEstimator;
slave::StatusUpdateManager* statusUpdateManager;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac7fb632/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 64c2ede..75c25b0 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -18,6 +18,9 @@
#include <gmock/gmock.h>
+#include <mesos/resources.hpp>
+
+#include <process/clock.hpp>
#include <process/gtest.hpp>
#include <stout/gtest.hpp>
@@ -45,24 +48,43 @@ class OversubscriptionSlaveTest : public MesosTest {};
// This test verifies that slave will forward the estimation of the
-// available oversubscribed resources to the master.
-TEST_F(OversubscriptionSlaveTest, UpdateOversubcribedResourcesMessage)
+// oversubscribable resources to the master.
+TEST_F(OversubscriptionSlaveTest, ForwardOversubcribableResourcesMessage)
{
Try<PID<Master>> master = StartMaster();
ASSERT_SOME(master);
- Future<UpdateOversubscribedResourcesMessage> message =
- FUTURE_PROTOBUF(UpdateOversubscribedResourcesMessage(), _, _);
+ Future<SlaveRegisteredMessage> slaveRegistered =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- MockResourceEstimator resourceEstimator;
+ TestResourceEstimator resourceEstimator;
- EXPECT_CALL(resourceEstimator, oversubscribed())
- .WillRepeatedly(Return(Resources()));
+ slave::Flags flags = CreateSlaveFlags();
- Try<PID<Slave>> slave = StartSlave(&resourceEstimator);
+ Try<PID<Slave>> slave = StartSlave(&resourceEstimator, flags);
ASSERT_SOME(slave);
- AWAIT_READY(message);
+ AWAIT_READY(slaveRegistered);
+
+ Future<OversubscribeResourcesMessage> update =
+ FUTURE_PROTOBUF(OversubscribeResourcesMessage(), _, _);
+
+ Clock::pause();
+
+ Clock::settle();
+ Clock::advance(flags.oversubscribe_resources_interval);
+
+ ASSERT_FALSE(update.isReady());
+
+ // Inject an estimation of oversubscribable resources.
+ Resources resources = Resources::parse("cpus:1;mem:32").get();
+ resourceEstimator.estimate(resources);
+
+ Clock::settle();
+ Clock::advance(flags.oversubscribe_resources_interval);
+
+ AWAIT_READY(update);
+ EXPECT_EQ(Resources(update.get().resources()), resources);
Shutdown();
}