You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/12/01 23:17:51 UTC
[3/3] mesos git commit: Fixed a bug that removed the suppressed
framework from sorter.
Fixed a bug that removed the suppressed framework from sorter.
Review: https://reviews.apache.org/r/63831
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c2f972b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c2f972b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c2f972b
Branch: refs/heads/master
Commit: 8c2f972b5c0c42e1519d09275cc26e1765a0c5de
Parents: 3711233
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Tue Nov 14 00:12:17 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Fri Dec 1 15:12:57 2017 -0800
----------------------------------------------------------------------
src/master/allocator/mesos/hierarchical.cpp | 48 ++++----
src/tests/scheduler_tests.cpp | 139 +++++++++++++++++++++++
2 files changed, 164 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8c2f972b/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index ab2abf8..715650e 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -35,6 +35,7 @@
#include <stout/check.hpp>
#include <stout/hashset.hpp>
+#include <stout/set.hpp>
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>
@@ -421,27 +422,30 @@ void HierarchicalAllocatorProcess::updateFramework(
set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
set<string> oldSuppressedRoles = framework.suppressedRoles;
- // The roles which are candidates for deactivation are the roles that are
- // removed, as well as the roles which have moved from non-suppressed
- // to suppressed mode.
- const set<string> rolesToDeactivate = [&]() {
+ // TODO(xujyan): Add a stout set difference method that wraps around
+ // `std::set_difference` for this.
+ const set<string> removedRoles = [&]() {
set<string> result = oldRoles;
foreach (const string& role, newRoles) {
result.erase(role);
}
+ return result;
+ }();
- foreach (const string& role, oldRoles) {
- if (!oldSuppressedRoles.count(role) && suppressedRoles.count(role)) {
- result.insert(role);
- }
+ const set<string> newSuppressedRoles = [&]() {
+ set<string> result = suppressedRoles;
+ foreach (const string& role, oldSuppressedRoles) {
+ result.erase(role);
}
return result;
}();
- foreach (const string& role, rolesToDeactivate) {
+ foreach (const string& role, removedRoles | newSuppressedRoles) {
CHECK(frameworkSorters.contains(role));
frameworkSorters.at(role)->deactivate(frameworkId.value());
+ }
+ foreach (const string& role, removedRoles) {
// Stop tracking the framework under this role if there are
// no longer any resources allocated to it.
if (frameworkSorters.at(role)->allocation(frameworkId.value()).empty()) {
@@ -453,36 +457,34 @@ void HierarchicalAllocatorProcess::updateFramework(
}
}
- // The roles which are candidates for activation are the roles that are
- // added, as well as the roles which have moved from suppressed to
- // non-suppressed mode.
- //
- // TODO(anindya_sinha): We should activate the roles only if the
- // framework is active (instead of always).
- const set<string> rolesToActivate = [&]() {
+ const set<string> addedRoles = [&]() {
set<string> result = newRoles;
foreach (const string& role, oldRoles) {
result.erase(role);
}
+ return result;
+ }();
- foreach (const string& role, newRoles) {
- if (!suppressedRoles.count(role) && oldSuppressedRoles.count(role)) {
- result.insert(role);
- } else if (suppressedRoles.count(role)) {
- result.erase(role);
- }
+ const set<string> newRevivedRoles = [&]() {
+ set<string> result = oldSuppressedRoles;
+ foreach (const string& role, suppressedRoles) {
+ result.erase(role);
}
return result;
}();
- foreach (const string& role, rolesToActivate) {
+ foreach (const string& role, addedRoles) {
// NOTE: It's possible that we're already tracking this framework
// under the role because a framework can unsubscribe from a role
// while it still has resources allocated to the role.
if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
trackFrameworkUnderRole(frameworkId, role);
}
+ }
+ // TODO(anindya_sinha): We should activate the roles only if the
+ // framework is active (instead of always).
+ foreach (const string& role, addedRoles | newRevivedRoles) {
CHECK(frameworkSorters.contains(role));
frameworkSorters.at(role)->activate(frameworkId.value());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/8c2f972b/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 566ffb3..29cab82 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -1551,6 +1551,145 @@ TEST_P(SchedulerTest, NoOffersWithAllRolesSuppressed)
}
+// This test verifies that if a framework (initially with no roles
+// suppressed) decides to suppress offers for its roles on reregistration,
+// no offers will be made.
+TEST_P(SchedulerTest, NoOffersOnReregistrationWithAllRolesSuppressed)
+{
+ master::Flags flags = CreateMasterFlags();
+
+ Try<Owned<cluster::Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ Clock::pause();
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected));
+
+ ContentType contentType = GetParam();
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ contentType,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<Nothing> heartbeat;
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillOnce(FutureSatisfy(&heartbeat));
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+ // Enable failover.
+ frameworkInfo.set_failover_timeout(Weeks(1).secs());
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ *(subscribe->mutable_framework_info()) = frameworkInfo;
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+ AWAIT_READY(heartbeat);
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers->offers().empty());
+
+ // Now fail over and reregister with all roles suppressed.
+ EXPECT_CALL(*scheduler, disconnected(_));
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillOnce(FutureSatisfy(&heartbeat));
+
+ // The framework will subscribe with its role being suppressed so no
+ // offers should be received by the framework.
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .Times(0);
+
+ // Now fail over the scheduler.
+ mesos.reconnect();
+
+ AWAIT_READY(connected);
+
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+ *(call.mutable_framework_id()) = frameworkId;
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+ *(frameworkInfo.mutable_id()) = frameworkId;
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ *(subscribe->mutable_framework_info()) = frameworkInfo;
+ subscribe->add_suppressed_roles(frameworkInfo.role());
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+ AWAIT_READY(heartbeat);
+
+ // We use an additional heartbeat as a synchronization mechanism to make
+ // sure an offer would be received by the scheduler if one was ever extended.
+ // Note that Clock::settle() wouldn't be sufficient here.
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillOnce(FutureSatisfy(&heartbeat))
+ .WillRepeatedly(Return()); // Ignore additional heartbeats.
+
+ Clock::advance(master::DEFAULT_HEARTBEAT_INTERVAL);
+ AWAIT_READY(heartbeat);
+
+ // On revival the scheduler should get an offer.
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ *(call.mutable_framework_id()) = frameworkId;
+ call.set_type(Call::REVIVE);
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers->offers().empty());
+}
+
+
TEST_P(SchedulerTest, Message)
{
Try<Owned<cluster::Master>> master = StartMaster();