You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/21 23:26:54 UTC
[4/8] mesos git commit: Updated Python bindings for explicit
acknowledgements.
Updated Python bindings for explicit acknowledgements.
Review: https://reviews.apache.org/r/30973
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4013c0a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4013c0a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4013c0a
Branch: refs/heads/master
Commit: e4013c0ac158c4e2851bedd662f6b48ea503b80c
Parents: 28d5f93
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:26:44 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800
----------------------------------------------------------------------
.../interface/src/mesos/interface/__init__.py | 28 +++++++---
.../native/mesos_scheduler_driver_impl.cpp | 54 ++++++++++++++++++--
.../native/mesos_scheduler_driver_impl.hpp | 4 ++
3 files changed, 75 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/interface/src/mesos/interface/__init__.py
----------------------------------------------------------------------
diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index 8af649c..f3d96a4 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -87,14 +87,17 @@ class Scheduler(object):
def statusUpdate(self, driver, status):
"""
- Invoked when the status of a task has changed (e.g., a slave is lost
- and so the task is lost, a task finishes and an executor sends a
- status update saying so, etc.) Note that returning from this callback
- acknowledges receipt of this status update. If for whatever reason
- the scheduler aborts during this callback (or the process exits)
- another status update will be delivered. Note, however, that this is
- currently not true if the slave sending the status update is lost or
- fails during that time.
+ Invoked when the status of a task has changed (e.g., a slave is
+ lost and so the task is lost, a task finishes and an executor
+ sends a status update saying so, etc). If implicit
+ acknowledgements are being used, then returning from this
+ callback _acknowledges_ receipt of this status update! If for
+ whatever reason the scheduler aborts during this callback (or
+ the process exits) another status update will be delivered (note,
+ however, that this is currently not true if the slave sending the
+ status update is lost/fails during that time). If explicit
+ acknowledgements are in use, the scheduler must acknowledge this
+ status on the driver.
"""
def frameworkMessage(self, driver, executorId, slaveId, message):
@@ -214,6 +217,15 @@ class SchedulerDriver(object):
those filtered slaves.
"""
+ def acknowledgeStatusUpdate(self, status):
+ """
+ Acknowledges the status update. This should only be called
+ once the status update is processed durably by the scheduler.
+ Note that explicit acknowledgements must be requested via the
+ constructor argument, otherwise a call to this method will
+ cause the driver to crash.
+ """
+
def sendFrameworkMessage(self, executorId, slaveId, data):
"""
Sends a message from the framework to one of its executors. These
http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
----------------------------------------------------------------------
diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
index 1badf55..bb18845 100644
--- a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
+++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.cpp
@@ -138,6 +138,11 @@ PyMethodDef MesosSchedulerDriverImpl_methods[] = {
METH_NOARGS,
"Remove all filters and ask Mesos for new offers"
},
+ { "acknowledgeStatusUpdate",
+ (PyCFunction) MesosSchedulerDriverImpl_acknowledgeStatusUpdate,
+ METH_VARARGS,
+ "Acknowledge a status update"
+ },
{ "sendFrameworkMessage",
(PyCFunction) MesosSchedulerDriverImpl_sendFrameworkMessage,
METH_VARARGS,
@@ -178,13 +183,22 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
PyObject* args,
PyObject* kwds)
{
+ // Note: We use an integer for 'implicitAcknowledgements' because
+ // it is the recommended way to pass booleans through CPython.
PyObject* schedulerObj = NULL;
PyObject* frameworkObj = NULL;
const char* master;
+ int implicitAcknowledgements;
PyObject* credentialObj = NULL;
if (!PyArg_ParseTuple(
- args, "OOs|O", &schedulerObj, &frameworkObj, &master, &credentialObj)) {
+ args,
+ "OOs|iO",
+ &schedulerObj,
+ &frameworkObj,
+ &master,
+ &implicitAcknowledgements,
+ &credentialObj)) {
return -1;
}
@@ -227,10 +241,17 @@ int MesosSchedulerDriverImpl_init(MesosSchedulerDriverImpl* self,
if (credentialObj != NULL) {
self->driver = new MesosSchedulerDriver(
- self->proxyScheduler, framework, master, credential);
+ self->proxyScheduler,
+ framework,
+ master,
+ implicitAcknowledgements != 0,
+ credential);
} else {
self->driver = new MesosSchedulerDriver(
- self->proxyScheduler, framework, master);
+ self->proxyScheduler,
+ framework,
+ master,
+ implicitAcknowledgements != 0);
}
return 0;
@@ -549,6 +570,33 @@ PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self)
}
+PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
+ MesosSchedulerDriverImpl* self,
+ PyObject* args)
+{
+ if (self->driver == NULL) {
+ PyErr_Format(PyExc_Exception, "MesosSchedulerDriverImpl.driver is NULL");
+ return NULL;
+ }
+
+ PyObject* taskStatusObj = NULL;
+ TaskStatus taskStatus;
+
+ if (!PyArg_ParseTuple(args, "O", &taskStatusObj)) {
+ return NULL;
+ }
+
+ if (!readPythonProtobuf(taskStatusObj, &taskStatus)) {
+ PyErr_Format(PyExc_Exception, "Could not deserialize Python TaskStatus");
+ return NULL;
+ }
+
+ Status status = self->driver->acknowledgeStatusUpdate(taskStatus);
+
+ return PyInt_FromLong(status); // Sets exception if creating long fails.
+}
+
+
PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
MesosSchedulerDriverImpl* self,
PyObject* args)
http://git-wip-us.apache.org/repos/asf/mesos/blob/e4013c0a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
----------------------------------------------------------------------
diff --git a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
index d15dfb9..a698000 100644
--- a/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
+++ b/src/python/native/src/mesos/native/mesos_scheduler_driver_impl.hpp
@@ -111,6 +111,10 @@ PyObject* MesosSchedulerDriverImpl_declineOffer(
PyObject* MesosSchedulerDriverImpl_reviveOffers(MesosSchedulerDriverImpl* self);
+PyObject* MesosSchedulerDriverImpl_acknowledgeStatusUpdate(
+ MesosSchedulerDriverImpl* self,
+ PyObject* args);
+
PyObject* MesosSchedulerDriverImpl_sendFrameworkMessage(
MesosSchedulerDriverImpl* self,
PyObject* args);