You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/05/02 19:51:27 UTC
git commit: Applied four space indents (PEP8) to Python example frameworks.
Repository: mesos
Updated Branches:
refs/heads/master 1223ad524 -> 3657550ec
Applied four space indents (PEP8) to Python example frameworks.
Review: https://reviews.apache.org/r/21008
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3657550e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3657550e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3657550e
Branch: refs/heads/master
Commit: 3657550ec605f819bbb863638f494ac7122a0bae
Parents: 1223ad5
Author: Steven Phung <st...@gmail.com>
Authored: Fri May 2 10:50:15 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri May 2 10:51:14 2014 -0700
----------------------------------------------------------------------
src/examples/python/test_executor.py | 60 +++----
src/examples/python/test_framework.py | 273 +++++++++++++++--------------
2 files changed, 167 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3657550e/src/examples/python/test_executor.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_executor.py b/src/examples/python/test_executor.py
index f34ab3f..065b50a 100755
--- a/src/examples/python/test_executor.py
+++ b/src/examples/python/test_executor.py
@@ -24,35 +24,35 @@ import mesos
import mesos_pb2
class MyExecutor(mesos.Executor):
- def launchTask(self, driver, task):
- # Create a thread to run the task. Tasks should always be run in new
- # threads or processes, rather than inside launchTask itself.
- def run_task():
- print "Running task %s" % task.task_id.value
- update = mesos_pb2.TaskStatus()
- update.task_id.value = task.task_id.value
- update.state = mesos_pb2.TASK_RUNNING
- update.data = 'data with a \0 byte'
- driver.sendStatusUpdate(update)
-
- # This is where one would perform the requested task.
-
- print "Sending status update..."
- update = mesos_pb2.TaskStatus()
- update.task_id.value = task.task_id.value
- update.state = mesos_pb2.TASK_FINISHED
- update.data = 'data with a \0 byte'
- driver.sendStatusUpdate(update)
- print "Sent status update"
-
- thread = threading.Thread(target=run_task)
- thread.start()
-
- def frameworkMessage(self, driver, message):
- # Send it back to the scheduler.
- driver.sendFrameworkMessage(message)
+ def launchTask(self, driver, task):
+ # Create a thread to run the task. Tasks should always be run in new
+ # threads or processes, rather than inside launchTask itself.
+ def run_task():
+ print "Running task %s" % task.task_id.value
+ update = mesos_pb2.TaskStatus()
+ update.task_id.value = task.task_id.value
+ update.state = mesos_pb2.TASK_RUNNING
+ update.data = 'data with a \0 byte'
+ driver.sendStatusUpdate(update)
+
+ # This is where one would perform the requested task.
+
+ print "Sending status update..."
+ update = mesos_pb2.TaskStatus()
+ update.task_id.value = task.task_id.value
+ update.state = mesos_pb2.TASK_FINISHED
+ update.data = 'data with a \0 byte'
+ driver.sendStatusUpdate(update)
+ print "Sent status update"
+
+ thread = threading.Thread(target=run_task)
+ thread.start()
+
+ def frameworkMessage(self, driver, message):
+ # Send it back to the scheduler.
+ driver.sendFrameworkMessage(message)
if __name__ == "__main__":
- print "Starting executor"
- driver = mesos.MesosExecutorDriver(MyExecutor())
- sys.exit(0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1)
+ print "Starting executor"
+ driver = mesos.MesosExecutorDriver(MyExecutor())
+ sys.exit(0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1)
http://git-wip-us.apache.org/repos/asf/mesos/blob/3657550e/src/examples/python/test_framework.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_framework.py b/src/examples/python/test_framework.py
index deca48e..c37de6e 100755
--- a/src/examples/python/test_framework.py
+++ b/src/examples/python/test_framework.py
@@ -29,141 +29,142 @@ TASK_CPUS = 1
TASK_MEM = 32
class TestScheduler(mesos.Scheduler):
- def __init__(self, executor):
- self.executor = executor
- self.taskData = {}
- self.tasksLaunched = 0
- self.tasksFinished = 0
- self.messagesSent = 0
- self.messagesReceived = 0
-
- def registered(self, driver, frameworkId, masterInfo):
- print "Registered with framework ID %s" % frameworkId.value
-
- def resourceOffers(self, driver, offers):
- print "Got %d resource offers" % len(offers)
- for offer in offers:
- tasks = []
- print "Got resource offer %s" % offer.id.value
- if self.tasksLaunched < TOTAL_TASKS:
- tid = self.tasksLaunched
- self.tasksLaunched += 1
-
- print "Accepting offer on %s to start task %d" % (offer.hostname, tid)
-
- task = mesos_pb2.TaskInfo()
- task.task_id.value = str(tid)
- task.slave_id.value = offer.slave_id.value
- task.name = "task %d" % tid
- task.executor.MergeFrom(self.executor)
-
- cpus = task.resources.add()
- cpus.name = "cpus"
- cpus.type = mesos_pb2.Value.SCALAR
- cpus.scalar.value = TASK_CPUS
-
- mem = task.resources.add()
- mem.name = "mem"
- mem.type = mesos_pb2.Value.SCALAR
- mem.scalar.value = TASK_MEM
-
- tasks.append(task)
- self.taskData[task.task_id.value] = (
- offer.slave_id, task.executor.executor_id)
- driver.launchTasks(offer.id, tasks)
-
- def statusUpdate(self, driver, update):
- print "Task %s is in state %d" % (update.task_id.value, update.state)
-
- # Ensure the binary data came through.
- if update.data != "data with a \0 byte":
- print "The update data did not match!"
- print " Expected: 'data with a \\x00 byte'"
- print " Actual: ", repr(str(update.data))
- sys.exit(1)
-
- if update.state == mesos_pb2.TASK_FINISHED:
- self.tasksFinished += 1
- if self.tasksFinished == TOTAL_TASKS:
- print "All tasks done, waiting for final framework message"
-
- slave_id, executor_id = self.taskData[update.task_id.value]
-
- self.messagesSent += 1
- driver.sendFrameworkMessage(
- executor_id,
- slave_id,
- 'data with a \0 byte')
-
- def frameworkMessage(self, driver, executorId, slaveId, message):
- self.messagesReceived += 1
-
- # The message bounced back as expected.
- if message != "data with a \0 byte":
- print "The returned message data did not match!"
- print " Expected: 'data with a \\x00 byte'"
- print " Actual: ", repr(str(message))
- sys.exit(1)
- print "Received message:", repr(str(message))
-
- if self.messagesReceived == TOTAL_TASKS:
- if self.messagesReceived != self.messagesSent:
- print "Sent", self.messagesSent,
- print "but received", self.messagesReceived
- sys.exit(1)
- print "All tasks done, and all messages received, exiting"
- driver.stop()
+ def __init__(self, executor):
+ self.executor = executor
+ self.taskData = {}
+ self.tasksLaunched = 0
+ self.tasksFinished = 0
+ self.messagesSent = 0
+ self.messagesReceived = 0
+
+ def registered(self, driver, frameworkId, masterInfo):
+ print "Registered with framework ID %s" % frameworkId.value
+
+ def resourceOffers(self, driver, offers):
+ print "Got %d resource offers" % len(offers)
+ for offer in offers:
+ tasks = []
+ print "Got resource offer %s" % offer.id.value
+ if self.tasksLaunched < TOTAL_TASKS:
+ tid = self.tasksLaunched
+ self.tasksLaunched += 1
+
+ print "Accepting offer on %s to start task %d" \
+ % (offer.hostname, tid)
+
+ task = mesos_pb2.TaskInfo()
+ task.task_id.value = str(tid)
+ task.slave_id.value = offer.slave_id.value
+ task.name = "task %d" % tid
+ task.executor.MergeFrom(self.executor)
+
+ cpus = task.resources.add()
+ cpus.name = "cpus"
+ cpus.type = mesos_pb2.Value.SCALAR
+ cpus.scalar.value = TASK_CPUS
+
+ mem = task.resources.add()
+ mem.name = "mem"
+ mem.type = mesos_pb2.Value.SCALAR
+ mem.scalar.value = TASK_MEM
+
+ tasks.append(task)
+ self.taskData[task.task_id.value] = (
+ offer.slave_id, task.executor.executor_id)
+ driver.launchTasks(offer.id, tasks)
+
+ def statusUpdate(self, driver, update):
+ print "Task %s is in state %d" % (update.task_id.value, update.state)
+
+ # Ensure the binary data came through.
+ if update.data != "data with a \0 byte":
+ print "The update data did not match!"
+ print " Expected: 'data with a \\x00 byte'"
+ print " Actual: ", repr(str(update.data))
+ sys.exit(1)
+
+ if update.state == mesos_pb2.TASK_FINISHED:
+ self.tasksFinished += 1
+ if self.tasksFinished == TOTAL_TASKS:
+ print "All tasks done, waiting for final framework message"
+
+ slave_id, executor_id = self.taskData[update.task_id.value]
+
+ self.messagesSent += 1
+ driver.sendFrameworkMessage(
+ executor_id,
+ slave_id,
+ 'data with a \0 byte')
+
+ def frameworkMessage(self, driver, executorId, slaveId, message):
+ self.messagesReceived += 1
+
+ # The message bounced back as expected.
+ if message != "data with a \0 byte":
+ print "The returned message data did not match!"
+ print " Expected: 'data with a \\x00 byte'"
+ print " Actual: ", repr(str(message))
+ sys.exit(1)
+ print "Received message:", repr(str(message))
+
+ if self.messagesReceived == TOTAL_TASKS:
+ if self.messagesReceived != self.messagesSent:
+ print "Sent", self.messagesSent,
+ print "but received", self.messagesReceived
+ sys.exit(1)
+ print "All tasks done, and all messages received, exiting"
+ driver.stop()
if __name__ == "__main__":
- if len(sys.argv) != 2:
- print "Usage: %s master" % sys.argv[0]
- sys.exit(1)
-
- executor = mesos_pb2.ExecutorInfo()
- executor.executor_id.value = "default"
- executor.command.value = os.path.abspath("./test-executor")
- executor.name = "Test Executor (Python)"
- executor.source = "python_test"
-
- framework = mesos_pb2.FrameworkInfo()
- framework.user = "" # Have Mesos fill in the current user.
- framework.name = "Test Framework (Python)"
-
- # TODO(vinod): Make checkpointing the default when it is default
- # on the slave.
- if os.getenv("MESOS_CHECKPOINT"):
- print "Enabling checkpoint for the framework"
- framework.checkpoint = True
-
- if os.getenv("MESOS_AUTHENTICATE"):
- print "Enabling authentication for the framework"
-
- if not os.getenv("DEFAULT_PRINCIPAL"):
- print "Expecting authentication principal in the environment"
- sys.exit(1);
-
- if not os.getenv("DEFAULT_SECRET"):
- print "Expecting authentication secret in the environment"
- sys.exit(1);
-
- credential = mesos_pb2.Credential()
- credential.principal = os.getenv("DEFAULT_PRINCIPAL")
- credential.secret = os.getenv("DEFAULT_SECRET")
-
- driver = mesos.MesosSchedulerDriver(
- TestScheduler(executor),
- framework,
- sys.argv[1],
- credential)
- else:
- driver = mesos.MesosSchedulerDriver(
- TestScheduler(executor),
- framework,
- sys.argv[1])
-
- status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
-
- # Ensure that the driver process terminates.
- driver.stop();
-
- sys.exit(status)
+ if len(sys.argv) != 2:
+ print "Usage: %s master" % sys.argv[0]
+ sys.exit(1)
+
+ executor = mesos_pb2.ExecutorInfo()
+ executor.executor_id.value = "default"
+ executor.command.value = os.path.abspath("./test-executor")
+ executor.name = "Test Executor (Python)"
+ executor.source = "python_test"
+
+ framework = mesos_pb2.FrameworkInfo()
+ framework.user = "" # Have Mesos fill in the current user.
+ framework.name = "Test Framework (Python)"
+
+ # TODO(vinod): Make checkpointing the default when it is default
+ # on the slave.
+ if os.getenv("MESOS_CHECKPOINT"):
+ print "Enabling checkpoint for the framework"
+ framework.checkpoint = True
+
+ if os.getenv("MESOS_AUTHENTICATE"):
+ print "Enabling authentication for the framework"
+
+ if not os.getenv("DEFAULT_PRINCIPAL"):
+ print "Expecting authentication principal in the environment"
+ sys.exit(1);
+
+ if not os.getenv("DEFAULT_SECRET"):
+ print "Expecting authentication secret in the environment"
+ sys.exit(1);
+
+ credential = mesos_pb2.Credential()
+ credential.principal = os.getenv("DEFAULT_PRINCIPAL")
+ credential.secret = os.getenv("DEFAULT_SECRET")
+
+ driver = mesos.MesosSchedulerDriver(
+ TestScheduler(executor),
+ framework,
+ sys.argv[1],
+ credential)
+ else:
+ driver = mesos.MesosSchedulerDriver(
+ TestScheduler(executor),
+ framework,
+ sys.argv[1])
+
+ status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
+
+ # Ensure that the driver process terminates.
+ driver.stop();
+
+ sys.exit(status)