You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/12/01 23:17:51 UTC

[3/3] mesos git commit: Fixed a bug that removed the suppressed framework from the sorter.

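Fixed a bug that removed the suppressed framework from the sorter.
Previously, `updateFramework()` built a single set of roles to deactivate
that contained both the removed roles and the newly suppressed roles. It
then stopped tracking the framework under every role in that set whose
allocation was empty. Now newly suppressed roles are only deactivated in
their sorters, and the framework is untracked only from roles it removed.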

Review: https://reviews.apache.org/r/63831


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c2f972b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c2f972b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c2f972b

Branch: refs/heads/master
Commit: 8c2f972b5c0c42e1519d09275cc26e1765a0c5de
Parents: 3711233
Author: Jiang Yan Xu <xu...@apple.com>
Authored: Tue Nov 14 00:12:17 2017 -0800
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Fri Dec 1 15:12:57 2017 -0800

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp |  48 ++++----
 src/tests/scheduler_tests.cpp               | 139 +++++++++++++++++++++++
 2 files changed, 164 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8c2f972b/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index ab2abf8..715650e 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -35,6 +35,7 @@
 
 #include <stout/check.hpp>
 #include <stout/hashset.hpp>
+#include <stout/set.hpp>
 #include <stout/stopwatch.hpp>
 #include <stout/stringify.hpp>
 
@@ -421,27 +422,30 @@ void HierarchicalAllocatorProcess::updateFramework(
   set<string> newRoles = protobuf::framework::getRoles(frameworkInfo);
   set<string> oldSuppressedRoles = framework.suppressedRoles;
 
-  // The roles which are candidates for deactivation are the roles that are
-  // removed, as well as the roles which have moved from non-suppressed
-  // to suppressed mode.
-  const set<string> rolesToDeactivate = [&]() {
+  // TODO(xujyan): Add a stout set difference method that wraps around
+  // `std::set_difference` for this.
+  const set<string> removedRoles = [&]() {
     set<string> result = oldRoles;
     foreach (const string& role, newRoles) {
       result.erase(role);
     }
+    return result;
+  }();
 
-    foreach (const string& role, oldRoles) {
-      if (!oldSuppressedRoles.count(role) && suppressedRoles.count(role)) {
-        result.insert(role);
-      }
+  const set<string> newSuppressedRoles = [&]() {
+    set<string> result = suppressedRoles;
+    foreach (const string& role, oldSuppressedRoles) {
+      result.erase(role);
     }
     return result;
   }();
 
-  foreach (const string& role, rolesToDeactivate) {
+  foreach (const string& role, removedRoles | newSuppressedRoles) {
     CHECK(frameworkSorters.contains(role));
     frameworkSorters.at(role)->deactivate(frameworkId.value());
+  }
 
+  foreach (const string& role, removedRoles) {
     // Stop tracking the framework under this role if there are
     // no longer any resources allocated to it.
     if (frameworkSorters.at(role)->allocation(frameworkId.value()).empty()) {
@@ -453,36 +457,34 @@ void HierarchicalAllocatorProcess::updateFramework(
     }
   }
 
-  // The roles which are candidates for activation are the roles that are
-  // added, as well as the roles which have moved from suppressed to
-  // non-suppressed mode.
-  //
-  // TODO(anindya_sinha): We should activate the roles only if the
-  // framework is active (instead of always).
-  const set<string> rolesToActivate = [&]() {
+  const set<string> addedRoles = [&]() {
     set<string> result = newRoles;
     foreach (const string& role, oldRoles) {
       result.erase(role);
     }
+    return result;
+  }();
 
-    foreach (const string& role, newRoles) {
-      if (!suppressedRoles.count(role) && oldSuppressedRoles.count(role)) {
-        result.insert(role);
-      } else if (suppressedRoles.count(role)) {
-        result.erase(role);
-      }
+  const set<string> newRevivedRoles = [&]() {
+    set<string> result = oldSuppressedRoles;
+    foreach (const string& role, suppressedRoles) {
+      result.erase(role);
     }
     return result;
   }();
 
-  foreach (const string& role, rolesToActivate) {
+  foreach (const string& role, addedRoles) {
     // NOTE: It's possible that we're already tracking this framework
     // under the role because a framework can unsubscribe from a role
     // while it still has resources allocated to the role.
     if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
       trackFrameworkUnderRole(frameworkId, role);
     }
+  }
 
+  // TODO(anindya_sinha): We should activate the roles only if the
+  // framework is active (instead of always).
+  foreach (const string& role, addedRoles | newRevivedRoles) {
     CHECK(frameworkSorters.contains(role));
     frameworkSorters.at(role)->activate(frameworkId.value());
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/8c2f972b/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 566ffb3..29cab82 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -1551,6 +1551,145 @@ TEST_P(SchedulerTest, NoOffersWithAllRolesSuppressed)
 }
 
 
+// Verifies that a framework that subscribes with no roles suppressed and
+// then fails over and resubscribes with all its roles suppressed receives
+// no offers until it sends REVIVE, after which offers resume.
+TEST_P(SchedulerTest, NoOffersOnReregistrationWithAllRolesSuppressed)
+{
+  master::Flags flags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  Clock::pause();  // The test advances time explicitly below.
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  ContentType contentType = GetParam();
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      contentType,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<Nothing> heartbeat;
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillOnce(FutureSatisfy(&heartbeat));
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+    // Enable failover so the framework can later resubscribe with its ID.
+    frameworkInfo.set_failover_timeout(Weeks(1).secs());
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    *(subscribe->mutable_framework_info()) = frameworkInfo;
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+  AWAIT_READY(heartbeat);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);  // Offered while no role is suppressed.
+  EXPECT_FALSE(offers->offers().empty());
+
+  // Next, fail over and resubscribe with all roles suppressed.
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillOnce(FutureSatisfy(&heartbeat));
+
+  // The framework resubscribes with its role suppressed, so it must not
+  // receive any offers until it explicitly revives.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .Times(0);
+
+  // Simulate a scheduler failover: drop and re-establish the connection.
+  mesos.reconnect();
+
+  AWAIT_READY(connected);
+
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    *(call.mutable_framework_id()) = frameworkId;
+
+    v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+    *(frameworkInfo.mutable_id()) = frameworkId;
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    *(subscribe->mutable_framework_info()) = frameworkInfo;
+    subscribe->add_suppressed_roles(frameworkInfo.role());
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+  AWAIT_READY(heartbeat);
+
+  // We use an additional heartbeat as a synchronization mechanism to make
+  // sure an offer would be received by the scheduler if one was ever extended.
+  // Note that Clock::settle() wouldn't be sufficient here.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillOnce(FutureSatisfy(&heartbeat))
+    .WillRepeatedly(Return()); // Ignore additional heartbeats.
+
+  Clock::advance(master::DEFAULT_HEARTBEAT_INTERVAL);
+  AWAIT_READY(heartbeat);
+
+  // After REVIVE, the scheduler should receive offers again.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    *(call.mutable_framework_id()) = frameworkId;
+    call.set_type(Call::REVIVE);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers->offers().empty());
+}
+
+
 TEST_P(SchedulerTest, Message)
 {
   Try<Owned<cluster::Master>> master = StartMaster();