You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/07/03 23:07:08 UTC

[mesos] 01/04: Added ability to revive a subset of roles in the scheduler driver.

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f0285ecbfd38a60e17937834aec58d0a9235d1d7
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Wed Jul 3 18:46:30 2019 -0400

    Added ability to revive a subset of roles in the scheduler driver.
    
    This patch adds to the scheduler driver a 'reviveOffers(roles)' method,
    which sends the REVIVE call for these roles and removes them from the
    suppressed roles set.
    
    Review: https://reviews.apache.org/r/70941/
---
 include/mesos/scheduler.hpp | 10 ++++++++++
 src/sched/sched.cpp         | 46 ++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 0a09d55..cc3a278 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -293,6 +293,14 @@ public:
   // during re-registration.
   virtual Status reviveOffers() = 0;
 
+  // Removes filters for the specified roles and removes  these roles from
+  // the suppressed set.  If the framework is not connected to the master,
+  // an up-to-date set of suppressed roles will be sent to the master
+  // during re-registration.
+  //
+  // NOTE: If 'roles' is empty, this method does nothing.
+  virtual Status reviveOffers(const std::vector<std::string>& roles) = 0;
+
   // Informs Mesos master to stop sending offers to the framework (i.e.
   // to suppress all roles of the framework). To resume getting offers,
   // the scheduler can call reviveOffers() or set the suppressed roles
@@ -490,6 +498,8 @@ public:
 
   Status reviveOffers() override;
 
+  Status reviveOffers(const std::vector<std::string>& roles) override;
+
   Status suppressOffers() override;
 
   Status acknowledgeStatusUpdate(
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 6b02ac0..47b87ad 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -116,6 +116,7 @@ using process::Process;
 using process::UPID;
 
 using std::map;
+using std::make_move_iterator;
 using std::mutex;
 using std::shared_ptr;
 using std::set;
@@ -1438,27 +1439,38 @@ protected:
     send(master->pid(), call);
   }
 
-  void reviveOffers()
+  void reviveOffers(const vector<string>& roles)
   {
-    suppressedRoles.clear();
+    if (roles.empty()) {
+      suppressedRoles.clear();
+    } else {
+      for (const string& role : roles) {
+        suppressedRoles.erase(role);
+      }
+    }
 
     if (!connected) {
       VLOG(1) << "Ignoring REVIVE as master is disconnected;"
-              << " the set of suppressed roles in the driver has been cleared"
+              << " the set of suppressed roles in the driver has been updated"
               << " and will be sent to the master during re-registration";
-
       sendUpdateFrameworkOnConnect = true;
       return;
     }
 
-    VLOG(2) << "Sending REVIVE for all roles";
-
     Call call;
 
     CHECK(framework.has_id());
     call.mutable_framework_id()->CopyFrom(framework.id());
     call.set_type(Call::REVIVE);
 
+    if (roles.empty()) {
+      VLOG(2) << "Sending REVIVE for all roles";
+    } else {
+      VLOG(2) << "Sending REVIVE for roles: " << stringify(roles);
+      *call.mutable_revive()->mutable_roles() =
+        RepeatedPtrField<string>(roles.begin(), roles.end());
+    }
+
     CHECK_SOME(master);
     send(master->pid(), call);
   }
@@ -2327,7 +2339,27 @@ Status MesosSchedulerDriver::reviveOffers()
 
     CHECK(process != nullptr);
 
-    dispatch(process, &SchedulerProcess::reviveOffers);
+    dispatch(process, &SchedulerProcess::reviveOffers, vector<string>());
+
+    return status;
+  }
+}
+
+
+Status MesosSchedulerDriver::reviveOffers(const vector<string>& roles)
+{
+  if (roles.empty()) {
+    return status;
+  }
+
+  synchronized (mutex) {
+    if (status != DRIVER_RUNNING) {
+      return status;
+    }
+
+    CHECK(process != nullptr);
+
+    dispatch(process, &SchedulerProcess::reviveOffers, roles);
 
     return status;
   }