You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/07/02 03:27:26 UTC

[mesos] 01/02: Provided ability to pass suppressed roles via V0 updateFramework().

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 89a16ae04a2e4ed10a4d49bff467cb066883febb
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Mon Jul 1 22:56:06 2019 -0400

    Provided ability to pass suppressed roles via V0 updateFramework().
    
    This patch adds a list of suppressed roles to the arguments of
    scheduler driver's updateFramework() method to make it possible
    for V0 frameworks to selectively suppress/revive offers, to reach
    parity with V1 UPDATE_FRAMEWORK.
    
    To keep re-registration consistent with updateFramework(), the set
    of suppressed roles is now stored by the driver and used when it
    re-registers.
    
    To prevent re-registration from implicitly altering the effects of
    reviveOffers() and suppressOffers(), reviveOffers() now clears the
    stored set of suppressed roles and suppressOffers() now fills it
    with all of the framework's roles. If either call is issued while
    the driver is disconnected, the driver will perform an
    UPDATE_FRAMEWORK call upon re-connection.
    
    Review: https://reviews.apache.org/r/70894/
---
 include/mesos/scheduler.hpp                        | 36 +++++++++++-----
 .../jni/org_apache_mesos_MesosSchedulerDriver.cpp  |  2 +-
 src/java/src/org/apache/mesos/SchedulerDriver.java | 21 ++++++---
 .../interface/src/mesos/interface/__init__.py      | 21 ++++++---
 src/sched/sched.cpp                                | 50 +++++++++++++++++++---
 src/tests/master/update_framework_tests.cpp        | 12 +++---
 6 files changed, 110 insertions(+), 32 deletions(-)

diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 8c67748..27281c1 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -284,13 +284,24 @@ public:
       const OfferID& offerId,
       const Filters& filters = Filters()) = 0;
 
-  // Removes all filters previously set by the framework (via
-  // launchTasks()). This enables the framework to receive offers from
-  // those filtered slaves.
+  // Removes all filters previously set by the framework (via launchTasks()
+  // or declineOffer()) and clears the set of suppressed roles.
+  //
+  // NOTE: If the framework is not connected to the master, the set
+  // of suppressed roles stored by the driver will be cleared, and an
+  // up-to-date set of suppressed roles will be sent to the master
+  // during re-registration.
   virtual Status reviveOffers() = 0;
 
-  // Inform Mesos master to stop sending offers to the framework. The
-  // scheduler should call reviveOffers() to resume getting offers.
+  // Informs Mesos master to stop sending offers to the framework (i.e.
+  // to suppress all roles of the framework). To resume getting offers,
+  // the scheduler can call reviveOffers() or set the suppressed roles
+  // explicitly via updateFramework().
+  //
+  // NOTE: If the framework is not connected to the master, all the roles
+  // will be added to the set of suppressed roles in the driver, and an
+  // up-to-date suppressed roles set will be sent to the master during
+  // re-registration.
   virtual Status suppressOffers() = 0;
 
   // Acknowledges the status update. This should only be called
@@ -318,9 +329,10 @@ public:
   virtual Status reconcileTasks(
       const std::vector<TaskStatus>& statuses) = 0;
 
-  // Inform Mesos master about changes to the `FrameworkInfo`. The
-  // driver will store the new `FrameworkInfo` and all subsequent
-  // re-registrations will use it.
+  // Inform Mesos master about changes to the `FrameworkInfo` and
+  // the set of suppressed roles. The driver will store the new
+  // `FrameworkInfo` and the new set of suppressed roles, and all
+  // subsequent re-registrations will use them.
   //
   // NOTE: If the supplied info is invalid or fails authorization,
   // the `error()` callback will be invoked asynchronously (after
@@ -333,7 +345,9 @@ public:
   // NOTE: The `FrameworkInfo.user` and `FrameworkInfo.hostname`
   // fields will be auto-populated using the same approach used
   // during driver initialization.
-  virtual Status updateFramework(const FrameworkInfo& frameworkInfo) = 0;
+  virtual Status updateFramework(
+      const FrameworkInfo& frameworkInfo,
+      const std::vector<std::string>& suppressedRoles) = 0;
 };
 
 
@@ -471,7 +485,9 @@ public:
   Status reconcileTasks(
       const std::vector<TaskStatus>& statuses) override;
 
