You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2019/07/24 15:02:46 UTC

[mesos] branch master updated (366f568 -> 72ca397)

This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 366f568  Fixed a compilation issue in the allocator test.
     new 254e5c2  Added a helper to create vector from iterable into python bindings.
     new aebf86f  Added enhanced multi-role capability support to the python bindings.
     new 72ca397  Added unsuppressing via `updateFramework()` to python example framework.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/examples/python/test_framework.py              |  28 +++---
 .../interface/src/mesos/interface/__init__.py      |  53 +++++++---
 src/python/native_common/common.hpp                | 104 ++++++++++++++++++++
 .../scheduler/mesos_scheduler_driver_impl.cpp      | 108 ++++++++++++++++++---
 .../scheduler/mesos_scheduler_driver_impl.hpp      |  11 ++-
 5 files changed, 262 insertions(+), 42 deletions(-)


[mesos] 01/03: Added a helper to create vector from iterable into python bindings.

Posted by bm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 254e5c2dd888bd08309dc59254a1cf2d9a745625
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Wed Jul 24 11:01:39 2019 -0400

    Added a helper to create vector from iterable into python bindings.
    
    Review: https://reviews.apache.org/r/71082/
---
 src/python/native_common/common.hpp | 104 ++++++++++++++++++++++++++++++++++++
 1 file changed, 104 insertions(+)

diff --git a/src/python/native_common/common.hpp b/src/python/native_common/common.hpp
index ed743aa..fadec27 100644
--- a/src/python/native_common/common.hpp
+++ b/src/python/native_common/common.hpp
@@ -22,8 +22,12 @@
 #include <Python.h>
 
 #include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
 
 #include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <google/protobuf/message.h>
 
 
 namespace mesos { namespace python {
@@ -130,6 +134,106 @@ PyObject* createPythonProtobuf(const T& t, const char* typeName)
                              str.size());
 }
 
+
+// construct<T>() should take PyObject* as an argument, try to convert that
+// PyObject into an object of type T and return an object of type equivalent
+// to std::unique_ptr<T> that should hold
+//  - the conversion result on success
+//  - nullptr on failure
+// Also, the Python exception should be set on conversion failure.
+//
+// TODO(asekretenko): use std::optional or stout Try instead of
+// std::unique_ptr when they become available in this code.
+//
+// Declaration of 'construct<T>()' for protobufs.
+template <typename T>
+typename std::enable_if<
+  std::is_base_of<google::protobuf::Message, T>::value,
+  std::unique_ptr<T>>::type
+construct(PyObject* obj)
+{
+  std::unique_ptr<T> result(new T());
+  if (!readPythonProtobuf(obj, result.get())) {
+    PyErr_Format(
+        PyExc_TypeError,
+        "Failed to construct %s from a Python object",
+        result->GetDescriptor()->full_name().c_str());
+
+    return nullptr;
+  }
+
+  return result;
+}
+
+
+// Declaration of 'construct<T>()' for non-protobufs.
+template <typename T>
+typename std::enable_if<
+  !std::is_base_of<google::protobuf::Message, T>::value,
+  std::unique_ptr<T>>::type
+construct(PyObject* obj);
+
+
+// TODO(asekretenko): move this specialization into .cpp file. That file will
+// likely have to be put into a library (there is no simple way to use one
+// source file in two python extensions that can be built concurrently).
+template <>
+inline std::unique_ptr<std::string> construct<std::string>(PyObject* obj)
+{
+  char* chars;
+  Py_ssize_t len;
+  if (PyString_AsStringAndSize(obj, &chars, &len) < 0) {
+    PyErr_Print();
+    PyErr_Format(
+        PyExc_TypeError,
+        "Cannot construct std::string from a non-string object");
+
+    return nullptr;
+  }
+
+  return std::unique_ptr<std::string>(new std::string(chars, len));
+}
+
+
+template <typename T>
+std::unique_ptr<std::vector<T>> constructFromIterable(PyObject* iterable)
+{
+  PyObject* pyIterator = PyObject_GetIter(iterable);
+  if (pyIterator == nullptr) {
+    PyErr_Format(
+      PyExc_TypeError,
+      "Cannot construct std::vector from a non-iterable object");
+    return nullptr;
+  }
+
+  std::unique_ptr<std::vector<T>> result(new std::vector<T>());
+
+  PyObject* pyItem;
+
+  while ((pyItem = PyIter_Next(pyIterator)) != nullptr) {
+    std::unique_ptr<T> item = construct<T>(pyItem);
+    if (!item) {
+      // An exception has already been set by construct<>().
+      Py_DECREF(pyItem);
+      Py_DECREF(pyIterator);
+      return nullptr;
+    }
+
+    result->emplace_back(std::move(*item));
+    Py_DECREF(pyItem);
+  }
+
+  Py_DECREF(pyIterator);
+
+  if (PyErr_Occurred() != nullptr) {
+    return nullptr;
+  }
+
+  return result;
+}
+
+
+
 } // namespace python {
 } // namespace mesos {
 


