You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:39:29 UTC

svn commit: r1132086 - in /incubator/mesos/trunk/frameworks/mesos-submit: ./ executor executor.py mesos-submit mesos_submit.py

Author: benh
Date: Sun Jun  5 08:39:28 2011
New Revision: 1132086

URL: http://svn.apache.org/viewvc?rev=1132086&view=rev
Log:
Added a simple framework called mesos-submit that lets you launch a
process in the cluster and exits once it is launched. This is a way to
run schedulers in the cluster for example (by submitting a script which
starts up your scheduler). Usage is similar in spirit to mpiexec.

Added:
    incubator/mesos/trunk/frameworks/mesos-submit/
    incubator/mesos/trunk/frameworks/mesos-submit/executor   (with props)
    incubator/mesos/trunk/frameworks/mesos-submit/executor.py   (with props)
    incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit   (with props)
    incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py   (with props)

Added: incubator/mesos/trunk/frameworks/mesos-submit/executor
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/executor?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/executor (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/executor Sun Jun  5 08:39:28 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+# Launcher for executor.py: picks a Python interpreter and sets PYTHONPATH.
+if [ -z "$PYTHON" ]; then
+  PYTHON=python
+  if [ "$(uname)" == "SunOS" ]; then
+    PYTHON=python2.6
+  fi
+fi
+# Default MESOS_HOME to two directories above this script.
+if [ -z "$MESOS_HOME" ]; then
+  MESOS_HOME="$(dirname "$0")/../.."
+fi
+# $PYTHON stays unquoted so it may carry flags; "$@" preserves each argument.
+export PYTHONPATH="$MESOS_HOME/lib/python"
+exec $PYTHON "$(dirname "$0")/executor.py" "$@"

Propchange: incubator/mesos/trunk/frameworks/mesos-submit/executor
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/mesos/trunk/frameworks/mesos-submit/executor.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/executor.py?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/executor.py (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/executor.py Sun Jun  5 08:39:28 2011
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+import os
+import subprocess
+import sys
+import threading
+import time
+from threading import Thread
+
+import mesos
+
+
+# This function is called in its own thread to actually run the user's command.
+# When it finishes, it shuts down the scheduler driver (disconnecting the 
+# framework) and exits the program.
+def run_command(command, driver):
+  print "Running " + command
+  equal_signs = "=" * 40
+  print equal_signs
+  try:
+    code = os.system(command)
+    print equal_signs
+    print "Command completed with code %d" % code
+  except OSError,e:
+    print equal_signs
+    print "os.system call failed, see stderr for details"
+    print >>sys.stderr, "Error executing command"
+    print >>sys.stderr, e
+    driver.stop()
+    sys.exit(2)
+  driver.stop()
+  sys.exit(0)
+
+
+# A secondary scheduler registered for our framework with Mesos so that
+# our first scheduler (on the machine that ran mesos-submit) can disconnect.
+# This scheduler launches no further tasks but allows our one task to continue
+# running in the cluster -- the task essentially becomes its own scheduler.
+class SecondaryScheduler(mesos.Scheduler):
+  def __init__(self, command):
+    mesos.Scheduler.__init__(self)
+    self.command = command
+    print "here"
+
+  def getFrameworkName(self, driver):
+    print "here 2"
+    return "mesos-submit " + self.command
+
+  def getExecutorInfo(self, driver):
+    print "here 3"
+    executorPath = os.path.join(os.getcwd(), "executor")
+    return mesos.ExecutorInfo(executorPath, "")
+
+  def resourceOffer(self, driver, oid, offers):
+    # Reject the offer with an infinite timeout, since we are here
+    # only to serve as a second scheduler to keep the framework running
+    driver.replyToOffer(oid, [], {"timeout": "-1"})
+
+  def registered(self, driver, fid):
+    print "Registered with Mesos; starting command"
+    Thread(target=run_command, args=[self.command, driver]).start()
+
+  def error(self, driver, code, message):
+    print "Error from Mesos: %s (code %s)" % (message, code)
+
+
+# Thread target (started from MyExecutor.launchTask) that creates a
+# SecondaryScheduler and runs a scheduler driver for it with the given master
+# and framework ID; doing this inside launchTask fails (likely SWIG/Python).
+def run_scheduler(command, master, fid):
+  print "Starting secondary scheduler"
+  sched = SecondaryScheduler(command)
+  sched_driver = mesos.MesosSchedulerDriver(sched, master, fid)
+  sched_driver.run()  # presumably blocks until the driver is stopped -- confirm
+
+
+
+# Executor class for mesos-scheduler. Expects to be given a single task
+# to launch with a framework ID, master URL and command as parameters.
+# Once this task is received, the executor registers as a scheduler for the
+# framework by creating a SecondaryScheduler object, allowing the mesos-submit
+# command on the user's machine to exit, and it starts the user's command
+# on this cluster node as a subprocess.
+class MyExecutor(mesos.Executor):
+  def __init__(self):
+    mesos.Executor.__init__(self)
+    self.sched = None
+
+  def launchTask(self, driver, task):
+    if self.sched == None:
+      print "Received task; going to register as scheduler"
+      # Recover framework ID, master and command from task arg
+      pieces = task.arg.split("|")
+      fid = pieces[0]
+      master = pieces[1]
+      command = "|".join(pieces[2:]) # In case there are | characters in command
+      print "Parsed parameters:"
+      print "  framework ID = %s" % fid
+      print "  master = %s" % master
+      print "  command = %s" % command
+      # Start our secondary scheduler in a different thread (for some reason,
+      # this fails if we do it from the same thread.. probably due to some
+      # SWIG Python interaction).
+      Thread(target=run_scheduler, args=[command, master, fid]).start()
+    else:
+      print "Error: received a second task -- this should never happen!"
+
+  def killTask(self, driver, tid):
+    sys.exit(1)
+
+  def error(self, driver, code, message):
+    print "Error from Mesos: %s (code %s)" % (message, code)
+
+
+if __name__ == "__main__":  # entry point: run MyExecutor under the executor driver
+  print "Starting mesos-submit executor"
+  executor = MyExecutor()  # bound to a name so it stays referenced during run()
+  mesos.MesosExecutorDriver(executor).run()

Propchange: incubator/mesos/trunk/frameworks/mesos-submit/executor.py
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit Sun Jun  5 08:39:28 2011
@@ -0,0 +1,15 @@
+#!/bin/bash
+# Launcher for mesos_submit.py: picks a Python interpreter and sets PYTHONPATH.
+if [ -z "$PYTHON" ]; then
+  PYTHON=python
+  if [ "$(uname)" == "SunOS" ]; then
+    PYTHON=python2.6
+  fi
+fi
+# Default MESOS_HOME to two directories above this script.
+if [ -z "$MESOS_HOME" ]; then
+  MESOS_HOME="$(dirname "$0")/../.."
+fi
+# $PYTHON stays unquoted so it may carry flags; "$@" preserves each argument.
+export PYTHONPATH="$MESOS_HOME/lib/python"
+exec $PYTHON "$(dirname "$0")/mesos_submit.py" "$@"

Propchange: incubator/mesos/trunk/frameworks/mesos-submit/mesos-submit
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py?rev=1132086&view=auto
==============================================================================
--- incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py (added)
+++ incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py Sun Jun  5 08:39:28 2011
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+import os
+import re
+import sys
+import time
+from optparse import OptionParser
+
+import mesos
+
+# Default resources to request for the command we execute.
+DEFAULT_CPUS = 1  # number of CPUs (overridden by --cpus)
+DEFAULT_MEM = 512  # MB of memory (overridden by --mem)
+
+
+class SubmitScheduler(mesos.Scheduler):
+  def __init__(self, cpus, mem, master, command):
+    mesos.Scheduler.__init__(self)
+    self.cpus = cpus
+    self.mem = mem
+    self.master = master
+    self.command = command
+    self.task_launched = False
+
+  def getFrameworkName(self, driver):
+    print "In getFrameworkName"
+    return "mesos-submit " + self.command
+
+  def getExecutorInfo(self, driver):
+    print "In getExecutorInfo"
+    executorPath = os.path.join(os.getcwd(), "executor")
+    return mesos.ExecutorInfo(executorPath, "")
+
+  def registered(self, driver, fid):
+    print "Registered with Mesos, FID = %s" % fid
+    self.fid = "" + fid
+
+  def resourceOffer(self, driver, oid, offers):
+    if self.task_launched:
+      driver.replyToOffer(oid, [], {"timeout": "-1"})
+    else:
+      for offer in offers:
+        cpus = int(offer.params["cpus"])
+        mem = int(offer.params["mem"])
+        if cpus >= self.cpus and mem >= self.mem:
+          print "Accepting slot on slave %s (%s)" % (offer.slaveId, offer.host)
+          params = {"cpus": "%d" % self.cpus, "mem": "%d" % self.mem}
+          arg = "%s|%s|%s" % (self.fid, self.master, self.command)
+          task = mesos.TaskDescription(0, offer.slaveId, "task", params, arg)
+          driver.replyToOffer(oid, [task], {"timeout": "1"})
+          self.task_launched = True
+          return
+
+  def statusUpdate(self, driver, update):
+    print "Task %d in state %d" % (update.taskId, update.state)
+
+  def error(self, driver, code, message):
+    if message == "Framework failover":
+      print "Secondary scheduler registered successfully; exiting mesos-submit"
+    else:
+      print "Error from Mesos: %s (error code: %d)" % (message, code)
+    driver.stop()
+
+
+if __name__ == "__main__":
+  parser = OptionParser(usage="Usage: %prog [options] <master_url> <command>")
+  parser.add_option("-c","--cpus",
+                    help="number of CPUs to request (default: 1)",
+                    dest="cpus", type="int", default=DEFAULT_CPUS)
+  parser.add_option("-m","--mem",
+                    help="MB of memory to request (default: 512)",
+                    dest="mem", type="int", default=DEFAULT_MEM)
+  (options,args)= parser.parse_args()
+  if len(args) < 2:
+    parser.error("At least two parameters are required.")
+    exit(2)
+  master = args[0]
+  command = " ".join(args[1:])
+  print "Connecting to mesos master %s" % master
+  sched = SubmitScheduler(options.cpus, options.mem, master, command)
+  mesos.MesosSchedulerDriver(sched, master).run()

Propchange: incubator/mesos/trunk/frameworks/mesos-submit/mesos_submit.py
------------------------------------------------------------------------------
    svn:executable = *