You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:02:50 UTC
svn commit: r1132218 - in /incubator/mesos/trunk: ./ src/ src/examples/
src/examples/python/ src/python/native/ src/python/src/
Author: benh
Date: Sun Jun 5 09:02:50 2011
New Revision: 1132218
URL: http://svn.apache.org/viewvc?rev=1132218&view=rev
Log:
Added executor bindings to Python API, fixed the Python test framework,
and made the build process a little nicer.
Added:
incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp
incubator/mesos/trunk/src/python/native/proxy_executor.cpp
incubator/mesos/trunk/src/python/native/proxy_executor.hpp
Modified:
incubator/mesos/trunk/.gitignore
incubator/mesos/trunk/src/Makefile.in
incubator/mesos/trunk/src/examples/Makefile.in
incubator/mesos/trunk/src/examples/python/test_executor
incubator/mesos/trunk/src/examples/python/test_executor.py
incubator/mesos/trunk/src/examples/python/test_framework
incubator/mesos/trunk/src/examples/python/test_framework.py
incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp
incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.hpp
incubator/mesos/trunk/src/python/native/module.cpp
incubator/mesos/trunk/src/python/native/module.hpp
incubator/mesos/trunk/src/python/src/mesos.py
Modified: incubator/mesos/trunk/.gitignore
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/.gitignore?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/.gitignore (original)
+++ incubator/mesos/trunk/.gitignore Sun Jun 5 09:02:50 2011
@@ -20,8 +20,6 @@
.*
Makefile
autom4te.cache
-bin/master
-bin/slaves
config.cache
config.log
config.status
@@ -110,6 +108,7 @@ third_party/protobuf-2.3.0/gtest/scripts
third_party/protobuf-2.3.0/java/src/main/java/com/google/protobuf/DescriptorProtos.java
third_party/protobuf-2.3.0/protobuf-lite.pc
third_party/protobuf-2.3.0/protobuf.pc
+third_party/protobuf-2.3.0/python/google
third_party/protobuf-2.3.0/src/Makefile.in
third_party/protobuf-2.3.0/src/google/protobuf/compiler/cpp/cpp_test_bad_identifiers.pb.cc
third_party/protobuf-2.3.0/src/google/protobuf/compiler/cpp/cpp_test_bad_identifiers.pb.h
Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun 5 09:02:50 2011
@@ -205,7 +205,11 @@ MESOS_PYTHON_LIB = $(LIBDIR)/python/_mes
MESOS_PYTHON_LIB_OBJ = python/native/module.o \
python/native/proxy_scheduler.o \
- python/native/mesos_scheduler_driver_impl.o
+ python/native/mesos_scheduler_driver_impl.o \
+ python/native/proxy_executor.o \
+ python/native/mesos_executor_driver_impl.o
+
+MESOS_PYTHON_FILE = $(LIBDIR)/python/mesos.py
# We copy all the webui files into the bin directory.
WEBUI_FILES = $(BINDIR)/webui/bottle-0.8.3 \
@@ -257,8 +261,7 @@ $(DIRECTORIES): %:
mesos.pb.cc: @top_srcdir@/include/mesos.proto
mkdir -p java/generated
- mkdir -p python/generated
- $(PROTOC) -I@top_srcdir@/include --cpp_out=. --java_out=java/generated --python_out=python/generated @top_srcdir@/include/mesos.proto
+ $(PROTOC) -I@top_srcdir@/include --cpp_out=. --java_out=java/generated @top_srcdir@/include/mesos.proto
$(INCLUDEDIR)/mesos.hpp: mesos.pb.cc | $(INCLUDEDIR)
cp mesos.pb.h $(INCLUDEDIR)/mesos.hpp
@@ -352,7 +355,7 @@ ifdef JAVA_HOME
$(CXX) $(CXXFLAGS) -shared -o $@ $(MESOS_JAVA_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(LIBS)
endif
-python: $(MESOS_PYTHON_LIB)
+python: $(MESOS_PYTHON_LIB) $(MESOS_PYTHON_FILE) $(MESOS_PYTHON_PROTOBUFS)
$(MESOS_PYTHON_LIB_OBJ): %.o: $(SRCDIR)/%.cpp
ifdef PYTHON_HEADERS
@@ -361,16 +364,18 @@ endif
$(MESOS_PYTHON_LIB): $(MESOS_PYTHON_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) | $(LIBDIR)/python
ifdef PYTHON_HEADERS
- # Compile protocol buffers' descriptor.proto into its source directory
- # to create $(PROTOBUF)/python/google/protobuf/descriptor_pb2.py.
- # This is ugly but it seems to be the only way to have Python see this
- # as part of the google.protobuf module if you want to run Python apps
- # using the protobuf shipped with Mesos.
- $(PROTOC) --python_out=@top_builddir@/$(PROTOBUF)/python -I@top_builddir@/$(PROTOBUF)/src @top_builddir@/$(PROTOBUF)/src/google/protobuf/descriptor.proto
- # Build native Python module with Mesos SchedulerDriver, ExecutorDriver
$(CXX) $(CXXFLAGS) -shared -o $@ $(MESOS_PYTHON_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(PYTHON_LDFLAGS) $(LIBS)
- # Copy mesos.py to lib
- cp python/src/mesos.py $(LIBDIR)/python/mesos.py
+endif
+
+$(MESOS_PYTHON_FILE): python/src/mesos.py
+ifdef PYTHON_HEADERS
+ cp $< $@
+endif
+
+$(MESOS_PYTHON_PROTOBUFS): @top_srcdir@/include/mesos.proto | $(LIBDIR)/python
+ifdef PYTHON_HEADERS
+ $(PROTOC) --python_out=@top_builddir@/$(PROTOBUF)/python -I@top_builddir@/$(PROTOBUF)/src @top_builddir@/$(PROTOBUF)/src/google/protobuf/descriptor.proto
+ $(PROTOC) -I@top_srcdir@/include --python_out=$(LIBDIR)/python @top_srcdir@/include/mesos.proto
endif
$(WEBUI_FILES): $(BINDIR)/%: $(SRCDIR)/% | $(WEBUI_DIRECTORIES)
Modified: incubator/mesos/trunk/src/examples/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/Makefile.in?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/Makefile.in (original)
+++ incubator/mesos/trunk/src/examples/Makefile.in Sun Jun 5 09:02:50 2011
@@ -146,8 +146,7 @@ ifdef PYTHON_HEADERS
$(MAKE) -C python
endif
-#all: $(EXAMPLES_EXES) java python
-all: $(EXAMPLES_EXES) java
+all: $(EXAMPLES_EXES) java python
clean:
rm -f $(patsubst %, %.d, $(EXAMPLES_EXES))
Modified: incubator/mesos/trunk/src/examples/python/test_executor
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/python/test_executor?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/python/test_executor (original)
+++ incubator/mesos/trunk/src/examples/python/test_executor Sun Jun 5 09:02:50 2011
@@ -2,7 +2,6 @@
if [ "x$PYTHON" == "x" ]; then
PYTHON=python
fi
-FWDIR=`dirname $0`
-MESOS_HOME=`cd $FWDIR/../../..; pwd`
-export PYTHONPATH="$MESOS_HOME/lib/python:$PYTHONPATH"
-exec $PYTHON $FWDIR/test_executor.py $@
+FRAMEWORK_DIR="`cd $(dirname $0); pwd`"
+export PYTHONPATH="$MESOS_HOME/lib/python:$MESOS_HOME/third_party/protobuf-2.3.0/python:$PYTHONPATH"
+exec $PYTHON $FRAMEWORK_DIR/test_executor.py $@
Modified: incubator/mesos/trunk/src/examples/python/test_executor.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/python/test_executor.py?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/python/test_executor.py (original)
+++ incubator/mesos/trunk/src/examples/python/test_executor.py Sun Jun 5 09:02:50 2011
@@ -1,22 +1,27 @@
#!/usr/bin/env python
-import mesos
import sys
+import threading
import time
-class MyExecutor(mesos.Executor):
- def __init__(self):
- mesos.Executor.__init__(self)
+import mesos
+import mesos_pb2
+class MyExecutor(mesos.Executor):
def launchTask(self, driver, task):
- print "Running task %d" % task.taskId
- time.sleep(1)
- print "Sending the update..."
- update = mesos.TaskStatus(task.taskId, mesos.TASK_FINISHED, "")
- driver.sendStatusUpdate(update)
- print "Sent the update"
-
- def error(self, driver, code, message):
- print "Error: %s" % message
+ # Create a thread to run the task. Tasks should always be run in new
+ # threads or processes, rather than inside launchTask itself.
+ def run_task():
+ print "Running task %s" % task.task_id.value
+ time.sleep(1)
+ print "Sending status update..."
+ update = mesos_pb2.TaskStatus()
+ update.task_id.value = task.task_id.value
+ update.slave_id.value = task.slave_id.value
+ update.state = mesos_pb2.TASK_FINISHED
+ driver.sendStatusUpdate(update)
+ print "Sent status update"
+ thread = threading.Thread(target=run_task)
+ thread.start()
if __name__ == "__main__":
print "Starting executor"
Modified: incubator/mesos/trunk/src/examples/python/test_framework
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/python/test_framework?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/python/test_framework (original)
+++ incubator/mesos/trunk/src/examples/python/test_framework Sun Jun 5 09:02:50 2011
@@ -4,5 +4,5 @@ if [ "x$PYTHON" == "x" ]; then
fi
FRAMEWORK_DIR="`cd $(dirname $0); pwd`"
MESOS_HOME="$FRAMEWORK_DIR/../../.."
-export PYTHONPATH="$MESOS_HOME/lib/python:$PYTHONPATH"
-exec $PYTHON $FRAMEWORK_DIR/test_framework.py $@
+export PYTHONPATH="$MESOS_HOME/lib/python:$MESOS_HOME/third_party/protobuf-2.3.0/python:$PYTHONPATH"
+exec $PYTHON $FRAMEWORK_DIR/test_framework.py "$@"
Modified: incubator/mesos/trunk/src/examples/python/test_framework.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/examples/python/test_framework.py?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/examples/python/test_framework.py (original)
+++ incubator/mesos/trunk/src/examples/python/test_framework.py Sun Jun 5 09:02:50 2011
@@ -1,9 +1,11 @@
#!/usr/bin/env python
-import mesos
import os
import sys
import time
+import mesos
+import mesos_pb2
+
TOTAL_TASKS = 5
TASK_CPUS = 1
@@ -11,7 +13,6 @@ TASK_MEM = 32
class MyScheduler(mesos.Scheduler):
def __init__(self):
- mesos.Scheduler.__init__(self) # Required to extend Scheduler in Python
self.tasksLaunched = 0
self.tasksFinished = 0
@@ -21,28 +22,37 @@ class MyScheduler(mesos.Scheduler):
def getExecutorInfo(self, driver):
frameworkDir = os.path.abspath(os.path.dirname(sys.argv[0]))
execPath = os.path.join(frameworkDir, "test_executor")
- return mesos.ExecutorInfo(execPath, "")
+ execInfo = mesos_pb2.ExecutorInfo()
+ execInfo.uri = execPath
+ return execInfo
def registered(self, driver, fid):
- print "Registered!"
+ print "Registered with framework ID %s" % fid.value
def resourceOffer(self, driver, oid, offers):
tasks = []
- print "Got a resource offer!"
+ print "Got resource offer %s" % oid.value
for offer in offers:
if self.tasksLaunched < TOTAL_TASKS:
tid = self.tasksLaunched
self.tasksLaunched += 1
- print "Accepting offer on %s to start task %d" % (offer.host, tid)
- params = {"cpus": "%d" % TASK_CPUS, "mem": "%d" % TASK_MEM}
- td = mesos.TaskDescription(tid, offer.slaveId, "task %d" % tid,
- params, "")
- tasks.append(td)
+ print "Accepting offer on %s to start task %d" % (offer.hostname, tid)
+ task = mesos_pb2.TaskDescription()
+ task.task_id.value = str(tid)
+ task.slave_id.value = offer.slave_id.value
+ task.name = "task %d" % tid
+ cpu_param = task.params.param.add()
+ cpu_param.key = "cpus"
+ cpu_param.value = str(TASK_CPUS)
+ mem_param = task.params.param.add()
+ mem_param.key = "mem"
+ mem_param.value = str(TASK_MEM)
+ tasks.append(task)
driver.replyToOffer(oid, tasks, {})
def statusUpdate(self, driver, update):
- print "Task %d is in state %d" % (update.taskId, update.state)
- if update.state == mesos.TASK_FINISHED:
+ print "Task %s is in state %d" % (update.task_id.value, update.state)
+ if update.state == mesos_pb2.TASK_FINISHED:
self.tasksFinished += 1
if self.tasksFinished == TOTAL_TASKS:
print "All tasks done, exiting"
Added: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp?rev=1132218&view=auto
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp (added)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.cpp Sun Jun 5 09:02:50 2011
@@ -0,0 +1,293 @@
+#include <Python.h>
+
+#include "mesos_executor_driver_impl.hpp"
+#include "module.hpp"
+#include "proxy_executor.hpp"
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::string;
+using std::vector;
+using std::map;
+using namespace mesos;
+using namespace mesos::python;
+
+namespace mesos { namespace python {
+
+/**
+ * Python type object for MesosExecutorDriverImpl.
+ */
+PyTypeObject MesosExecutorDriverImplType = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "_mesos.MesosExecutorDriverImpl", /* tp_name */
+ sizeof(MesosExecutorDriverImpl), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor) MesosExecutorDriverImpl_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */
+ "Private MesosExecutorDriver implementation", /* tp_doc */
+ (traverseproc) MesosExecutorDriverImpl_traverse, /* tp_traverse */
+ (inquiry) MesosExecutorDriverImpl_clear, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ MesosExecutorDriverImpl_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc) MesosExecutorDriverImpl_init, /* tp_init */
+ 0, /* tp_alloc */
+ MesosExecutorDriverImpl_new, /* tp_new */
+};
+
+
+/**
+ * List of Python methods in MesosExecutorDriverImpl.
+ */
+PyMethodDef MesosExecutorDriverImpl_methods[] = {
+ {"start", (PyCFunction) MesosExecutorDriverImpl_start, METH_NOARGS,
+ "Start the driver to connect to Mesos"},
+ {"stop", (PyCFunction) MesosExecutorDriverImpl_stop, METH_NOARGS,
+ "Stop the driver, disconnecting from Mesos"},
+ {"join", (PyCFunction) MesosExecutorDriverImpl_join, METH_NOARGS,
+ "Wait for a running driver to disconnect from Mesos"},
+ {"run", (PyCFunction) MesosExecutorDriverImpl_run, METH_NOARGS,
+ "Start a driver and run it, returning when it disconnects from Mesos"},
+ {"sendStatusUpdate",
+ (PyCFunction) MesosExecutorDriverImpl_sendStatusUpdate,
+ METH_VARARGS,
+ "Send a status update for a task"},
+ {"sendFrameworkMessage",
+ (PyCFunction) MesosExecutorDriverImpl_sendFrameworkMessage,
+ METH_VARARGS,
+ "Send a FrameworkMessage to a slave"},
+ {NULL} /* Sentinel */
+};
+
+
+/**
+ * Create, but don't initialize, a new MesosExecutorDriverImpl.
+ */
+PyObject* MesosExecutorDriverImpl_new(PyTypeObject *type,
+ PyObject *args,
+ PyObject *kwds)
+{
+ cout << "In MesosExecutorDriverImpl_new" << endl;
+ MesosExecutorDriverImpl *self;
+ self = (MesosExecutorDriverImpl *) type->tp_alloc(type, 0);
+ if (self != NULL) {
+ self->driver = NULL;
+ self->proxyExecutor = NULL;
+ self->pythonExecutor = NULL;
+ }
+ return (PyObject*) self;
+}
+
+
+/**
+ * Initialize a MesosExecutorDriverImpl (this is its constructor).
+ */
+int MesosExecutorDriverImpl_init(MesosExecutorDriverImpl *self,
+ PyObject *args,
+ PyObject *kwds)
+{
+ cout << "In MesosExecutorDriverImpl_init" << endl;
+ PyObject *pythonExecutor = NULL;
+
+ if (!PyArg_ParseTuple(args, "O", &pythonExecutor)) {
+ return -1;
+ }
+
+ if (pythonExecutor != NULL) {
+ PyObject* tmp = self->pythonExecutor;
+ Py_INCREF(pythonExecutor);
+ self->pythonExecutor = pythonExecutor;
+ Py_XDECREF(tmp);
+ }
+
+ if (self->driver != NULL) {
+ self->driver->stop();
+ delete self->driver;
+ self->driver = NULL;
+ }
+
+ if (self->proxyExecutor != NULL) {
+ delete self->proxyExecutor;
+ self->proxyExecutor = NULL;
+ }
+
+ self->proxyExecutor = new ProxyExecutor(self);
+ self->driver = new MesosExecutorDriver(self->proxyExecutor);
+
+ return 0;
+}
+
+
+/**
+ * Free a MesosExecutorDriverImpl.
+ */
+void MesosExecutorDriverImpl_dealloc(MesosExecutorDriverImpl* self)
+{
+ cout << "In MesosExecutorDriverImpl_dealloc" << endl;
+ if (self->driver != NULL) {
+ self->driver->stop();
+ delete self->driver;
+ self->driver = NULL;
+ }
+ if (self->proxyExecutor != NULL) {
+ delete self->proxyExecutor;
+ self->proxyExecutor = NULL;
+ }
+ MesosExecutorDriverImpl_clear(self);
+ self->ob_type->tp_free((PyObject*) self);
+}
+
+
+/**
+ * Traverse fields of a MesosExecutorDriverImpl on a cyclic GC search.
+ * See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosExecutorDriverImpl_traverse(MesosExecutorDriverImpl* self,
+ visitproc visit,
+ void* arg)
+{
+ Py_VISIT(self->pythonExecutor);
+ return 0;
+}
+
+
+/**
+ * Clear fields of a MesosExecutorDriverImpl that can participate in
+ * GC cycles. See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosExecutorDriverImpl_clear(MesosExecutorDriverImpl* self)
+{
+ Py_CLEAR(self->pythonExecutor);
+ return 0;
+}
+
+
+PyObject* MesosExecutorDriverImpl_start(MesosExecutorDriverImpl* self)
+{
+ if (self->driver == NULL) {
+ PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
+ return NULL;
+ }
+
+ int res = self->driver->start();
+ return PyInt_FromLong(res); // Sets an exception if creating the int fails
+}
+
+
+PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self)
+{
+ if (self->driver == NULL) {
+ PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
+ return NULL;
+ }
+
+ int res = self->driver->stop();
+ return PyInt_FromLong(res); // Sets an exception if creating the int fails
+}
+
+
+PyObject* MesosExecutorDriverImpl_join(MesosExecutorDriverImpl* self)
+{
+ if (self->driver == NULL) {
+ PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
+ return NULL;
+ }
+
+ int res;
+ Py_BEGIN_ALLOW_THREADS
+ res = self->driver->join();
+ Py_END_ALLOW_THREADS
+ return PyInt_FromLong(res); // Sets an exception if creating the int fails
+}
+
+
+PyObject* MesosExecutorDriverImpl_run(MesosExecutorDriverImpl* self)
+{
+ if (self->driver == NULL) {
+ PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
+ return NULL;
+ }
+
+ int res;
+ Py_BEGIN_ALLOW_THREADS
+ res = self->driver->run();
+ Py_END_ALLOW_THREADS
+ return PyInt_FromLong(res); // Sets an exception if creating the int fails
+}
+
+
+PyObject* MesosExecutorDriverImpl_sendStatusUpdate(
+ MesosExecutorDriverImpl* self,
+ PyObject* args)
+{
+ if (self->driver == NULL) {
+ PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
+ return NULL;
+ }
+
+ PyObject* statusObj = NULL;
+ TaskStatus status;
+ if (!PyArg_ParseTuple(args, "O", &statusObj)) {
+ return NULL;
+ }
+ if (!readPythonProtobuf(statusObj, &status)) {
+ PyErr_Format(PyExc_Exception,
+ "Could not deserialize Python TaskStatus");
+ return NULL;
+ }
+
+ int res = self->driver->sendStatusUpdate(status);
+ return PyInt_FromLong(res); // Sets an exception if creating the int fails
+}
+
+
+PyObject* MesosExecutorDriverImpl_sendFrameworkMessage(
+ MesosExecutorDriverImpl* self,
+ PyObject* args)
+{
+ if (self->driver == NULL) {
+ PyErr_Format(PyExc_Exception, "MesosExecutorDriverImpl.driver is NULL");
+ return NULL;
+ }
+
+ PyObject* msgObj = NULL;
+ FrameworkMessage msg;
+ if (!PyArg_ParseTuple(args, "O", &msgObj)) {
+ return NULL;
+ }
+ if (!readPythonProtobuf(msgObj, &msg)) {
+ PyErr_Format(PyExc_Exception,
+ "Could not deserialize Python FrameworkMessage");
+ return NULL;
+ }
+
+ int res = self->driver->sendFrameworkMessage(msg);
+ return PyInt_FromLong(res); // Sets an exception if creating the int fails
+}
+
+}} /* namespace mesos { namespace python { */
Added: incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp?rev=1132218&view=auto
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp (added)
+++ incubator/mesos/trunk/src/python/native/mesos_executor_driver_impl.hpp Sun Jun 5 09:02:50 2011
@@ -0,0 +1,83 @@
+#ifndef MESOS_EXECUTOR_DRIVER_IMPL_HPP
+#define MESOS_EXECUTOR_DRIVER_IMPL_HPP
+
+#include "mesos_exec.hpp"
+
+namespace mesos { namespace python {
+
+class ProxyExecutor;
+
+/**
+ * Python object structure for MesosExecutorDriverImpl objects.
+ */
+struct MesosExecutorDriverImpl {
+ PyObject_HEAD
+ /* Type-specific fields go here. */
+ MesosExecutorDriver* driver;
+ ProxyExecutor* proxyExecutor;
+ PyObject* pythonExecutor;
+};
+
+/**
+ * Python type object for MesosExecutorDriverImpl.
+ */
+extern PyTypeObject MesosExecutorDriverImplType;
+
+/**
+ * List of Python methods in MesosExecutorDriverImpl.
+ */
+extern PyMethodDef MesosExecutorDriverImpl_methods[];
+
+/**
+ * Create, but don't initialize, a new MesosExecutorDriverImpl
+ * (called by Python before init method).
+ */
+PyObject* MesosExecutorDriverImpl_new(PyTypeObject *type,
+ PyObject *args,
+ PyObject *kwds);
+
+/**
+ * Initialize a MesosExecutorDriverImpl with constructor arguments.
+ */
+int MesosExecutorDriverImpl_init(MesosExecutorDriverImpl *self,
+ PyObject *args,
+ PyObject *kwds);
+
+/**
+ * Free a MesosExecutorDriverImpl.
+ */
+void MesosExecutorDriverImpl_dealloc(MesosExecutorDriverImpl* self);
+
+/**
+ * Traverse fields of a MesosExecutorDriverImpl on a cyclic GC search.
+ * See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosExecutorDriverImpl_traverse(MesosExecutorDriverImpl* self,
+ visitproc visit,
+ void* arg);
+/**
+ * Clear fields of a MesosExecutorDriverImpl that can participate in
+ * GC cycles. See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosExecutorDriverImpl_clear(MesosExecutorDriverImpl* self);
+
+// MesosExecutorDriverImpl methods
+PyObject* MesosExecutorDriverImpl_start(MesosExecutorDriverImpl* self);
+
+PyObject* MesosExecutorDriverImpl_stop(MesosExecutorDriverImpl* self);
+
+PyObject* MesosExecutorDriverImpl_join(MesosExecutorDriverImpl* self);
+
+PyObject* MesosExecutorDriverImpl_run(MesosExecutorDriverImpl* self);
+
+PyObject* MesosExecutorDriverImpl_sendStatusUpdate(
+ MesosExecutorDriverImpl* self,
+ PyObject* args);
+
+PyObject* MesosExecutorDriverImpl_sendFrameworkMessage(
+ MesosExecutorDriverImpl* self,
+ PyObject* args);
+
+}} /* namespace mesos { namespace python { */
+
+#endif /* MESOS_EXECUTOR_DRIVER_IMPL_HPP */
Modified: incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.cpp Sun Jun 5 09:02:50 2011
@@ -20,44 +20,44 @@ namespace mesos { namespace python {
*/
PyTypeObject MesosSchedulerDriverImplType = {
PyObject_HEAD_INIT(NULL)
- 0, /* ob_size */
- "_mesos.MesosSchedulerDriverImpl", /* tp_name */
- sizeof(MesosSchedulerDriverImpl), /* tp_basicsize */
- 0, /* tp_itemsize */
- (destructor) MesosSchedulerDriverImpl_dealloc, /* tp_dealloc */
- 0, /* tp_print */
- 0, /* tp_getattr */
- 0, /* tp_setattr */
- 0, /* tp_compare */
- 0, /* tp_repr */
- 0, /* tp_as_number */
- 0, /* tp_as_sequence */
- 0, /* tp_as_mapping */
- 0, /* tp_hash */
- 0, /* tp_call */
- 0, /* tp_str */
- 0, /* tp_getattro */
- 0, /* tp_setattro */
- 0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT, /* tp_flags */
- "Private MesosSchedulerDriver implementation", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- MesosSchedulerDriverImpl_methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc) MesosSchedulerDriverImpl_init, /* tp_init */
- 0, /* tp_alloc */
- MesosSchedulerDriverImpl_new, /* tp_new */
+ 0, /* ob_size */
+ "_mesos.MesosSchedulerDriverImpl", /* tp_name */
+ sizeof(MesosSchedulerDriverImpl), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor) MesosSchedulerDriverImpl_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /* tp_flags */
+ "Private MesosSchedulerDriver implementation", /* tp_doc */
+ (traverseproc) MesosSchedulerDriverImpl_traverse, /* tp_traverse */
+ (inquiry) MesosSchedulerDriverImpl_clear, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ MesosSchedulerDriverImpl_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc) MesosSchedulerDriverImpl_init, /* tp_init */
+ 0, /* tp_alloc */
+ MesosSchedulerDriverImpl_new, /* tp_new */
};
@@ -175,15 +175,41 @@ void MesosSchedulerDriverImpl_dealloc(Me
if (self->driver != NULL) {
self->driver->stop();
delete self->driver;
+ self->driver = NULL;
}
if (self->proxyScheduler != NULL) {
delete self->proxyScheduler;
+ self->proxyScheduler = NULL;
}
- Py_XDECREF(self->pythonScheduler);
+ MesosSchedulerDriverImpl_clear(self);
self->ob_type->tp_free((PyObject*) self);
}
+/**
+ * Traverse fields of a MesosSchedulerDriverImpl on a cyclic GC search.
+ * See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosSchedulerDriverImpl_traverse(MesosSchedulerDriverImpl* self,
+ visitproc visit,
+ void* arg)
+{
+ Py_VISIT(self->pythonScheduler);
+ return 0;
+}
+
+
+/**
+ * Clear fields of a MesosSchedulerDriverImpl that can participate in
+ * GC cycles. See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosSchedulerDriverImpl_clear(MesosSchedulerDriverImpl* self)
+{
+ Py_CLEAR(self->pythonScheduler);
+ return 0;
+}
+
+
PyObject* MesosSchedulerDriverImpl_start(MesosSchedulerDriverImpl* self)
{
if (self->driver == NULL) {
Modified: incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.hpp?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.hpp (original)
+++ incubator/mesos/trunk/src/python/native/mesos_scheduler_driver_impl.hpp Sun Jun 5 09:02:50 2011
@@ -48,6 +48,19 @@ int MesosSchedulerDriverImpl_init(MesosS
*/
void MesosSchedulerDriverImpl_dealloc(MesosSchedulerDriverImpl* self);
+/**
+ * Traverse fields of a MesosSchedulerDriverImpl on a cyclic GC search.
+ * See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosSchedulerDriverImpl_traverse(MesosSchedulerDriverImpl* self,
+ visitproc visit,
+ void* arg);
+/**
+ * Clear fields of a MesosSchedulerDriverImpl that can participate in
+ * GC cycles. See http://docs.python.org/extending/newtypes.html.
+ */
+int MesosSchedulerDriverImpl_clear(MesosSchedulerDriverImpl* self);
+
// MesosSchedulerDriverImpl methods
PyObject* MesosSchedulerDriverImpl_start(MesosSchedulerDriverImpl* self);
Modified: incubator/mesos/trunk/src/python/native/module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/module.cpp?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/module.cpp (original)
+++ incubator/mesos/trunk/src/python/native/module.cpp Sun Jun 5 09:02:50 2011
@@ -20,6 +20,8 @@
#include "module.hpp"
#include "proxy_scheduler.hpp"
#include "mesos_scheduler_driver_impl.hpp"
+#include "proxy_executor.hpp"
+#include "mesos_executor_driver_impl.hpp"
using std::cout;
using std::cerr;
@@ -51,7 +53,7 @@ PyMethodDef MODULE_METHODS[] = {
/**
- * Called by Python to initialize our module.
+ * Entry point called by Python to initialize our module.
*/
PyMODINIT_FUNC init_mesos(void)
{
@@ -63,15 +65,20 @@ PyMODINIT_FUNC init_mesos(void)
if (mesos_pb2 == NULL)
return;
- // Initialize the MesosSchedulerDriverImpl type
+ // Initialize our Python types
if (PyType_Ready(&MesosSchedulerDriverImplType) < 0)
return;
+ if (PyType_Ready(&MesosExecutorDriverImplType) < 0)
+ return;
// Create the _mesos module and add our types to it
PyObject* module = Py_InitModule("_mesos", MODULE_METHODS);
-
Py_INCREF(&MesosSchedulerDriverImplType);
PyModule_AddObject(module,
"MesosSchedulerDriverImpl",
(PyObject*) &MesosSchedulerDriverImplType);
+ Py_INCREF(&MesosExecutorDriverImplType);
+ PyModule_AddObject(module,
+ "MesosExecutorDriverImpl",
+ (PyObject*) &MesosExecutorDriverImplType);
}
Modified: incubator/mesos/trunk/src/python/native/module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/module.hpp?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/native/module.hpp (original)
+++ incubator/mesos/trunk/src/python/native/module.hpp Sun Jun 5 09:02:50 2011
@@ -7,8 +7,6 @@
#include <google/protobuf/io/zero_copy_stream_impl.h>
-#include "mesos_sched.hpp"
-
namespace mesos { namespace python {
/**
@@ -19,7 +17,7 @@ extern PyObject* mesos_pb2;
/**
- * RAAI utility class for acquiring the Python global interpreter lock.
+ * RAII utility class for acquiring the Python global interpreter lock.
*/
class InterpreterLock {
PyGILState_STATE state;
Added: incubator/mesos/trunk/src/python/native/proxy_executor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/proxy_executor.cpp?rev=1132218&view=auto
==============================================================================
--- incubator/mesos/trunk/src/python/native/proxy_executor.cpp (added)
+++ incubator/mesos/trunk/src/python/native/proxy_executor.cpp Sun Jun 5 09:02:50 2011
@@ -0,0 +1,198 @@
+#include <iostream>
+
+#include "proxy_executor.hpp"
+#include "module.hpp"
+#include "mesos_executor_driver_impl.hpp"
+
+using std::cout;
+using std::cerr;
+using std::endl;
+using std::string;
+using std::vector;
+using std::map;
+using namespace mesos;
+
+namespace mesos { namespace python {
+
+void ProxyExecutor::init(ExecutorDriver* driver,
+                         const ExecutorArgs& args)
+{
+  cout << "ProxyExecutor::init being called" << endl;
+  InterpreterLock lock; // acquire the Python GIL for the calls below
+
+  // Forward the callback to the Python executor's init(driver, args).
+  PyObject* result = NULL;
+  PyObject* pyArgs = createPythonProtobuf(args, "ExecutorArgs");
+
+  if (pyArgs != NULL) {
+    result = PyObject_CallMethod(impl->pythonExecutor,
+                                 (char*) "init",
+                                 (char*) "OO",
+                                 impl,
+                                 pyArgs);
+    if (result == NULL) {
+      cerr << "Failed to call executor's init" << endl;
+    }
+  }
+  // A NULL pyArgs means createPythonProtobuf will have set an exception.
+
+  // Report any pending Python exception and stop the driver.
+  const bool failed = PyErr_Occurred() != NULL;
+  if (failed) {
+    PyErr_Print();
+    driver->stop();
+  }
+
+  Py_XDECREF(pyArgs);
+  Py_XDECREF(result);
+}
+
+
+void ProxyExecutor::launchTask(ExecutorDriver* driver,
+                               const TaskDescription& task)
+{
+  cout << "ProxyExecutor::launchTask being called" << endl;
+  InterpreterLock lock; // acquire the Python GIL for the calls below
+
+  // Forward the callback to the Python executor's launchTask(driver, task).
+  PyObject* result = NULL;
+  PyObject* pyTask = createPythonProtobuf(task, "TaskDescription");
+
+  if (pyTask != NULL) {
+    result = PyObject_CallMethod(impl->pythonExecutor,
+                                 (char*) "launchTask",
+                                 (char*) "OO",
+                                 impl,
+                                 pyTask);
+    if (result == NULL) {
+      cerr << "Failed to call executor's launchTask" << endl;
+    }
+  }
+  // A NULL pyTask means createPythonProtobuf will have set an exception.
+
+  // Report any pending Python exception and stop the driver.
+  const bool failed = PyErr_Occurred() != NULL;
+  if (failed) {
+    PyErr_Print();
+    driver->stop();
+  }
+
+  Py_XDECREF(pyTask);
+  Py_XDECREF(result);
+}
+
+
+void ProxyExecutor::killTask(ExecutorDriver* driver,
+                             const TaskID& taskId)
+{
+  cout << "ProxyExecutor::killTask being called" << endl;
+  InterpreterLock lock; // acquire the Python GIL for the calls below
+
+  // Forward the callback to the Python executor's killTask(driver, taskId).
+  PyObject* result = NULL;
+  PyObject* pyTaskId = createPythonProtobuf(taskId, "TaskID");
+
+  if (pyTaskId != NULL) {
+    result = PyObject_CallMethod(impl->pythonExecutor,
+                                 (char*) "killTask",
+                                 (char*) "OO",
+                                 impl,
+                                 pyTaskId);
+    if (result == NULL) {
+      cerr << "Failed to call executor's killTask" << endl;
+    }
+  }
+  // A NULL pyTaskId means createPythonProtobuf will have set an exception.
+
+  // Report any pending Python exception and stop the driver.
+  const bool failed = PyErr_Occurred() != NULL;
+  if (failed) {
+    PyErr_Print();
+    driver->stop();
+  }
+
+  Py_XDECREF(pyTaskId);
+  Py_XDECREF(result);
+}
+
+
+void ProxyExecutor::frameworkMessage(ExecutorDriver* driver,
+                                     const FrameworkMessage& message)
+{
+  cout << "ProxyExecutor::frameworkMessage being called" << endl;
+  InterpreterLock lock; // acquire the Python GIL for the calls below
+
+  // Forward to the Python executor's frameworkMessage(driver, message).
+  PyObject* result = NULL;
+  PyObject* pyMessage = createPythonProtobuf(message, "FrameworkMessage");
+
+  if (pyMessage != NULL) {
+    result = PyObject_CallMethod(impl->pythonExecutor,
+                                 (char*) "frameworkMessage",
+                                 (char*) "OO",
+                                 impl,
+                                 pyMessage);
+    if (result == NULL) {
+      cerr << "Failed to call executor's frameworkMessage" << endl;
+    }
+  }
+  // A NULL pyMessage means createPythonProtobuf will have set an exception.
+
+  // Report any pending Python exception and stop the driver.
+  const bool failed = PyErr_Occurred() != NULL;
+  if (failed) {
+    PyErr_Print();
+    driver->stop();
+  }
+
+  Py_XDECREF(pyMessage);
+  Py_XDECREF(result);
+}
+
+
+void ProxyExecutor::shutdown(ExecutorDriver* driver)
+{
+  cout << "ProxyExecutor::shutdown being called" << endl;
+  InterpreterLock lock; // acquire the Python GIL for the calls below
+  // Forward the callback to the Python executor's shutdown(driver).
+  PyObject* result = PyObject_CallMethod(impl->pythonExecutor,
+                                         (char*) "shutdown",
+                                         (char*) "O",
+                                         impl);
+  if (result == NULL) {
+    cerr << "Failed to call executor's shutdown" << endl;
+  }
+  // Report any pending Python exception and stop the driver.
+  if (PyErr_Occurred() != NULL) {
+    PyErr_Print();
+    driver->stop();
+  }
+  Py_XDECREF(result);
+}
+
+
+void ProxyExecutor::error(ExecutorDriver* driver,
+                          int code,
+                          const string& message)
+{
+  cout << "ProxyExecutor::error being called" << endl;
+  InterpreterLock lock; // acquire the Python GIL for the calls below
+  // Forward to the Python executor's error(driver, code, message).
+  PyObject* result = PyObject_CallMethod(impl->pythonExecutor,
+                                         (char*) "error",
+                                         (char*) "Ois",
+                                         impl,
+                                         code,
+                                         message.c_str());
+  if (result == NULL) {
+    cerr << "Failed to call executor's error" << endl;
+  }
+
+  // Only print a pending exception; no driver->stop(), it should stop itself.
+  if (PyErr_Occurred() != NULL) {
+    PyErr_Print();
+  }
+  Py_XDECREF(result);
+}
+
+}} /* namespace mesos { namespace python { */
Added: incubator/mesos/trunk/src/python/native/proxy_executor.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/native/proxy_executor.hpp?rev=1132218&view=auto
==============================================================================
--- incubator/mesos/trunk/src/python/native/proxy_executor.hpp (added)
+++ incubator/mesos/trunk/src/python/native/proxy_executor.hpp Sun Jun 5 09:02:50 2011
@@ -0,0 +1,46 @@
+#ifndef PROXY_EXECUTOR_HPP
+#define PROXY_EXECUTOR_HPP
+
+#include <Python.h>
+
+#include <string>
+#include <vector>
+
+#include "mesos_exec.hpp"
+
+namespace mesos { namespace python {
+
+struct MesosExecutorDriverImpl;
+
+/**
+ * Proxy Executor implementation that will call into Python
+ */
+class ProxyExecutor : public Executor
+{
+ MesosExecutorDriverImpl *impl; // object whose pythonExecutor receives the callbacks
+ // The constructor only stores the raw pointer; it does not touch impl's refcount.
+public:
+ ProxyExecutor(MesosExecutorDriverImpl *_impl) : impl(_impl) {}
+ // Empty destructor: impl is not released here.
+ virtual ~ProxyExecutor() {}
+ // The callbacks below are defined in proxy_executor.cpp.
+ virtual void init(ExecutorDriver* driver, const ExecutorArgs& args);
+ // Passes `task` to the Python executor's launchTask method.
+ virtual void launchTask(ExecutorDriver* driver,
+ const TaskDescription& task);
+ // Passes `taskId` to the Python executor's killTask method.
+ virtual void killTask(ExecutorDriver* driver, const TaskID& taskId);
+ // Passes `message` to the Python executor's frameworkMessage method.
+ virtual void frameworkMessage(ExecutorDriver* driver,
+ const FrameworkMessage& message);
+ // Calls the Python executor's shutdown method.
+ virtual void shutdown(ExecutorDriver* driver);
+ // Passes `code` and `message` to the Python executor's error method.
+ virtual void error(ExecutorDriver* driver,
+ int code,
+ const std::string& message);
+};
+
+}} /* namespace mesos { namespace python { */
+
+#endif /* PROXY_EXECUTOR_HPP */
Modified: incubator/mesos/trunk/src/python/src/mesos.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/python/src/mesos.py?rev=1132218&r1=1132217&r2=1132218&view=diff
==============================================================================
--- incubator/mesos/trunk/src/python/src/mesos.py (original)
+++ incubator/mesos/trunk/src/python/src/mesos.py Sun Jun 5 09:02:50 2011
@@ -30,3 +30,41 @@ class SchedulerDriver:
def killTask(self, taskId): pass
def replyToOffer(self, offerId, tasks, params = None): pass
def reviveOffers(self): pass
+
+
+# Base class for Mesos executors. Users' executors should extend this class
+# to get default implementations of methods they don't override.
+class Executor:
+ def init(self, driver, args): pass  # native proxy passes an "ExecutorArgs" protobuf as args
+ def launchTask(self, driver, task): pass  # native proxy passes a "TaskDescription" protobuf
+ def killTask(self, driver, taskId): pass  # native proxy passes a "TaskID" protobuf
+ def frameworkMessage(self, driver, message): pass  # native proxy passes a "FrameworkMessage" protobuf
+ def shutdown(self, driver): pass  # native proxy passes only the driver
+ # NOTE(review): error() uses `sys`; confirm this module imports it (not visible here).
+ # Default implementation of error() prints to stderr because we can't
+ # make error() an abstract method in Python
+ def error(self, driver, code, message):  # native proxy passes an int code and a string message
+ print >> sys.stderr, "Error from Mesos: %s (code: %d)" % (message, code)
+
+
+# Interface for Mesos executor drivers. Users may wish to extend this class
+# in mock objects for tests.
+class ExecutorDriver:  # every method is a no-op stub; the native MesosExecutorDriver alias does not inherit from this
+ def start(self): pass
+ def stop(self): pass
+ def join(self): pass
+ def run(self): pass
+ def sendStatusUpdate(self, status): pass  # presumably a TaskStatus protobuf -- TODO confirm
+ def sendFrameworkMessage(self, message): pass  # presumably a FrameworkMessage protobuf -- TODO confirm
+
+
+# Alias the MesosSchedulerDriverImpl from _mesos. Ideally we would make this
+# class inherit from SchedulerDriver somehow, but this complicates the C++
+# code, and there seems to be no point in doing it in a dynamic language.
+MesosSchedulerDriver = _mesos.MesosSchedulerDriverImpl  # type added to the _mesos module by init_mesos() in module.cpp
+
+
+# Alias the MesosExecutorDriverImpl from _mesos. Ideally we would make this
+# class inherit from ExecutorDriver somehow, but this complicates the C++
+# code, and there seems to be no point in doing it in a dynamic language.
+MesosExecutorDriver = _mesos.MesosExecutorDriverImpl  # type added to the _mesos module by init_mesos() in module.cpp