You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/07/24 15:02:48 UTC
[mesos] 02/03: Added enhanced multi-role capability support to the
python bindings.
This is an automated email from the ASF dual-hosted git repository.
bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit aebf86f97a77da233b523eb7381bf1132a90410c
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Wed Jul 24 11:02:04 2019 -0400
Added enhanced multi-role capability support to the python bindings.
This patch adds a suppressed roles list to the arguments of the scheduler
driver constructor and the `reviveOffers()`/`suppressOffers()` methods, and
also adds support for the `updateFramework()` method.
Review: https://reviews.apache.org/r/71083/
---
.../interface/src/mesos/interface/__init__.py | 53 +++++++---
.../scheduler/mesos_scheduler_driver_impl.cpp | 108 ++++++++++++++++++---
.../scheduler/mesos_scheduler_driver_impl.hpp | 11 ++-
3 files changed, 143 insertions(+), 29 deletions(-)
diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index 1200ef6..f986185 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -238,28 +238,29 @@ class SchedulerDriver(object):
callback.
"""
- def reviveOffers(self):
+ def reviveOffers(self, roles=None):
"""
- Removes all filters previously set by the framework (via launchTasks()
- or declineOffer()) and clears the set of suppressed roles.
-
- NOTE: If the framework is not connected to the master, the set
- of suppressed roles stored by the driver will be cleared, and an
- up-to-date set of suppressed roles will be sent to the master
+ Removes filters either for all roles of the framework (if 'roles'
+ is None) or for the specified roles and removes these roles from
+ the suppressed set. If the framework is not connected to the master,
+ an up-to-date set of suppressed roles will be sent to the master
during re-registration.
+
+ NOTE: If 'roles' is an empty iterable, this method does nothing.
"""
- def suppressOffers(self):
+ def suppressOffers(self, roles=None):
"""
- Informs Mesos master to stop sending offers to the framework (i.e.
- to suppress all roles of the framework). To resume getting offers,
- the scheduler can call reviveOffers() or set the suppressed roles
- explicitly via updateFramework().
+ Informs Mesos master to stop sending offers either for all roles
+ of the framework (if 'roles' is None) or for the specified 'roles'
+ of the framework (i.e. to suppress these roles). To resume getting
+ offers, the scheduler can call reviveOffers() or set the suppressed
+ roles explicitly via updateFramework().
+
+ NOTE: If the framework is not connected to the master, an up-to-date set
+ of suppressed roles will be sent to the master during re-registration.
- NOTE: If the framework is not connected to the master, all the roles
- will be added to the set of suppressed roles in the driver, and an
- up-to-date suppressed roles set will be sent to the master during
- re-registration.
+ NOTE: If `roles` is an empty iterable, this method does nothing.
"""
def acknowledgeStatusUpdate(self, status):
@@ -288,6 +289,26 @@ class SchedulerDriver(object):
currently known.
"""
+ def updateFramework(self, frameworkInfo, suppressedRoles):
+ """
+ Inform Mesos master about changes to the `FrameworkInfo` and
+ the set of suppressed roles. The driver will store the new
+ `FrameworkInfo` and the new set of suppressed roles, and all
+ subsequent re-registrations will use them.
+
+ NOTE: If the supplied info is invalid or fails authorization,
+ the `error()` callback will be invoked asynchronously (after
+ the master replies with a `FrameworkErrorMessage`).
+
+ NOTE: This must be called after initial registration with the
+ master completes and the `FrameworkID` is assigned. The assigned
+ `FrameworkID` must be set in `frameworkInfo`.
+
+ NOTE: The `FrameworkInfo.user` and `FrameworkInfo.hostname`
+ fields will be auto-populated using the same approach used
+ during driver initialization.
+ """
+
class Executor(object):
"""
Base class for Mesos executors. Users' executors should extend this
diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
index 6dec9da..256632a 100644
--- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
+++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
@@ -32,6 +32,7 @@ using std::endl;
using std::string;
using std::vector;
using std::map;
+using std::unique_ptr;
namespace mesos {
namespace python {
@@ -138,13 +139,13 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
},
{ "reviveOffers",
(PyCFunction) MesosSchedulerDriverImpl_reviveOffers,
- METH_NOARGS,
- "Remove all filters and ask Mesos for new offers"
+ METH_VARARGS,
+ "Remove all filters, unsuppress and ask Mesos for new offers for the roles"
},
{ "suppressOffers",
(PyCFunction) MesosSchedulerDriverImpl_suppressOffers,
- METH_NOARGS,
- "Set suppressed attribute as true for the Framework"
+ METH_VARARGS,
+ "Set suppressed roles for the Framework"
},
{ "acknowledgeStatusUpdate",
(PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate,
@@ -161,6 +162,11 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
METH_VARARGS,
"Master sends status updates if task status is different from expected"
},
+ { "updateFramework",
+ (PyCFunction) MesosSchedulerDriverImpl_updateFramework,
+ METH_VARARGS,
+ "Updates FrameworkInfo and suppressed roles"
+ },
{ nullptr } /* Sentinel */
};
@@ -198,15 +204,17 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
const char* master;
int implicitAcknowledgements = 1; // Enabled by default.
PyObject* credentialObj = nullptr;
+ PyObject* suppressedRolesObj = nullptr;
if (!PyArg_ParseTuple(
args,
- "OOs|iO",
+ "OOs|iOO",
&schedulerObj,
&frameworkObj,
&master,
&implicitAcknowledgements,
- &credentialObj)) {
+ &credentialObj,
+ &suppressedRolesObj)) {
return -1;
}
@@ -234,6 +242,14 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
}
}
+ unique_ptr<vector<string>> suppressedRoles;
+ if (suppressedRolesObj != nullptr && suppressedRolesObj != Py_None) {
+ suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
+ if (!suppressedRoles) {
+ // Exception has been set by constructFromIterable
+ return -1;
+ }
+ }
if (self->driver != nullptr) {
delete self->driver;
@@ -251,6 +267,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
self->driver = new MesosSchedulerDriver(
self->proxyScheduler,
framework,
+ suppressedRoles ? *suppressedRoles : vector<string>{},
master,
implicitAcknowledgements != 0,
credential);
@@ -258,6 +275,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
self->driver = new MesosSchedulerDriver(
self->proxyScheduler,
framework,
+ suppressedRoles ? *suppressedRoles : vector<string>{},
master,
implicitAcknowledgements != 0);
}
@@ -645,27 +663,64 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(MesosSchedulerDriverImpl* self,
}
-PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self)
+PyObject* MesosSchedulerDriverImpl_reviveOffers(
+ MesosSchedulerDriverImpl* self,
+ PyObject* args)
{
+ PyObject* rolesObj = nullptr;
+ if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
+ return nullptr;
+ }
+
if (self->driver == nullptr) {
PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
return nullptr;
}
- Status status = self->driver->reviveOffers();
- return PyInt_FromLong(status); // Sets exception if creating long fails.
+ Status status;
+
+ if (rolesObj == nullptr || rolesObj == Py_None) {
+ status = self->driver->reviveOffers();
+ } else {
+ unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
+ if (!roles) {
+ return nullptr;
+ }
+
+ status = self->driver->reviveOffers(*roles);
+ }
+
+ return PyInt_FromLong(status);
}
PyObject* MesosSchedulerDriverImpl_suppressOffers(
- MesosSchedulerDriverImpl* self)
+ MesosSchedulerDriverImpl* self,
+ PyObject* args)
{
+ PyObject* rolesObj = nullptr;
+ if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
+ return nullptr;
+ }
+
if (self->driver == nullptr) {
PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
return nullptr;
}
- Status status = self->driver->suppressOffers();
+ Status status;
+
+ if (rolesObj == nullptr || rolesObj == Py_None) {
+ status = self->driver->suppressOffers();
+ } else {
+ unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
+ if (!roles) {
+ return nullptr;
+ }
+
+ status = self->driver->suppressOffers(*roles);
+ }
+
return PyInt_FromLong(status); // Sets exception if creating long fails.
}
@@ -778,5 +833,36 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks(
return PyInt_FromLong(status);
}
+
+PyObject* MesosSchedulerDriverImpl_updateFramework(
+ MesosSchedulerDriverImpl* self,
+ PyObject* args)
+{
+ PyObject* frameworkObj = nullptr;
+ PyObject* suppressedRolesObj = nullptr;
+
+ if (!PyArg_ParseTuple(args, "OO", &frameworkObj, &suppressedRolesObj)) {
+ return nullptr;
+ }
+
+ FrameworkInfo framework;
+ if (!readPythonProtobuf(frameworkObj, &framework)) {
+ PyErr_Format(PyExc_Exception,
+ "Could not deserialize Python FrameworkInfo");
+ return nullptr;
+ }
+
+ unique_ptr<vector<string>> suppressedRoles;
+ suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
+ if (!suppressedRoles) {
+ // Exception has been set by constructFromIterable
+ return nullptr;
+ }
+
+ Status status = self->driver->updateFramework(framework, *suppressedRoles);
+ return PyInt_FromLong(status);
+}
+
+
} // namespace python {
} // namespace mesos {
diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
index 8c98d46..7bc0856 100644
--- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
+++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
@@ -111,10 +111,13 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(
MesosSchedulerDriverImpl* self,
PyObject* args);
-PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self);
+PyObject* MesosSchedulerDriverImpl_reviveOffers(
+ MesosSchedulerDriverImpl* self,
+ PyObject* pyRoles);
PyObject* MesosSchedulerDriverImpl_suppressOffers(
- MesosSchedulerDriverImpl* self);
+ MesosSchedulerDriverImpl* self,
+ PyObject* pyRoles);
PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
MesosSchedulerDriverImpl* self,
@@ -128,6 +131,10 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks(
MesosSchedulerDriverImpl* self,
PyObject* args);
+PyObject* MesosSchedulerDriverImpl_updateFramework(
+ MesosSchedulerDriverImpl* self,
+ PyObject* args);
+
} // namespace python {
} // namespace mesos {