You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:33:31 UTC
svn commit: r1131641 - in /incubator/mesos/trunk/src/scaling: nested_exec.py
scaling_exec.py scaling_sched.py
Author: benh
Date: Sun Jun 5 03:33:30 2011
New Revision: 1131641
URL: http://svn.apache.org/viewvc?rev=1131641&view=rev
Log:
bugfixed versions grabbed from benh instance on ec2
Modified:
incubator/mesos/trunk/src/scaling/nested_exec.py
incubator/mesos/trunk/src/scaling/scaling_exec.py
incubator/mesos/trunk/src/scaling/scaling_sched.py
Modified: incubator/mesos/trunk/src/scaling/nested_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/nested_exec.py?rev=1131641&r1=1131640&r2=1131641&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/nested_exec.py (original)
+++ incubator/mesos/trunk/src/scaling/nested_exec.py Sun Jun 5 03:33:30 2011
@@ -9,25 +9,27 @@ class NestedExecutor(nexus.Executor):
nexus.Executor.__init__(self)
self.tid = -1
- def init(self, args):
+ def init(self, driver, args):
self.fid = args.frameworkId
- def startTask(self, task):
+ def launchTask(self, driver, task):
self.tid = task.taskId
duration = pickle.loads(task.arg)
print "(%d:%d) Sleeping for %s seconds." % (self.fid, self.tid, duration)
# TODO(benh): Don't sleep, this blocks the event loop!
time.sleep(duration)
- self.sendStatusUpdate(nexus.TaskStatus(self.tid, nexus.TASK_FINISHED, ""))
+ status = nexus.TaskStatus(self.tid, nexus.TASK_FINISHED, "")
+ driver.sendStatusUpdate(status)
- def killTask(self, tid):
+ def killTask(self, driver, tid):
if (self.tid != tid):
print "Expecting different task id ... killing anyway!"
- self.sendStatusUpdate(nexus.TaskStatus(tid, nexus.TASK_FINISHED, ""))
+ status = nexus.TaskStatus(tid, nexus.TASK_FINISHED, "")
+ driver.sendStatusUpdate(status)
- def error(self, code, message):
+ def error(self, driver, code, message):
print "Error: %s" % message
if __name__ == "__main__":
- NestedExecutor().run()
+ nexus.NexusExecutorDriver(NestedExecutor()).run()
Modified: incubator/mesos/trunk/src/scaling/scaling_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_exec.py?rev=1131641&r1=1131640&r2=1131641&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_exec.py (original)
+++ incubator/mesos/trunk/src/scaling/scaling_exec.py Sun Jun 5 03:33:30 2011
@@ -32,6 +32,8 @@ class NestedScheduler(nexus.Scheduler):
"task %d" % self.tid, offer.params,
pickle.dumps(self.duration))
tasks.append(task)
+ #msg = nexus.FrameworkMessage(-1, , "")
+ #executor.sendFrameworkMessage("")
driver.replyToOffer(oid, tasks, {})
def statusUpdate(self, driver, status):
@@ -47,29 +49,29 @@ class ScalingExecutor(nexus.Executor):
def __init__(self):
nexus.Executor.__init__(self)
self.tid = -1
- self.driver = -1
+ self.nested_driver = -1
- def startTask(self, task):
+ def launchTask(self, driver, task):
self.tid = task.taskId
master, (todo, duration) = pickle.loads(task.arg)
scheduler = NestedScheduler(todo, duration, self)
- self.driver = nexus.NexusSchedulerDriver(scheduler, master)
- self.driver.start()
+ self.nested_driver = nexus.NexusSchedulerDriver(scheduler, master)
+ self.nested_driver.start()
- def killTask(self, tid):
+ def killTask(self, driver, tid):
if (tid != self.tid):
print "Expecting different task id ... killing anyway!"
- if self.driver != -1:
- self.driver.stop()
- self.driver.join()
- self.sendStatusUpdate(nexus.TaskStatus(tid, nexus.TASK_FINISHED, ""))
+ if self.nested_driver != -1:
+ self.nested_driver.stop()
+ self.nested_driver.join()
+ driver.sendStatusUpdate(nexus.TaskStatus(tid, nexus.TASK_FINISHED, ""))
- def shutdown(self):
+ def shutdown(self, driver):
self.killTask(self.tid)
- def error(self, code, message):
+ def error(self, driver, code, message):
print "Error: %s" % message
if __name__ == "__main__":
- ScalingExecutor().run()
+ nexus.NexusExecutorDriver(ScalingExecutor()).run()
Modified: incubator/mesos/trunk/src/scaling/scaling_sched.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_sched.py?rev=1131641&r1=1131640&r2=1131641&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_sched.py (original)
+++ incubator/mesos/trunk/src/scaling/scaling_sched.py Sun Jun 5 03:33:30 2011
@@ -7,22 +7,43 @@ import os
import pickle
# Scheduler configurations as pairs of (todo, duration) to run.
+# config = [ (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+# (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+# (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+# (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
+# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
+# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
+# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1) ]
+
config = [ (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
- (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
- (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
- (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
- (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
- (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
- (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
- (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
- (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
- (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
- (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
- (100, 1), (100, 1), (100, 1), (100, 1), (100, 1) ]
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+ (1, 10), (1, 10), (1, 10), (1, 10), (1, 10) ]
class ScalingScheduler(nexus.Scheduler):
@@ -43,9 +64,10 @@ class ScalingScheduler(nexus.Scheduler):
def resourceOffer(self, driver, oid, offers):
# Make sure the nested schedulers can actually run their tasks.
- if len(offers) <= len(config):
- print "Need at least one spare slave to do this work ... exiting!"
- driver.stop()
+ # if len(offers) <= len(config) and len(config) != self.tid:
+ # print "Need at least one spare slave to do this work ... exiting!"
+ # driver.stop()
+ # return
# Farm out the schedulers!
tasks = []
@@ -58,6 +80,7 @@ class ScalingScheduler(nexus.Scheduler):
tasks.append(task)
self.running[self.tid] = (todo, duration)
self.tid += 1
+ print "Launching (%d, %d) on slave %d" % (todo, duration, offer.slaveId)
driver.replyToOffer(oid, tasks, {})
def statusUpdate(self, driver, status):