You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/07/24 15:02:48 UTC

[mesos] 02/03: Added enchanced multi-role capability support to the python bindings.

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit aebf86f97a77da233b523eb7381bf1132a90410c
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Wed Jul 24 11:02:04 2019 -0400

    Added enchanced multi-role capability support to the python bindings.
    
    This patch adds suppressed roles list to the arguments of the scheduler
    driver constructor and `revivieOffers()`/`suppressOffers` methods, and
    also adds support of the `updateFramework()` method.
    
    Review: https://reviews.apache.org/r/71083/
---
 .../interface/src/mesos/interface/__init__.py      |  53 +++++++---
 .../scheduler/mesos_scheduler_driver_impl.cpp      | 108 ++++++++++++++++++---
 .../scheduler/mesos_scheduler_driver_impl.hpp      |  11 ++-
 3 files changed, 143 insertions(+), 29 deletions(-)

diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index 1200ef6..f986185 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -238,28 +238,29 @@ class SchedulerDriver(object):
       callback.
     """
 
-  def reviveOffers(self):
+  def reviveOffers(self, roles=None):
     """
-      Removes all filters previously set by the framework (via launchTasks()
-      or declineOffer()) and clears the set of suppressed roles.
-
-      NOTE: If the framework is not connected to the master, the set
-      of suppressed roles stored by the driver will be cleared, and an
-      up-to-date set of suppressed roles will be sent to the master
+      Removes filters either for all roles of the framework (if 'roles'
+      is None) or for the specified roles and removes these roles from
+      the suppressed set.  If the framework is not connected to the master,
+      an up-to-date set of suppressed roles will be sent to the master
       during re-registration.
+
+      NOTE: If 'roles' is an empty iterable, this method does nothing.
     """
 
-  def suppressOffers(self):
+  def suppressOffers(self, roles=None):
     """
-      Informs Mesos master to stop sending offers to the framework (i.e.
-      to suppress all roles of the framework). To resume getting offers,
-      the scheduler can call reviveOffers() or set the suppressed roles
-      explicitly via updateFramework().
+      Informs Mesos master to stop sending offers either for all roles
+      of the framework (if 'roles' is None) or for the specified 'roles'
+      of the framework (i.e. to suppress these roles). To resume getting
+      offers, the scheduler can call reviveOffers() or set the suppressed
+      roles explicitly via updateFramework().
+
+      NOTE: If the framework is not connected to the master, an up-to-date set
+      of suppressed roles will be sent to the master during re-registration.
 
-      NOTE: If the framework is not connected to the master, all the roles
-      will be added to the set of suppressed roles in the driver, and an
-      up-to-date suppressed roles set will be sent to the master during
-      re-registration.
+      NOTE: If `roles` is an empty iterable, this method does nothing.
     """
 
   def acknowledgeStatusUpdate(self, status):
@@ -288,6 +289,26 @@ class SchedulerDriver(object):
       currently known.
     """
 
