You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:30:00 UTC

svn commit: r1132055 - in /incubator/mesos/trunk/frameworks/deploy_jar: daemon_framework daemon_scheduler.py

Author: benh
Date: Sun Jun  5 08:30:00 2011
New Revision: 1132055

URL: http://svn.apache.org/viewvc?rev=1132055&view=rev
Log:
Updated to alpha release API.

Modified:
    incubator/mesos/trunk/frameworks/deploy_jar/daemon_framework
    incubator/mesos/trunk/frameworks/deploy_jar/daemon_scheduler.py

Modified: incubator/mesos/trunk/frameworks/deploy_jar/daemon_framework
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/deploy_jar/daemon_framework?rev=1132055&r1=1132054&r2=1132055&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/deploy_jar/daemon_framework (original)
+++ incubator/mesos/trunk/frameworks/deploy_jar/daemon_framework Sun Jun  5 08:30:00 2011
@@ -6,6 +6,6 @@ if [ "`uname`" == "SunOS" ]; then
   PYTHON=python2.6
 fi
 
-export PYTHONPATH=`dirname $0`/../../src/swig/python:$PYTHONPATH
+export PYTHONPATH=`dirname $0`/../../lib/python:$PYTHONPATH
 
 $PYTHON ./daemon_scheduler.py $@

Modified: incubator/mesos/trunk/frameworks/deploy_jar/daemon_scheduler.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/deploy_jar/daemon_scheduler.py?rev=1132055&r1=1132054&r2=1132055&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/deploy_jar/daemon_scheduler.py (original)
+++ incubator/mesos/trunk/frameworks/deploy_jar/daemon_scheduler.py Sun Jun  5 08:30:00 2011
@@ -12,51 +12,17 @@ from optparse import OptionParser
 from subprocess import *
 from socket import gethostname
 
-MIN_SERVERS = 1
-START_THRESHOLD = 25
-KILL_THRESHOLD = 5
-#HAPROXY_EXE = "/root/haproxy-1.3.20/haproxy"
-HAPROXY_EXE = "/home/andyk/mesos/frameworks/haproxy+apache/haproxy-1.3.20/haproxy"
 
 class MyScheduler(mesos.Scheduler):
   def __init__(self, num_tasks, jar_url, jar_class, jar_args):
     mesos.Scheduler.__init__(self)
     self.lock = threading.RLock()
-    self.id = 0
-    self.haproxy = -1
-    self.reconfigs = 0
     self.task_count = 0
-    self.overloaded = False
     self.num_tasks = num_tasks
     self.jar_url = jar_url
     self.jar_class = jar_class
     self.jar_args = jar_args
 
-#  def reconfigure(self):
-#    name = "/tmp/haproxy.conf.%d" % self.reconfigs
-#    with open(name, 'w') as config:
-#      with open('haproxy.config.template', 'r') as template:
-#        for line in template:
-#          config.write(line)
-#      for id, host in self.servers.iteritems():
-#        config.write("       ")
-#        config.write("server %d %s:80 check\n" % (id, host))
-#
-#    cmd = []
-#    if self.haproxy != -1:
-#      cmd = [HAPROXY_EXE,
-#             "-f",
-#             name,
-#             "-sf",
-#             str(self.haproxy.pid)]
-#    else:
-#      cmd = [HAPROXY_EXE,
-#             "-f",
-#             name]
-#
-#    self.haproxy = Popen(cmd, shell = False)
-#    self.reconfigs += 1
-  
   def getExecutorInfo(self, driver):
     execPath = os.path.join(os.getcwd(), "daemon_executor.sh")
     return mesos.ExecutorInfo(execPath, "")
@@ -64,13 +30,22 @@ class MyScheduler(mesos.Scheduler):
   def registered(self, driver, fid):
     print "Mesos daemon scheduler registered as framework #%s" % fid
 
+  def getFrameworkName(self, driver):
+    return "Python Deploy Jar"
+
   def resourceOffer(self, driver, oid, slave_offers):
-    print "Got slot offer %d" % oid
+    print "Got resource offer %s" % oid
     self.lock.acquire()
     tasks = []
     for offer in slave_offers:
