You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/21 23:26:54 UTC

[4/8] mesos git commit: Updated Python bindings for explicit acknowledgements.

Updated Python bindings for explicit acknowledgements.

Review: https://reviews.apache.org/r/30973


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4013c0a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4013c0a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4013c0a

Branch: refs/heads/master
Commit: e4013c0ac158c4e2851bedd662f6b48ea503b80c
Parents: 28d5f93
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:26:44 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 .../interface/src/mesos/interface/__init__.py   | 28 +++++++---
 .../native/mesos_scheduler_driver_impl.cpp      | 54 ++++++++++++++++++--
 .../native/mesos_scheduler_driver_impl.hpp      |  4 ++
 3 files changed, 75 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/interface/src/mesos/interface/__init__.py
----------------------------------------------------------------------
diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index 8af649c..f3d96a4 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -87,14 +87,17 @@ class Scheduler(object):
 
   def statusUpdate(self, driver, status):
     """
-      Invoked when the status of a task has changed (e.g., a slave is lost
-      and so the task is lost, a task finishes and an executor sends a
-      status update saying so, etc.) Note that returning from this callback
-      acknowledges receipt of this status update.  If for whatever reason
-      the scheduler aborts during this callback (or the process exits)
-      another status update will be delivered.  Note, however, that this is
-      currently not true if the slave sending the status update is lost or
-      fails during that time.
+      Invoked when the status of a task has changed (e.g., a slave is
+      lost and so the task is lost, a task finishes and an executor
+      sends a status update saying so, etc). If implicit
+      acknowledgements are being used, then returning from this
+      callback _acknowledges_ receipt of this status update! If for
+      whatever reason the scheduler aborts during this callback (or
+      the process exits) another status update will be delivered (note,
+      however, that this is currently not true if the slave sending the
+      status update is lost/fails during that time). If explicit
+      acknowledgements are in use, the scheduler must acknowledge this
+      status on the driver.
     """
 
   def frameworkMessage(self, driver, executorId, slaveId, message):
@@ -214,6 +217,15 @@ class SchedulerDriver(object):
       those filtered slaves.
     """
 
+  def acknowledgeStatusUpdate(self, status):
+    """
+      Acknowledges the status update. This should only be called
+      once the status update is processed durably by the scheduler.
+      Not that explicit acknowledgements must be requested via the
+      constructor argument, otherwise a call to this method will
+      cause the driver to crash.
+    """
+
   def sendFrameworkMessage(self, executorId, slaveId, data):
     """
       Sends a message from the framework to one of its executors. These

http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
----------------------------------------------------------------------
diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
index 1badf55..bb18845 100644
--- a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
+++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
@@ -138,6 +138,11 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
     METH_NOARGS,
     "Remove all filters and ask Mesos for new offers"
   },
+  { "acknowledgeStatusUpdate",
+    (PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate,
+    METH_VARARGS,
+    "Acknowledge a status update"
+  },
   { "sendFrameworkMessage",
     (PyCFunction) MesosSchedulerDriverImpl_sendFrameworkMessage,
     METH_VARARGS,
@@ -178,13 +183,22 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
                                   PyObject* args,
                                   PyObject* kwds)
 {
+  // Note: We use an integer for 'implicitAcknoweldgements' because
+  // it is the recommended way to pass booleans through CPython.
   PyObject* schedulerObj = NULL;
   PyObject* frameworkObj = NULL;
   const char* master;
+  int implicitAcknowledgements;
   PyObject* credentialObj = NULL;
 
   if (!PyArg_ParseTuple(
-      args, "OOs|O", &schedulerObj, &frameworkObj, &master, &credentialObj)) {
+      args,
+      "OOs|iO",
+      &schedulerObj,
+      &frameworkObj,
+      &master,
+      &implicitAcknowledgements,
+      &credentialObj)) {
     return -1;
   }
 
@@ -227,10 +241,17 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
 
   if (credentialObj != NULL) {
     self->driver = new MesosSchedulerDriver(
-        self->proxyScheduler, framework, master, credential);
+        self->proxyScheduler,
+        framework,
+        master,
+        implicitAcknowledgements != 0,
+        credential);
   } else {
     self->driver = new MesosSchedulerDriver(
-        self->proxyScheduler, framework, master);
+        self->proxyScheduler,
+        framework,
+        master,
+        implicitAcknowledgements != 0);
   }
 
   return 0;
@@ -549,6 +570,33 @@ PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self)
 }
 
 
+PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args)
+{
+  if (self->driver == NULL) {
+    PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL");
+    return NULL;
+  }
+
+  PyObject* taskStatusObj = NULL;
+  TaskStatus taskStatus;
+
+  if (!PyArg_ParseTuple(args, "O", &taskStatusObj)) {
+    return NULL;
+  }
+
+  if (!readPythonProtobuf(taskStatusObj, &taskStatus)) {
+    PyErr_Format(PyExc_Exception, "Could not deserialize Python TaskStatus");
+    return NULL;
+  }
+
+  Status status = self->driver->acknowledgeStatusUpdate(taskStatus);
+
+  return PyInt_FromLong(status); // Sets exception if creating long fails.
+}
+
+
 PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
     MesosSchedulerDriverImpl* self,
     PyObject* args)

http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
----------------------------------------------------------------------
diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
index d15dfb9..a698000 100644
--- a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
+++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
@@ -111,6 +111,10 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(
 
 PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self);
 
+PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
+    MesosSchedulerDriverImpl* self,
+    PyObject* args);
+
 PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
     MesosSchedulerDriverImpl* self,
     PyObject* args);