You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:39:29 UTC
svn commit: r1132086 - in /incubator/mesos/trunk/frameworks/mesos-submit: ./
executor executor.py mesos-submit mesos_submit.py
Author: benh
Date: Sun Jun 5 08:39:28 2011
New Revision: 1132086
URL: http://svn.apache.org/viewvc?rev=1132086&view=rev
Log:
Added a simple framework called mesos-submit that lets you launch a
process in the cluster and exits once it is launched. This is a way to
run schedulers in the cluster for example (by submitting a script which
starts up your scheduler). Usage is similar in spirit to mpiexec.
Added:
incubator/mesos/trunk/frameworks/mesos-submit/
incubator/mesos/trunk/frameworks/mesos-submit/executor (with props)
incubator/mesos/trunk/frameworks/mesos-submit/executor.py (with props)
incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit (with props)
incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py (with props)
Added: incubator/mesos/trunk/frameworks/mesos-submit/executor
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/executor?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/executor (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/executor Sun Jun 5 08:39:28 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+if [ "x$PYTHON" == "x" ]; then
+ PYTHON=python
+ if [ "`uname`" == "SunOS" ]; then
+ PYTHON=python2.6
+ fi
+fi
+
+if [ "x$MESOS_HOME" == "x" ]; then
+ MESOS_HOME="$(dirname $0)/../.."
+fi
+
+export PYTHONPATH=$MESOS_HOME/lib/python
+exec $PYTHON "$(dirname $0)/executor.py" $@
Propchange: incubator/mesos/trunk/frameworks/mesos-submit/executor
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/mesos/trunk/frameworks/mesos-submit/executor.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/executor.py?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/executor.py (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/executor.py Sun Jun 5 08:39:28 2011
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+import os
+import subprocess
+import sys
+import threading
+import time
+from threading import Thread
+
+import mesos
+
+
+# This function is called in its own thread to actually run the user's command.
+# When it finishes, it shuts down the scheduler driver (disconnecting the
+# framework) and exits the program.
+def run_command(command, driver):
+ print "Running " + command
+ equal_signs = "=" * 40
+ print equal_signs
+ try:
+ code = os.system(command)
+ print equal_signs
+ print "Command completed with code %d" % code
+ except OSError,e:
+ print equal_signs
+ print "os.system call failed, see stderr for details"
+ print >>sys.stderr, "Error executing command"
+ print >>sys.stderr, e
+ driver.stop()
+ sys.exit(2)
+ driver.stop()
+ sys.exit(0)
+
+
+# A secondary scheduler registered for our framework with Mesos so that
+# our first scheduler (on the machine that ran mesos-submit) can disconnect.
+# This scheduler launches no further tasks but allows our one task to continue
+# running in the cluster -- the task essentially becomes its own scheduler.
+class SecondaryScheduler(mesos.Scheduler):
+ def __init__(self, command):
+ mesos.Scheduler.__init__(self)
+ self.command = command
+ print "here"
+
+ def getFrameworkName(self, driver):
+ print "here 2"
+ return "mesos-submit " + self.command
+
+ def getExecutorInfo(self, driver):
+ print "here 3"
+ executorPath = os.path.join(os.getcwd(), "executor")
+ return mesos.ExecutorInfo(executorPath, "")
+
+ def resourceOffer(self, driver, oid, offers):
+ # Reject the offer with an infinite timeout, since we are here
+ # only to serve as a second scheduler to keep the framework running
+ driver.replyToOffer(oid, [], {"timeout": "-1"})
+
+ def registered(self, driver, fid):
+ print "Registered with Mesos; starting command"
+ Thread(target=run_command, args=[self.command, driver]).start()
+
+ def error(self, driver, code, message):
+ print "Error from Mesos: %s (code %s)" % (message, code)
+
+
+# This function is called in a separate thread to run our secondary scheduler;
+# for some reason, things fail if we launch it from the executor's launchTask
+# callback (this is likely to be SWIG/Python related).
+def run_scheduler(command, master, fid):
+ print "Starting secondary scheduler"
+ sched = SecondaryScheduler(command)
+ sched_driver = mesos.MesosSchedulerDriver(sched, master, fid)
+ sched_driver.run()
+
+
+
+# Executor class for mesos-scheduler. Expects to be given a single task
+# to launch with a framework ID, master URL and command as parameters.
+# Once this task is received, the executor registers as a scheduler for the
+# framework by creating a SecondaryScheduler object, allowing the mesos-submit
+# command on the user's machine to exit, and it starts the user's command
+# on this cluster node as a subprocess.
+class MyExecutor(mesos.Executor):
+ def __init__(self):
+ mesos.Executor.__init__(self)
+ self.sched = None
+
+ def launchTask(self, driver, task):
+ if self.sched == None:
+ print "Received task; going to register as scheduler"
+ # Recover framework ID, master and command from task arg
+ pieces = task.arg.split("|")
+ fid = pieces[0]
+ master = pieces[1]
+ command = "|".join(pieces[2:]) # In case there are | characters in command
+ print "Parsed parameters:"
+ print " framework ID = %s" % fid
+ print " master = %s" % master
+ print " command = %s" % command
+ # Start our secondary scheduler in a different thread (for some reason,
+ # this fails if we do it from the same thread.. probably due to some
+ # SWIG Python interaction).
+ Thread(target=run_scheduler, args=[command, master, fid]).start()
+ else:
+ print "Error: received a second task -- this should never happen!"
+
+ def killTask(self, driver, tid):
+ sys.exit(1)
+
+ def error(self, driver, code, message):
+ print "Error from Mesos: %s (code %s)" % (message, code)
+
+
+if __name__ == "__main__":
+ print "Starting mesos-submit executor"
+ executor = MyExecutor()
+ mesos.MesosExecutorDriver(executor).run()
Propchange: incubator/mesos/trunk/frameworks/mesos-submit/executor.py
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit Sun Jun 5 08:39:28 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+if [ "x$PYTHON" == "x" ]; then
+ PYTHON=python
+ if [ "`uname`" == "SunOS" ]; then
+ PYTHON=python2.6
+ fi
+fi
+
+if [ "x$MESOS_HOME" == "x" ]; then
+ MESOS_HOME="$(dirname $0)/../.."
+fi
+
+export PYTHONPATH=$MESOS_HOME/lib/python
+exec $PYTHON "$(dirname $0)/mesos_submit.py" $@
Propchange: incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py Sun Jun 5 08:39:28 2011
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+import os
+import re
+import sys
+import time
+from optparse import OptionParser
+
+import mesos
+
+# Default resources to use for the command we execute.
+DEFAULT_CPUS = 1
+DEFAULT_MEM = 512
+
+
+class SubmitScheduler(mesos.Scheduler):
+ def __init__(self, cpus, mem, master, command):
+ mesos.Scheduler.__init__(self)
+ self.cpus = cpus
+ self.mem = mem
+ self.master = master
+ self.command = command
+ self.task_launched = False
+
+ def getFrameworkName(self, driver):
+ print "In getFrameworkName"
+ return "mesos-submit " + self.command
+
+ def getExecutorInfo(self, driver):
+ print "In getExecutorInfo"
+ executorPath = os.path.join(os.getcwd(), "executor")
+ return mesos.ExecutorInfo(executorPath, "")
+
+ def registered(self, driver, fid):
+ print "Registered with Mesos, FID = %s" % fid
+ self.fid = "" + fid
+
+ def resourceOffer(self, driver, oid, offers):
+ if self.task_launched:
+ driver.replyToOffer(oid, [], {"timeout": "-1"})
+ else:
+ for offer in offers:
+ cpus = int(offer.params["cpus"])
+ mem = int(offer.params["mem"])
+ if cpus >= self.cpus and mem >= self.mem:
+ print "Accepting slot on slave %s (%s)" % (offer.slaveId, offer.host)
+ params = {"cpus": "%d" % self.cpus, "mem": "%d" % self.mem}
+ arg = "%s|%s|%s" % (self.fid, self.master, self.command)
+ task = mesos.TaskDescription(0, offer.slaveId, "task", params, arg)
+ driver.replyToOffer(oid, [task], {"timeout": "1"})
+ self.task_launched = True
+ return
+
+ def statusUpdate(self, driver, update):
+ print "Task %d in state %d" % (update.taskId, update.state)
+
+ def error(self, driver, code, message):
+ if message == "Framework failover":
+ print "Secondary scheduler registered successfully; exiting mesos-submit"
+ else:
+ print "Error from Mesos: %s (error code: %d)" % (message, code)
+ driver.stop()
+
+
+if __name__ == "__main__":
+ parser = OptionParser(usage="Usage: %prog [options] <master_url> <command>")
+ parser.add_option("-c","--cpus",
+ help="number of CPUs to request (default: 1)",
+ dest="cpus", type="int", default=DEFAULT_CPUS)
+ parser.add_option("-m","--mem",
+ help="MB of memory to request (default: 512)",
+ dest="mem", type="int", default=DEFAULT_MEM)
+ (options,args)= parser.parse_args()
+ if len(args) < 2:
+ parser.error("At least two parameters are required.")
+ exit(2)
+ master = args[0]
+ command = " ".join(args[1:])
+ print "Connecting to mesos master %s" % master
+ sched = SubmitScheduler(options.cpus, options.mem, master, command)
+ mesos.MesosSchedulerDriver(sched, master).run()
Propchange: incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py
------------------------------------------------------------------------------
svn:executable = *