-      if int(self.task_count) < int(self.num_tasks) and int(offer.params['mem']) >= 1073741824 and int(offer.params['cpus']) > 0:
-        print "accept slot here"
+      if int(self.task_count) > int(self.num_tasks): 
+        print "Rejecting slot because we've launched enough tasks"
+      elif int(offer.params['mem']) < 1024 or int(offer.params['cpus']) < 1:
+        print "Rejecting slot because it is too small"
+        print "It had mem=" + offer.params['mem']\
+        + " and cpus=" + offer.params['cpus']
+      else:
+        print "accepting slot of size 1 cpu and 1024MB to launch a jar"
         params = {"cpus": "1", "mem": "1024"}
         task_args = self.jar_url + "\t" + self.jar_class + "\t" + self.jar_args
         print "task args are: " + task_args
@@ -80,94 +55,16 @@ class MyScheduler(mesos.Scheduler):
         print "incrementing self.task_count from " + str(self.task_count)
         print "self.num_tasks is " + str(self.num_tasks)
         self.task_count += 1
-      else:
-        print "Rejecting slot because we've launched enough tasks"
     driver.replyToOffer(oid, tasks, {"timeout": "1"})
-    #self.reconfigure()
     self.lock.release()
 
-  #def statusUpdate(self, driver, status):
-  #  self.lock.acquire()
-  #  if status.taskId in self.servers.keys():
-  #    if status.state == mesos.TASK_FINISHED:
-  #      del self.servers[status.taskId]
-  #      self.reconfigure()
-  #      reconfigured = True
-  #  self.lock.release()
-  #  if reconfigured:
-  #    self.reviveOffers()
-
-  #def scaleUp(self):
-  #  print "SCALING UP"
-  #  self.lock.acquire()
-  #  self.overloaded = True
-  #  self.lock.release()
-
-  #def scaleDown(self, id):
-  #  print "SCALING DOWN (removing server %d)" % id
-  #  kill = False
-  #  self.lock.acquire()
-  #  if self.overloaded:
-  #    self.overloaded = False
-  #  else:
-  #    kill = True
-  #  self.lock.release()
-  #  if kill:
-  #    self.killTask(id)
-
-
-def monitor(sched):
-  while True:
-    time.sleep(1)
-    try:
-      conn = httplib.HTTPConnection("ec2-72-44-51-87.compute-1.amazonaws.com")
-      conn.request("GET", "/stats;csv")
-      res = conn.getresponse()
-      if (res.status != 200):
-        print "response != 200"
-        continue
-      else:
-        data = res.read()
-        lines = data.split('\n')[2:-2]
-
-        data = data.split('\n')
-        data = data[1].split(',')
-
-        if int(data[33]) >= START_THRESHOLD:
-          sched.scaleUp()
-        elif int(data[4]) <= KILL_THRESHOLD:
-          minload, minid = (sys.maxint, 0)
-          for l in lines:
-            cols = l.split(',')
-            id = int(cols[1])
-            load = int(cols[4])
-            if load < minload:
-              minload = load
-              minid = id
-
-          if len(lines) > MIN_SERVERS and minload == 0:
-            sched.scaleDown(minid)
-
-        conn.close()
-    except Exception, e:
-      continue
-
 if __name__ == "__main__":
-  #parser = OptionParser(usage = "Usage: daemon_framework <mesos-master> <num tasks> <URL of jar> <class name> <arguments>")
-
-  #(options,args) = parser.parse_args()
-  #if len(args) < 5:
-  #  print >> sys.stderr, "five parameters required. " + str(len(args))
-  #  print >> sys.stderr, "Use --help to show usage."
-  #  exit(2)
   args = sys.argv[1:len(sys.argv)]
   
   print "sched = MyScheduler(" + args[1] + ", "+  args[2]+ ", "+ args[3]+ ", "+ " ".join(args[4:len(args)])+")"
 
   sched = MyScheduler(args[1], args[2], args[3], " ".join(args[4:len(args)]))
 
-  #threading.Thread(target = monitor, args=[sched]).start()
-
   print "Connecting to mesos master %s" % args[0]
 
   mesos.MesosSchedulerDriver(sched, sys.argv[1]).run()