-  Status updateFramework(const FrameworkInfo& frameworkInfo) override;
+  Status updateFramework(
+      const FrameworkInfo& frameworkInfo,
+      const std::vector<std::string>& suppressedRoles) override;
 
 protected:
   // Used to detect (i.e., choose) the master.
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index a21aca2..167f232 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -1016,7 +1016,7 @@ Java_org_apache_mesos_MesosSchedulerDriver_updateFramework(
   MesosSchedulerDriver* driver =
     (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
 
-  Status status = driver->updateFramework(frameworkInfo);
+  Status status = driver->updateFramework(frameworkInfo, {});
 
   return convert<Status>(env, status);
 }
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index ee5a9e2..f92521a 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -254,9 +254,13 @@ public interface SchedulerDriver {
   Status declineOffer(OfferID offerId);
 
   /**
-   * Removes all filters, previously set by the framework (via {@link
-   * #launchTasks}). This enables the framework to receive offers
-   * from those filtered slaves.
+   * Removes all filters previously set by the framework (via launchTasks()
+   * or declineOffer()) and clears the set of suppressed roles.
+   *
+   * NOTE: If the framework is not connected to the master, the set
+   * of suppressed roles stored by the driver will be cleared, and an
+   * up-to-date set of suppressed roles will be sent to the master
+   * during re-registration.
    *
    * @return    The state of the driver after the call.
    *
@@ -265,8 +269,15 @@ public interface SchedulerDriver {
   Status reviveOffers();
 
   /**
-   * Inform Mesos master to stop sending offers to the framework. The
-   * scheduler should call reviveOffers() to resume getting offers.
+   * Informs Mesos master to stop sending offers to the framework (i.e.
+   * to suppress all roles of the framework). To resume getting offers,
+   * the scheduler can call reviveOffers() or set the suppressed roles
+   * explicitly via updateFramework().
+   *
+   * NOTE: If the framework is not connected to the master, all the roles
+   * will be added to the set of suppressed roles in the driver, and an
+   * up-to-date suppressed roles set will be sent to the master during
+   * re-registration.
    *
    * @return    The state of the driver after the call.
    *
diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index f9642a0..1200ef6 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -240,15 +240,26 @@ class SchedulerDriver(object):
 
   def reviveOffers(self):
     """
-      Removes all filters previously set by the framework (via
-      launchTasks()).  This enables the framework to receive offers from
-      those filtered slaves.
+      Removes all filters previously set by the framework (via launchTasks()
+      or declineOffer()) and clears the set of suppressed roles.
+
+      NOTE: If the framework is not connected to the master, the set
+      of suppressed roles stored by the driver will be cleared, and an
+      up-to-date set of suppressed roles will be sent to the master
+      during re-registration.
     """
 
   def suppressOffers(self):
     """
-      Inform Mesos master to stop sending offers to the framework. The
-      scheduler should call reviveOffers() to resume getting offers.
+      Informs Mesos master to stop sending offers to the framework (i.e.
+      to suppress all roles of the framework). To resume getting offers,
+      the scheduler can call reviveOffers() or set the suppressed roles
+      explicitly via updateFramework().
+
+      NOTE: If the framework is not connected to the master, all the roles
+      will be added to the set of suppressed roles in the driver, and an
+      up-to-date suppressed roles set will be sent to the master during
+      re-registration.
     """
 
   def acknowledgeStatusUpdate(self, status):
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index e6cc534..ac98b39 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -103,6 +103,8 @@ using namespace mesos::internal;
 using namespace mesos::internal::master;
 using namespace mesos::scheduler;
 
+using google::protobuf::RepeatedPtrField;
+
 using mesos::master::detector::MasterDetector;
 
 using process::Clock;
@@ -116,6 +118,7 @@ using process::UPID;
 using std::map;
 using std::mutex;
 using std::shared_ptr;
+using std::set;
 using std::string;
 using std::vector;
 using std::weak_ptr;
@@ -833,6 +836,8 @@ protected:
 
     Call::Subscribe* subscribe = call.mutable_subscribe();
     subscribe->mutable_framework_info()->CopyFrom(framework);
+    *subscribe->mutable_suppressed_roles() = RepeatedPtrField<string>(
+        suppressedRoles.begin(), suppressedRoles.end());
 
     if (framework.has_id() && !framework.id().value().empty()) {
       subscribe->set_force(failover);
@@ -1433,11 +1438,19 @@ protected:
 
   void reviveOffers()
   {
+    suppressedRoles.clear();
+
     if (!connected) {
-      VLOG(1) << "Ignoring revive offers message as master is disconnected";
+      VLOG(1) << "Ignoring REVIVE as master is disconnected;"
+              << " the set of suppressed roles in the driver has been cleared"
+              << " and will be sent to the master during re-registration";
+
+      sendUpdateFrameworkOnConnect = true;
       return;
     }
 
+    VLOG(2) << "Sending REVIVE for all roles";
+
     Call call;
 
     CHECK(framework.has_id());
@@ -1450,11 +1463,20 @@ protected:
 
   void suppressOffers()
   {
+    suppressedRoles =
+      std::set<string>(framework.roles().begin(), framework.roles().end());
+
     if (!connected) {
-      VLOG(1) << "Ignoring suppress offers message as master is disconnected";
+      VLOG(1) << "Ignoring SUPPRESS as master is disconnected;"
+              << " the set of suppressed roles in the driver has been updated"
+              << " and will be sent to the master during re-registration";
+
+      sendUpdateFrameworkOnConnect = true;
       return;
     }
 
+    VLOG(2) << "Sending SUPPRESS for all roles";
+
     Call call;
 
     CHECK(framework.has_id());
@@ -1595,7 +1617,9 @@ protected:
     send(master->pid(), call);
   }
 
-  void updateFramework(const FrameworkInfo& framework_)
+  void updateFramework(
+      const FrameworkInfo& framework_,
+      set<string>&& suppressedRoles_)
   {
     if (!framework.has_id() || framework.id().value().empty()) {
       error("MesosSchedulerDriver::updateFramework() must not be called"
@@ -1613,6 +1637,7 @@ protected:
     }
 
     framework = framework_;
+    suppressedRoles = std::move(suppressedRoles_);
 
     if (connected) {
       sendUpdateFramework();
@@ -1675,6 +1700,8 @@ private:
 
     call.set_type(Call::UPDATE_FRAMEWORK);
     *call.mutable_update_framework()->mutable_framework_info() = framework;
+    *call.mutable_update_framework()->mutable_suppressed_roles() =
+      RepeatedPtrField<string>(suppressedRoles.begin(), suppressedRoles.end());
 
     VLOG(1) << "Sending UPDATE_FRAMEWORK message";
 
@@ -1686,6 +1713,8 @@ private:
   MesosSchedulerDriver* driver;
   Scheduler* scheduler;
   FrameworkInfo framework;
+  set<string> suppressedRoles;
+
   std::recursive_mutex* mutex;
   Latch* latch;
 
@@ -2343,7 +2372,9 @@ Status MesosSchedulerDriver::reconcileTasks(
 }
 
 
-Status MesosSchedulerDriver::updateFramework(const FrameworkInfo& update)
+Status MesosSchedulerDriver::updateFramework(
+  const FrameworkInfo& update,
+  const vector<string>& suppressedRoles_)
 {
   synchronized (mutex) {
     if (status != DRIVER_RUNNING) {
@@ -2356,7 +2387,16 @@ Status MesosSchedulerDriver::updateFramework(const FrameworkInfo& update)
 
     CHECK(process != nullptr);
 
-    dispatch(process, &SchedulerProcess::updateFramework, framework);
+    set<string> suppressedRoles(
+        suppressedRoles_.begin(), suppressedRoles_.end());
+
+    CHECK_EQ(suppressedRoles_.size(), suppressedRoles.size())
+      << "Invalid suppressed role list: contains"
+      << " " << suppressedRoles_.size() - suppressedRoles.size()
+      << " duplicates " << suppressedRoles_;
+
+    dispatch(process, &SchedulerProcess::updateFramework, framework,
+             std::move(suppressedRoles));
 
     return status;
   }
diff --git a/src/tests/master/update_framework_tests.cpp b/src/tests/master/update_framework_tests.cpp
index 0d466e2..8487f6c 100644
--- a/src/tests/master/update_framework_tests.cpp
+++ b/src/tests/master/update_framework_tests.cpp
@@ -623,7 +623,7 @@ TEST_F(UpdateFrameworkV0Test, DriverErrorWhenCalledBeforeRegistration)
 
   driver.start();
 
-  driver.updateFramework(DEFAULT_FRAMEWORK_INFO);
+  driver.updateFramework(DEFAULT_FRAMEWORK_INFO, {});
 
   AWAIT_READY(error);
   EXPECT_EQ(error.get(),
@@ -662,7 +662,7 @@ TEST_F(UpdateFrameworkV0Test, DriverErrorOnFrameworkIDMismatch)
   *update.mutable_id() = frameworkId.get();
   *update.mutable_id()->mutable_value() += "-deadbeef";
 
-  driver.updateFramework(update);
+  driver.updateFramework(update, {});
 
   AWAIT_READY(error);
   EXPECT_EQ(
@@ -704,7 +704,7 @@ TEST_F(UpdateFrameworkV0Test, CheckpointingChangeFails)
   FrameworkInfo update = changeAllMutableFields(DEFAULT_FRAMEWORK_INFO);
   update.set_checkpoint(!update.checkpoint());
   *update.mutable_id() = frameworkId.get();
-  driver.updateFramework(update);
+  driver.updateFramework(update, {});
 
   AWAIT_READY(error);
   EXPECT_TRUE(strings::contains(
@@ -762,7 +762,7 @@ TEST_F(UpdateFrameworkV0Test, MutableFieldsUpdateSuccessfully)
   FrameworkInfo update = changeAllMutableFields(DEFAULT_FRAMEWORK_INFO);
   *update.mutable_id() = frameworkId.get();
 
-  driver.updateFramework(update);
+  driver.updateFramework(update, {});
 
   AWAIT_READY(updateFrameworkMessage);
 
@@ -854,7 +854,7 @@ TEST_F(UpdateFrameworkV0Test, OffersOnAddingRole)
   update.add_roles("new_role");
   *update.mutable_id() = frameworkId.get();
 
-  driver.updateFramework(update);
+  driver.updateFramework(update, {});
 
   AWAIT_READY(offers);
 
@@ -914,7 +914,7 @@ TEST_F(UpdateFrameworkV0Test, RescindOnRemovingRoles)
   update.clear_roles();
   *update.mutable_id() = frameworkId.get();
 
-  driver.updateFramework(update);
+  driver.updateFramework(update, {});
 
   AWAIT_READY(rescindedOfferId);
   AWAIT_READY(recoverResources);