You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:33:31 UTC

svn commit: r1131641 - in /incubator/mesos/trunk/src/scaling: nested_exec.py scaling_exec.py scaling_sched.py

Author: benh
Date: Sun Jun  5 03:33:30 2011
New Revision: 1131641

URL: http://svn.apache.org/viewvc?rev=1131641&view=rev
Log:
Bug-fixed versions grabbed from benh's instance on EC2.

Modified:
    incubator/mesos/trunk/src/scaling/nested_exec.py
    incubator/mesos/trunk/src/scaling/scaling_exec.py
    incubator/mesos/trunk/src/scaling/scaling_sched.py

Modified: incubator/mesos/trunk/src/scaling/nested_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/nested_exec.py?rev=1131641&r1=1131640&r2=1131641&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/nested_exec.py (original)
+++ incubator/mesos/trunk/src/scaling/nested_exec.py Sun Jun  5 03:33:30 2011
@@ -9,25 +9,27 @@ class NestedExecutor(nexus.Executor):
     nexus.Executor.__init__(self)
     self.tid = -1
 
-  def init(self, args):
+  def init(self, driver, args):
     self.fid = args.frameworkId
 
-  def startTask(self, task):
+  def launchTask(self, driver, task):
     self.tid = task.taskId
     duration = pickle.loads(task.arg)
     print "(%d:%d) Sleeping for %s seconds." % (self.fid, self.tid, duration)
     # TODO(benh): Don't sleep, this blocks the event loop!
     time.sleep(duration)
-    self.sendStatusUpdate(nexus.TaskStatus(self.tid, nexus.TASK_FINISHED, ""))
+    status = nexus.TaskStatus(self.tid, nexus.TASK_FINISHED, "")
+    driver.sendStatusUpdate(status)
 
-  def killTask(self, tid):
+  def killTask(self, driver, tid):
     if (self.tid != tid):
       print "Expecting different task id ... killing anyway!"
-    self.sendStatusUpdate(nexus.TaskStatus(tid, nexus.TASK_FINISHED, ""))
+    status = nexus.TaskStatus(tid, nexus.TASK_FINISHED, "")
+    driver.sendStatusUpdate(status)
 
-  def error(self, code, message):
+  def error(self, driver, code, message):
     print "Error: %s" % message
 
 
 if __name__ == "__main__":
-  NestedExecutor().run()
+  nexus.NexusExecutorDriver(NestedExecutor()).run()

Modified: incubator/mesos/trunk/src/scaling/scaling_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_exec.py?rev=1131641&r1=1131640&r2=1131641&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_exec.py (original)
+++ incubator/mesos/trunk/src/scaling/scaling_exec.py Sun Jun  5 03:33:30 2011
@@ -32,6 +32,8 @@ class NestedScheduler(nexus.Scheduler):
                                      "task %d" % self.tid, offer.params,
                                      pickle.dumps(self.duration))
         tasks.append(task)
+        #msg = nexus.FrameworkMessage(-1, , "")
+        #executor.sendFrameworkMessage("")
     driver.replyToOffer(oid, tasks, {})
 
   def statusUpdate(self, driver, status):
@@ -47,29 +49,29 @@ class ScalingExecutor(nexus.Executor):
   def __init__(self):
     nexus.Executor.__init__(self)
     self.tid = -1
-    self.driver = -1
+    self.nested_driver = -1
 
-  def startTask(self, task):
+  def launchTask(self, driver, task):
     self.tid = task.taskId
     master, (todo, duration) = pickle.loads(task.arg)
     scheduler = NestedScheduler(todo, duration, self)
-    self.driver = nexus.NexusSchedulerDriver(scheduler, master)
-    self.driver.start()
+    self.nested_driver = nexus.NexusSchedulerDriver(scheduler, master)
+    self.nested_driver.start()
 
-  def killTask(self, tid):
+  def killTask(self, driver, tid):
     if (tid != self.tid):
       print "Expecting different task id ... killing anyway!"
-    if self.driver != -1:
-      self.driver.stop()
-      self.driver.join()
-    self.sendStatusUpdate(nexus.TaskStatus(tid, nexus.TASK_FINISHED, ""))
+    if self.nested_driver != -1:
+      self.nested_driver.stop()
+      self.nested_driver.join()
+    driver.sendStatusUpdate(nexus.TaskStatus(tid, nexus.TASK_FINISHED, ""))
 
-  def shutdown(self):
+  def shutdown(self, driver):
     self.killTask(self.tid)
 
-  def error(self, code, message):
+  def error(self, driver, code, message):
     print "Error: %s" % message
 
 
 if __name__ == "__main__":
-  ScalingExecutor().run()
+  nexus.NexusExecutorDriver(ScalingExecutor()).run()

Modified: incubator/mesos/trunk/src/scaling/scaling_sched.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_sched.py?rev=1131641&r1=1131640&r2=1131641&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_sched.py (original)
+++ incubator/mesos/trunk/src/scaling/scaling_sched.py Sun Jun  5 03:33:30 2011
@@ -7,22 +7,43 @@ import os
 import pickle
 
 # Scheduler configurations as pairs of (todo, duration) to run.
+# config = [ (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+#            (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+#            (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+#            (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+#            (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+#            (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+#            (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+#            (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
+#            (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+#            (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+#            (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+#            (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
+#            (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
+#            (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
+#            (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
+#            (100, 1), (100, 1), (100, 1), (100, 1), (100, 1) ]
+
 config = [ (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-           (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-           (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-           (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-           (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-           (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-           (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-           (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-           (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-           (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-           (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-           (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-           (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
-           (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
-           (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
-           (100, 1), (100, 1), (100, 1), (100, 1), (100, 1) ]
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
+           (1, 10), (1, 10), (1, 10), (1, 10), (1, 10) ]
 
 
 class ScalingScheduler(nexus.Scheduler):
@@ -43,9 +64,10 @@ class ScalingScheduler(nexus.Scheduler):
 
   def resourceOffer(self, driver, oid, offers):
     # Make sure the nested schedulers can actually run their tasks.
-    if len(offers) <= len(config):
-      print "Need at least one spare slave to do this work ... exiting!"
-      driver.stop()
+    # if len(offers) <= len(config) and len(config) != self.tid:
+    #   print "Need at least one spare slave to do this work ... exiting!"
+    #   driver.stop()
+    #   return
 
     # Farm out the schedulers!
     tasks = []
@@ -58,6 +80,7 @@ class ScalingScheduler(nexus.Scheduler):
         tasks.append(task)
         self.running[self.tid] = (todo, duration)
         self.tid += 1
+        print "Launching (%d, %d) on slave %d" % (todo, duration, offer.slaveId)
     driver.replyToOffer(oid, tasks, {})
 
   def statusUpdate(self, driver, status):