[mesos] 02/03: Added enhanced multi-role capability support to the python bindings.

Posted by bm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit aebf86f97a77da233b523eb7381bf1132a90410c
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Wed Jul 24 11:02:04 2019 -0400

    Added enhanced multi-role capability support to the python bindings.
    
    This patch adds a suppressed roles list to the arguments of the scheduler
    driver constructor and the `reviveOffers()`/`suppressOffers()` methods, and
    also adds support for the `updateFramework()` method.
    
    Review: https://reviews.apache.org/r/71083/
---
 .../interface/src/mesos/interface/__init__.py      |  53 +++++++---
 .../scheduler/mesos_scheduler_driver_impl.cpp      | 108 ++++++++++++++++++---
 .../scheduler/mesos_scheduler_driver_impl.hpp      |  11 ++-
 3 files changed, 143 insertions(+), 29 deletions(-)

diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index 1200ef6..f986185 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -238,28 +238,29 @@ class SchedulerDriver(object):
       callback.
     """
 
-  def reviveOffers(self):
+  def reviveOffers(self, roles=None):
     """
-      Removes all filters previously set by the framework (via launchTasks()
-      or declineOffer()) and clears the set of suppressed roles.
-
-      NOTE: If the framework is not connected to the master, the set
-      of suppressed roles stored by the driver will be cleared, and an
-      up-to-date set of suppressed roles will be sent to the master
+      Removes filters either for all roles of the framework (if 'roles'
+      is None) or for the specified roles and removes these roles from
+      the suppressed set.  If the framework is not connected to the master,
+      an up-to-date set of suppressed roles will be sent to the master
       during re-registration.
+
+      NOTE: If 'roles' is an empty iterable, this method does nothing.
     """
 
-  def suppressOffers(self):
+  def suppressOffers(self, roles=None):
     """
-      Informs Mesos master to stop sending offers to the framework (i.e.
-      to suppress all roles of the framework). To resume getting offers,
-      the scheduler can call reviveOffers() or set the suppressed roles
-      explicitly via updateFramework().
+      Informs Mesos master to stop sending offers either for all roles
+      of the framework (if 'roles' is None) or for the specified 'roles'
+      of the framework (i.e. to suppress these roles). To resume getting
+      offers, the scheduler can call reviveOffers() or set the suppressed
+      roles explicitly via updateFramework().
+
+      NOTE: If the framework is not connected to the master, an up-to-date set
+      of suppressed roles will be sent to the master during re-registration.
 
-      NOTE: If the framework is not connected to the master, all the roles
-      will be added to the set of suppressed roles in the driver, and an
-      up-to-date suppressed roles set will be sent to the master during
-      re-registration.
+      NOTE: If `roles` is an empty iterable, this method does nothing.
     """
 
   def acknowledgeStatusUpdate(self, status):
@@ -288,6 +289,26 @@ class SchedulerDriver(object):
       currently known.
     """
 
