You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 06:54:26 UTC
svn commit: r1131659 - in /incubator/mesos/trunk/src/scaling: nested_exec
nested_exec.py scaling_exec scaling_exec.py scaling_sched scaling_sched.py
Author: benh
Date: Sun Jun 5 04:54:26 2011
New Revision: 1131659
URL: http://svn.apache.org/viewvc?rev=1131659&view=rev
Log:
latest scaling
Modified:
incubator/mesos/trunk/src/scaling/nested_exec
incubator/mesos/trunk/src/scaling/nested_exec.py
incubator/mesos/trunk/src/scaling/scaling_exec
incubator/mesos/trunk/src/scaling/scaling_exec.py
incubator/mesos/trunk/src/scaling/scaling_sched
incubator/mesos/trunk/src/scaling/scaling_sched.py
Modified: incubator/mesos/trunk/src/scaling/nested_exec
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/nested_exec?rev=1131659&r1=1131658&r2=1131659&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/nested_exec (original)
+++ incubator/mesos/trunk/src/scaling/nested_exec Sun Jun 5 04:54:26 2011
@@ -1,3 +1,4 @@
#!/bin/sh
export PYTHONPATH="$PYTHONPATH:../swig/python"
+cd `dirname $0`
exec ./nested_exec.py $@
Modified: incubator/mesos/trunk/src/scaling/nested_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/nested_exec.py?rev=1131659&r1=1131658&r2=1131659&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/nested_exec.py (original)
+++ incubator/mesos/trunk/src/scaling/nested_exec.py Sun Jun 5 04:54:26 2011
@@ -15,12 +15,13 @@ class NestedExecutor(nexus.Executor):
def launchTask(self, driver, task):
self.tid = task.taskId
duration = pickle.loads(task.arg)
- print "(%d:%d) Sleeping for %s seconds." % (self.fid, self.tid, duration)
+ print "(%s:%d) Sleeping for %s seconds." % (self.fid, self.tid, duration)
# TODO(benh): Don't sleep, this blocks the event loop!
time.sleep(duration)
status = nexus.TaskStatus(self.tid, nexus.TASK_FINISHED, "")
driver.sendStatusUpdate(status)
-
+ time.sleep(1)
+
def killTask(self, driver, tid):
if (self.tid != tid):
print "Expecting different task id ... killing anyway!"
Modified: incubator/mesos/trunk/src/scaling/scaling_exec
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_exec?rev=1131659&r1=1131658&r2=1131659&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_exec (original)
+++ incubator/mesos/trunk/src/scaling/scaling_exec Sun Jun 5 04:54:26 2011
@@ -1,3 +1,4 @@
#!/bin/sh
export PYTHONPATH="$PYTHONPATH:../swig/python"
+cd `dirname $0`
exec ./scaling_exec.py $@
Modified: incubator/mesos/trunk/src/scaling/scaling_exec.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_exec.py?rev=1131659&r1=1131658&r2=1131659&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_exec.py (original)
+++ incubator/mesos/trunk/src/scaling/scaling_exec.py Sun Jun 5 04:54:26 2011
@@ -4,6 +4,8 @@ import os
import pickle
import sys
+CPUS = 1
+MEM = 50*1024*1024
class NestedScheduler(nexus.Scheduler):
def __init__(self, todo, duration, executor):
@@ -18,7 +20,8 @@ class NestedScheduler(nexus.Scheduler):
return "Nested Framework: %d todo at %d secs" % (self.todo, self.duration)
def getExecutorInfo(self, driver):
- return nexus.ExecutorInfo("nested_exec", os.getcwd(), "")
+ execPath = os.path.join(os.getcwd(), "nested_exec")
+ return nexus.ExecutorInfo(execPath, "")
def registered(self, driver, fid):
print "Nested Scheduler Registered!"
@@ -28,8 +31,9 @@ class NestedScheduler(nexus.Scheduler):
for offer in offers:
if self.todo != self.tid:
self.tid += 1
+ pars = {"cpus": "%d" % CPUS, "mem": "%d" % MEM}
task = nexus.TaskDescription(self.tid, offer.slaveId,
- "task %d" % self.tid, offer.params,
+ "task %d" % self.tid, pars,
pickle.dumps(self.duration))
tasks.append(task)
#msg = nexus.FrameworkMessage(-1, , "")
@@ -55,9 +59,10 @@ class ScalingExecutor(nexus.Executor):
self.tid = task.taskId
master, (todo, duration) = pickle.loads(task.arg)
scheduler = NestedScheduler(todo, duration, self)
+ print "Running here:" + master
self.nested_driver = nexus.NexusSchedulerDriver(scheduler, master)
self.nested_driver.start()
-
+
def killTask(self, driver, tid):
if (tid != self.tid):
print "Expecting different task id ... killing anyway!"
Modified: incubator/mesos/trunk/src/scaling/scaling_sched
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_sched?rev=1131659&r1=1131658&r2=1131659&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_sched (original)
+++ incubator/mesos/trunk/src/scaling/scaling_sched Sun Jun 5 04:54:26 2011
@@ -1,3 +1,4 @@
#!/bin/sh
export PYTHONPATH="$PYTHONPATH:../swig/python"
+cd `dirname $0`
exec ./scaling_sched.py $@
Modified: incubator/mesos/trunk/src/scaling/scaling_sched.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/scaling/scaling_sched.py?rev=1131659&r1=1131658&r2=1131659&view=diff
==============================================================================
--- incubator/mesos/trunk/src/scaling/scaling_sched.py (original)
+++ incubator/mesos/trunk/src/scaling/scaling_sched.py Sun Jun 5 04:54:26 2011
@@ -6,58 +6,38 @@ import time
import os
import pickle
-# Scheduler configurations as pairs of (todo, duration) to run.
-# config = [ (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-# (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-# (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-# (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
-# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-# (5, 10), (5, 10), (5, 10), (5, 10), (5, 10),
-# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-# (10, 1), (10, 1), (10, 1), (10, 1), (10, 1),
-# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
-# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
-# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1),
-# (100, 1), (100, 1), (100, 1), (100, 1), (100, 1) ]
-
-config = [ (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- # (1, 10), (1, 10), (1, 10), (1, 10), (1, 10),
- (1, 10), (1, 10), (1, 10), (1, 10), (1, 10) ]
+CPUS = 1
+MEM = 50*1024*1024
+config1 = [ (1,20) ]
+
+config2 = [ (1,20), (1,240) ]
+
+config = [ (50, 120),
+ (50, 120),
+ (50, 120),
+ (50, 120),
+ (50, 120),
+ (50, 120),
+ (50, 120),
+ (50, 120),
+ (50, 120),
+ (50, 120) ]
class ScalingScheduler(nexus.Scheduler):
def __init__(self, master):
nexus.Scheduler.__init__(self)
self.tid = 0
self.master = master
+ print self.master
self.running = {}
def getFrameworkName(self, driver):
return "Scaling Framework"
def getExecutorInfo(self, driver):
- return nexus.ExecutorInfo("scaling_exec", os.getcwd(), "")
+ execPath = os.path.join(os.getcwd(), "scaling_exec")
+ return nexus.ExecutorInfo(execPath, "")
def registered(self, driver, fid):
print "Scaling Scheduler Registered!"
@@ -75,12 +55,13 @@ class ScalingScheduler(nexus.Scheduler):
if len(config) != self.tid:
(todo, duration) = config[self.tid]
arg = pickle.dumps((self.master, (todo, duration)))
+ pars = {"cpus": "%d" % CPUS, "mem": "%d" % MEM}
task = nexus.TaskDescription(self.tid, offer.slaveId,
- "task %d" % self.tid, offer.params, arg)
+ "task %d" % self.tid, pars, arg)
tasks.append(task)
self.running[self.tid] = (todo, duration)
self.tid += 1
- print "Launching (%d, %d) on slave %d" % (todo, duration, offer.slaveId)
+ print "Launching (%d, %d) on slave %s" % (todo, duration, offer.slaveId)
driver.replyToOffer(oid, tasks, {})
def statusUpdate(self, driver, status):