+  def updateFramework(self, frameworkInfo, suppressedRoles):
+    """
+      Inform Mesos master about changes to the `FrameworkInfo` and
+      the set of suppressed roles. The driver will store the new
+      `FrameworkInfo` and the new set of suppressed roles, and all
+      subsequent re-registrations will use them.
+
+      NOTE: If the supplied info is invalid or fails authorization,
+      the `error()` callback will be invoked asynchronously (after
+      the master replies with a `FrameworkErrorMessage`).
+
+      NOTE: This must be called after initial registration with the
+      master completes and the `FrameworkID` is assigned. The assigned
+      `FrameworkID` must be set in `frameworkInfo`.
+
+      NOTE: The `FrameworkInfo.user` and `FrameworkInfo.hostname`
+      fields will be auto-populated using the same approach used
+      during driver initialization.
+    """
+
 class Executor(object):
   """
     Base class for Mesos executors. Users' executors should extend this
diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
index 6dec9da..256632a 100644
--- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
+++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
@@ -32,6 +32,7 @@ using std::endl;
 using std::string;
 using std::vector;
 using std::map;
+using std::unique_ptr;
 
 namespace mesos {
 namespace python {
@@ -138,13 +139,13 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
   },
   { "reviveOffers",
     (PyCFunction) MesosSchedulerDriverImpl_reviveOffers,
-    METH_NOARGS,
-    "Remove all filters and ask Mesos for new offers"
+    METH_VARARGS,
+    "Remove all filters, unsuppress and ask Mesos for new offers for the roles"
   },
   { "suppressOffers",
     (PyCFunction) MesosSchedulerDriverImpl_suppressOffers,
-    METH_NOARGS,
-    "Set suppressed attribute as true for the Framework"
+    METH_VARARGS,
+    "Set suppressed roles for the Framework"
   },
   { "acknowledgeStatusUpdate",
     (PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate,
@@ -161,6 +162,11 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
     METH_VARARGS,
     "Master sends status updates if task status is different from expected"
   },
+  { "updateFramework",
+    (PyCFunction) MesosSchedulerDriverImpl_updateFramework,
+    METH_VARARGS,
+    "Updates FrameworkInfo and suppressed roles"
+  },
   { nullptr }  /* Sentinel */
 };
 
@@ -198,15 +204,17 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
   const char* master;
   int implicitAcknowledgements = 1; // Enabled by default.
   PyObject* credentialObj = nullptr;
+  PyObject* suppressedRolesObj = nullptr;
 
   if (!PyArg_ParseTuple(
       args,
-      "OOs|iO",
+      "OOs|iOO",
       &schedulerObj,
       &frameworkObj,
       &master,
       &implicitAcknowledgements,
-      &credentialObj)) {
+      &credentialObj,
+      &suppressedRolesObj)) {
     return -1;
   }
 
@@ -234,6 +242,14 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
     }
   }
 
+  unique_ptr<vector<string>> suppressedRoles;
+  if (suppressedRolesObj != nullptr && suppressedRolesObj != Py_None) {
+    suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
+    if (!suppressedRoles) {
+      // Exception has been set by constructFromIterable
+      return -1;
+    }
+  }
 
   if (self->driver != nullptr) {
     delete self->driver;
@@ -251,6 +267,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
     self->driver = new MesosSchedulerDriver(
         self->proxyScheduler,
         framework,
+        suppressedRoles ? *suppressedRoles : vector<string>{},
         master,
         implicitAcknowledgements != 0,
         credential);
@@ -258,6 +275,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
     self->driver = new MesosSchedulerDriver(
         self->proxyScheduler,
         framework,
+        suppressedRoles ? *suppressedRoles : vector<string>{},
         master,
         implicitAcknowledgements != 0);
   }
@@ -645,27 +663,64 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(MesosSchedulerDriverImpl* self,
 }
 
 
-PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self)
+PyObject* MesosSchedulerDriverImpl_reviveOffers(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
 {
+  PyObject* rolesObj = nullptr;
+  if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
+    return nullptr;
+  }
+
   if (self->driver == nullptr) {
     PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
     return nullptr;
   }
 
-  Status status = self->driver->reviveOffers();
-  return PyInt_FromLong(status); // Sets exception if creating long fails.
+  Status status;
+
+  if (rolesObj == nullptr || rolesObj == Py_None) {
+    status = self->driver->reviveOffers();
+  } else {
+    unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
+    if (!roles) {
+      return nullptr;
+    }
+
+    status = self->driver->reviveOffers(*roles);
+  }
+
+  return PyInt_FromLong(status);
 }
 
 
 PyObject* MesosSchedulerDriverImpl_suppressOffers(
-    MesosSchedulerDriverImpl* self)
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
 {
+  PyObject* rolesObj = nullptr;
+  if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
+    return nullptr;
+  }
+
   if (self->driver == nullptr) {
     PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
     return nullptr;
   }
 
-  Status status = self->driver->suppressOffers();
+  Status status;
+
+  if (rolesObj == nullptr || rolesObj == Py_None) {
+    status = self->driver->suppressOffers();
+  } else {
+    unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
+    if (!roles) {
+      return nullptr;
+    }
+
+    status = self->driver->suppressOffers(*roles);
+  }
+
   return PyInt_FromLong(status); // Sets exception if creating long fails.
 }
 
@@ -778,5 +833,36 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks(
   return PyInt_FromLong(status);
 }
 
+
+PyObject* MesosSchedulerDriverImpl_updateFramework(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
+{
+  PyObject* frameworkObj = nullptr;
+  PyObject* suppressedRolesObj = nullptr;
+
+  if (!PyArg_ParseTuple(args, "OO", &frameworkObj, &suppressedRolesObj)) {
+    return nullptr;
+  }
+
+  FrameworkInfo framework;
+  if (!readPythonProtobuf(frameworkObj, &framework)) {
+    PyErr_Format(PyExc_Exception,
+                 "Could not deserialize Python FrameworkInfo");
+    return nullptr;
+  }
+
+  unique_ptr<vector<string>> suppressedRoles;
+  suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
+  if (!suppressedRoles) {
+    // Exception has been set by constructFromIterable
+    return nullptr;
+  }
+
+  Status status = self->driver->updateFramework(framework, *suppressedRoles);
+  return PyInt_FromLong(status);
+}
+
+
 } // namespace python {
 } // namespace mesos {
diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
index 8c98d46..7bc0856 100644
--- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
+++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
@@ -111,10 +111,13 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(
     MesosSchedulerDriverImpl* self,
     PyObject* args);
 
-PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self);
+PyObject* MesosSchedulerDriverImpl_reviveOffers(
+    MesosSchedulerDriverImpl* self,
+    PyObject* pyRoles);
 
 PyObject* MesosSchedulerDriverImpl_suppressOffers(
-    MesosSchedulerDriverImpl* self);
+    MesosSchedulerDriverImpl* self,
+    PyObject* pyRoles);
 
 PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
     MesosSchedulerDriverImpl* self,
@@ -128,6 +131,10 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks(
     MesosSchedulerDriverImpl* self,
     PyObject* args);
 
+PyObject* MesosSchedulerDriverImpl_updateFramework(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args);
+
 } // namespace python {
 } // namespace mesos {