+  def updateFramework(self, frameworkInfo, suppressedRoles):
+    """
+      Inform Mesos master about changes to the `FrameworkInfo` and
+      the set of suppressed roles. The driver will store the new
+      `FrameworkInfo` and the new set of suppressed roles, and all
+      subsequent re-registrations will use them.
+
+      NOTE: If the supplied info is invalid or fails authorization,
+      the `error()` callback will be invoked asynchronously (after
+      the master replies with a `FrameworkErrorMessage`).
+
+      NOTE: This must be called after initial registration with the
+      master completes and the `FrameworkID` is assigned. The assigned
+      `FrameworkID` must be set in `frameworkInfo`.
+
+      NOTE: The `FrameworkInfo.user` and `FrameworkInfo.hostname`
+      fields will be auto-populated using the same approach used
+      during driver initialization.
+    """
+
 class Executor(object):
   """
     Base class for Mesos executors. Users' executors should extend this
diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
index 6dec9da..256632a 100644
--- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
+++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.cpp
@@ -32,6 +32,7 @@ using std::endl;
 using std::string;
 using std::vector;
 using std::map;
+using std::unique_ptr;
 
 namespace mesos {
 namespace python {
@@ -138,13 +139,13 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
   },
   { "reviveOffers",
     (PyCFunction) MesosSchedulerDriverImpl_reviveOffers,
-    METH_NOARGS,
-    "Remove all filters and ask Mesos for new offers"
+    METH_VARARGS,
+    "Remove all filters, unsuppress and ask Mesos for new offers for the roles"
   },
   { "suppressOffers",
     (PyCFunction) MesosSchedulerDriverImpl_suppressOffers,
-    METH_NOARGS,
-    "Set suppressed attribute as true for the Framework"
+    METH_VARARGS,
+    "Set suppressed roles for the Framework"
   },
   { "acknowledgeStatusUpdate",
     (PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate,
@@ -161,6 +162,11 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
     METH_VARARGS,
     "Master sends status updates if task status is different from expected"
   },
+  { "updateFramework",
+    (PyCFunction) MesosSchedulerDriverImpl_updateFramework,
+    METH_VARARGS,
+    "Updates FrameworkInfo and suppressed roles"
+  },
   { nullptr }  /* Sentinel */
 };
 
@@ -198,15 +204,17 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
   const char* master;
   int implicitAcknowledgements = 1; // Enabled by default.
   PyObject* credentialObj = nullptr;
+  PyObject* suppressedRolesObj = nullptr;
 
   if (!PyArg_ParseTuple(
       args,
-      "OOs|iO",
+      "OOs|iOO",
       &schedulerObj,
       &frameworkObj,
       &master,
       &implicitAcknowledgements,
-      &credentialObj)) {
+      &credentialObj,
+      &suppressedRolesObj)) {
     return -1;
   }
 
@@ -234,6 +242,14 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
     }
   }
 
+  unique_ptr<vector<string>> suppressedRoles;
+  if (suppressedRolesObj != nullptr && suppressedRolesObj != Py_None) {
+    suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
+    if (!suppressedRoles) {
+      // Exception has been set by constructFromIterable
+      return -1;
+    }
+  }
 
   if (self->driver != nullptr) {
     delete self->driver;
@@ -251,6 +267,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
     self->driver = new MesosSchedulerDriver(
         self->proxyScheduler,
         framework,
+        suppressedRoles ? *suppressedRoles : vector<string>{},
         master,
         implicitAcknowledgements != 0,
         credential);
@@ -258,6 +275,7 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
     self->driver = new MesosSchedulerDriver(
         self->proxyScheduler,
         framework,
+        suppressedRoles ? *suppressedRoles : vector<string>{},
         master,
         implicitAcknowledgements != 0);
   }
@@ -645,27 +663,64 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(MesosSchedulerDriverImpl* self,
 }
 
 
-PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self)
+PyObject* MesosSchedulerDriverImpl_reviveOffers(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
 {
+  PyObject* rolesObj = nullptr;
+  if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
+    return nullptr;
+  }
+
   if (self->driver == nullptr) {
     PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
     return nullptr;
   }
 
-  Status status = self->driver->reviveOffers();
-  return PyInt_FromLong(status); // Sets exception if creating long fails.
+  Status status;
+
+  if (rolesObj == nullptr || rolesObj == Py_None) {
+    status = self->driver->reviveOffers();
+  } else {
+    unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
+    if (!roles) {
+      return nullptr;
+    }
+
+    status = self->driver->reviveOffers(*roles);
+  }
+
+  return PyInt_FromLong(status);
 }
 
 
 PyObject* MesosSchedulerDriverImpl_suppressOffers(
-    MesosSchedulerDriverImpl* self)
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
 {
+  PyObject* rolesObj = nullptr;
+  if (!PyArg_ParseTuple(args, "|O", &rolesObj)) {
+    return nullptr;
+  }
+
   if (self->driver == nullptr) {
     PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is nullptr");
     return nullptr;
   }
 
-  Status status = self->driver->suppressOffers();
+  Status status;
+
+  if (rolesObj == nullptr || rolesObj == Py_None) {
+    status = self->driver->suppressOffers();
+  } else {
+    unique_ptr<vector<string>> roles = constructFromIterable<string>(rolesObj);
+    if (!roles) {
+      return nullptr;
+    }
+
+    status = self->driver->suppressOffers(*roles);
+  }
+
   return PyInt_FromLong(status); // Sets exception if creating long fails.
 }
 
@@ -778,5 +833,36 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks(
   return PyInt_FromLong(status);
 }
 
+
+PyObject* MesosSchedulerDriverImpl_updateFramework(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
+{
+  PyObject* frameworkObj = nullptr;
+  PyObject* suppressedRolesObj = nullptr;
+
+  if (!PyArg_ParseTuple(args, "OO", &frameworkObj, &suppressedRolesObj)) {
+    return nullptr;
+  }
+
+  FrameworkInfo framework;
+  if (!readPythonProtobuf(frameworkObj, &framework)) {
+    PyErr_Format(PyExc_Exception,
+                 "Could not deserialize Python FrameworkInfo");
+    return nullptr;
+  }
+
+  unique_ptr<vector<string>> suppressedRoles;
+  suppressedRoles = constructFromIterable<string>(suppressedRolesObj);
+  if (!suppressedRoles) {
+    // Exception has been set by constructFromIterable
+    return nullptr;
+  }
+
+  Status status = self->driver->updateFramework(framework, *suppressedRoles);
+  return PyInt_FromLong(status);
+}
+
+
 } // namespace python {
 } // namespace mesos {
diff --git a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
index 8c98d46..7bc0856 100644
--- a/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
+++ b/src/python/scheduler/src/mesos/scheduler/mesos_scheduler_driver_impl.hpp
@@ -111,10 +111,13 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(
     MesosSchedulerDriverImpl* self,
     PyObject* args);
 
-PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self);
+PyObject* MesosSchedulerDriverImpl_reviveOffers(
+    MesosSchedulerDriverImpl* self,
+    PyObject* pyRoles);
 
 PyObject* MesosSchedulerDriverImpl_suppressOffers(
-    MesosSchedulerDriverImpl* self);
+    MesosSchedulerDriverImpl* self,
+    PyObject* pyRoles);
 
 PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
     MesosSchedulerDriverImpl* self,
@@ -128,6 +131,10 @@ PyObject* MesosSchedulerDriverImpl_reconcileTasks(
     MesosSchedulerDriverImpl* self,
     PyObject* args);
 
+PyObject* MesosSchedulerDriverImpl_updateFramework(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args);
+
 } // namespace python {
 } // namespace mesos {
 


[mesos] 03/03: Added unsuppressing via `updateFramework()` to python example framework.

Posted by bm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bmahler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 72ca397be231d8564442f0ad8289bc22dac94eed
Author: Andrei Sekretenko <as...@mesosphere.io>
AuthorDate: Wed Jul 24 11:02:15 2019 -0400

    Added unsuppressing via `updateFramework()` to python example framework.
    
    This patch ensures that the list of suppressed roles is passed through
    the `updateFramework()` of python V0 bindings. Now the framework
    subscribes with all roles suppressed and after that issues an
    UPDATE_FRAMEWORK call to unsuppress offers.
    
    Review: https://reviews.apache.org/r/71084/
---
 src/examples/python/test_framework.py | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/src/examples/python/test_framework.py b/src/examples/python/test_framework.py
index 27bc052..38ddacf 100755
--- a/src/examples/python/test_framework.py
+++ b/src/examples/python/test_framework.py
@@ -30,9 +30,10 @@ TASK_CPUS = 1
 TASK_MEM = 128
 
 class TestScheduler(mesos.interface.Scheduler):
-    def __init__(self, implicitAcknowledgements, executor):
+    def __init__(self, implicitAcknowledgements, executor, framework):
         self.implicitAcknowledgements = implicitAcknowledgements
         self.executor = executor
+        self.framework = framework
         self.taskData = {}
         self.tasksLaunched = 0
         self.tasksFinished = 0
@@ -41,6 +42,8 @@ class TestScheduler(mesos.interface.Scheduler):
 
     def registered(self, driver, frameworkId, masterInfo):
         print "Registered with framework ID %s" % frameworkId.value
+        self.framework.id.CopyFrom(frameworkId)
+        driver.updateFramework(framework, [])
 
     def resourceOffers(self, driver, offers):
         for offer in offers:
@@ -167,6 +170,7 @@ if __name__ == "__main__":
     framework.user = "" # Have Mesos fill in the current user.
     framework.name = "Test Framework (Python)"
     framework.checkpoint = True
+    framework.role = "*"
 
     implicitAcknowledgements = 1
     if os.getenv("MESOS_EXPLICIT_ACKNOWLEDGEMENTS"):
@@ -188,20 +192,18 @@ if __name__ == "__main__":
 
         framework.principal = os.getenv("MESOS_EXAMPLE_PRINCIPAL")
 
-        driver = MesosSchedulerDriver(
-            TestScheduler(implicitAcknowledgements, executor),
-            framework,
-            sys.argv[1],
-            implicitAcknowledgements,
-            credential)
     else:
         framework.principal = "test-framework-python"
-
-        driver = MesosSchedulerDriver(
-            TestScheduler(implicitAcknowledgements, executor),
-            framework,
-            sys.argv[1],
-            implicitAcknowledgements)
+        credential = None
+
+    # Subscribe with all roles suppressed to test updateFramework() method
+    driver = MesosSchedulerDriver(
+        TestScheduler(implicitAcknowledgements, executor, framework),
+        framework,
+        sys.argv[1],
+        implicitAcknowledgements,
+        credential,
+        [framework.role])
 
     status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1