You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 06:57:35 UTC

svn commit: r1131686 - in /incubator/mesos/trunk/frameworks/torque: torquelib.py torquesched.py

Author: benh
Date: Sun Jun  5 04:57:35 2011
New Revision: 1131686

URL: http://svn.apache.org/viewvc?rev=1131686&view=rev
Log:
Added python logging so that subsets of logging can be turned on and off.

Modified:
    incubator/mesos/trunk/frameworks/torque/torquelib.py
    incubator/mesos/trunk/frameworks/torque/torquesched.py

Modified: incubator/mesos/trunk/frameworks/torque/torquelib.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/torque/torquelib.py?rev=1131686&r1=1131685&r2=1131686&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/torque/torquelib.py (original)
+++ incubator/mesos/trunk/frameworks/torque/torquelib.py Sun Jun  5 04:57:35 2011
@@ -1,12 +1,15 @@
 import re
 import tempfile
+import logging
 import xml.dom.minidom
 
 from subprocess import *
 
+logging.basicConfig()
+
 class Job:
   def __init__(self, xmlJobElt):
-    #print "creating a new Job obj out of xml"
+    #logging.debug("creating a new Job obj out of xml")
     assert xmlJobElt.nodeName == "Job", "xml element passed to Job constructor was not named 'Job'"
     self.resourceList = {}
     for res in xmlJobElt.getElementsByTagName("Resource_List")[0].childNodes:
@@ -14,37 +17,37 @@ class Job:
     self.jobState = xmlJobElt.getElementsByTagName("job_state")[0].childNodes[0].data
 
 def getActiveJobs():
-  print "in getJobs, grabbing xml output from qstat"
+  logging.debug("in getJobs, grabbing xml output from qstat")
   qstat_out = tempfile.TemporaryFile()
   qstat_obj = Popen("qstat -x", shell=True, stdout=qstat_out)
   qstat_obj.wait()
-  print "output of qstat: "
+  logging.debug("output of qstat: ")
   jobs = []
   jobsExist = False
   qstat_out.seek(0)
   for line in qstat_out:
     if re.match(".*Job.*", line):
-      #print "#############" + line + "#############"
+      #logging.debug("#############" + line + "#############")
       jobsExist = True
       break
   if jobsExist:
     qstat_out.seek(0)
     dom_doc = xml.dom.minidom.parse(qstat_out)
-    print "grabbing the Job elements from the xml dom doc"
+    logging.debug("grabbing the Job elements from the xml dom doc")
     xmljobs = dom_doc.getElementsByTagName("Job")
-    print "creating a new job object for each job dom elt"
+    logging.debug("creating a new job object for each job dom elt")
     for j in xmljobs:
       #make sure job's state is not complete
       newJob = Job(j)
       if newJob.jobState != "C":
         jobs.append(newJob)
   if len(jobs) == 0:
-    print "the string \"Job\" was not found in the qstat -x command output"
+    logging.debug("the string \"Job\" was not found in the qstat -x command output")
   return jobs
 
 class Node:
   def __init__(self, xmlHostElt):
-    #print "creating a new Host obj out of xml"
+    #logging.debug("creating a new Host obj out of xml")
     assert xmlHostElt.nodeName == "Node", "xml element passed to Node constructor was not named 'Node'"
     self.name = xmlHostElt.getElementsByTagName("name")[0].childNodes[0].data
 
@@ -66,7 +69,7 @@ class Node:
 
 #returns nodes whose state is not marked as "down"
 def getNodes():
-  print "in getNodes, grabbing xml output from pbsnodes"
+  logging.debug("in getNodes, grabbing xml output from pbsnodes")
   pbsnodes_out = tempfile.TemporaryFile()
   pbsnodes_obj = Popen("pbsnodes -x", shell=True, stdout=pbsnodes_out)
   pbsnodes_obj.wait()
@@ -74,26 +77,26 @@ def getNodes():
   nodesExist = False
   pbsnodes_out.seek(0)
   for line in pbsnodes_out:
-    #print "node line is %s" % line
+    #logging.debug("node line is %s" % line)
     if re.match(".*Node.*", line):
       nodesExist = True
       break
   if nodesExist:
     pbsnodes_out.seek(0)
     dom_doc = xml.dom.minidom.parse(pbsnodes_out)
-    print "grabbing the Job elements from the xml dom doc"
+    logging.debug("grabbing the Job elements from the xml dom doc")
     xmlnodes = dom_doc.getElementsByTagName("Node")
-    print "creating a new node object for each node dom elt"
+    logging.debug("creating a new node object for each node dom elt")
     for n in xmlnodes:
       #make sure node's state is online
       nodes.append(Node(n))
   if len(nodes) == 0:
-    print "the string \"Node\" was not found in the pbsnodes -x command output"
+    logging.debug("the string \"Node\" was not found in the pbsnodes -x command output")
   return nodes
 
 #TODO: DELETE THIS? Might note be used eventually
 def getQueueLength():
-  #print "computing the number of active jobs in the queue"
+  #logging.debug("computing the number of active jobs in the queue")
   qstat = Popen("qstat -Q",shell=True,stdout=PIPE).stdout
   jobcount = 0
   for line in qstat:

Modified: incubator/mesos/trunk/frameworks/torque/torquesched.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/torque/torquesched.py?rev=1131686&r1=1131685&r2=1131686&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/torque/torquesched.py (original)
+++ incubator/mesos/trunk/frameworks/torque/torquesched.py Sun Jun  5 04:57:35 2011
@@ -10,16 +10,32 @@ import threading
 import re
 import socket
 import torquelib
+import datetime
+import logging
+import logging.handlers
 
 from optparse import OptionParser
 from subprocess import *
 from socket import gethostname
 
 PBS_SERVER_FILE = "/var/spool/torque/server_name"
+EVENT_LOG_FILE = "log_fw_utilization.txt"
 
 SAFE_ALLOCATION = {"cpus":5,"mem":134217728} #just set statically for now, 128MB
 MIN_SLOT_SIZE = {"cpus":"1","mem":1073741824} #1GB
 
+eventlog = logging.getLogger("event_logger")
+eventlog.setLevel(logging.DEBUG)
+fh = logging.FileHandler(EVENT_LOG_FILE,'w') #create handler
+fh.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
+eventlog.addHandler(fh)
+
+driverlog = logging.getLogger("driver_logger")
+driverlog.setLevel(logging.INFO)
+
+monitorlog = logging.getLogger("monitor_logger")
+monitorlog.setLevel(logging.INFO)
+
 class MyScheduler(nexus.Scheduler):
   def __init__(self, ip):
     nexus.Scheduler.__init__(self)
@@ -29,35 +45,35 @@ class MyScheduler(nexus.Scheduler):
     self.servers = {}
     self.overloaded = False
     self.numToRegister = 1
-  
+
   def getExecutorInfo(self, driver):
     execPath = os.path.join(os.getcwd(), "start_pbs_mom.sh")
     initArg = self.ip # tell executor which node the pbs_server is running on
-    print "in getExecutorInfo, setting execPath = " + execPath + " and initArg = " + initArg
+    driverlog.info("in getExecutorInfo, setting execPath = " + execPath + " and initArg = " + initArg)
     return nexus.ExecutorInfo(execPath, initArg)
 
   def registered(self, driver, fid):
-    print "Nexus torque+pbs scheduler registered as framework #%s" % fid
+    driverlog.info("Nexus torque framwork registered with frameworkID %s" % fid)
 
   def resourceOffer(self, driver, oid, slave_offers):
     self.driver = driver
-    print "Got slot offer %d" % oid
+    driverlog.debug("Got slot offer %d" % oid)
     self.lock.acquire()
-    print "resourceOffer() acquired lock"
+    driverlog.debug("resourceOffer() acquired lock")
     tasks = []
     for offer in slave_offers:
       # if we haven't registered this node, accept slot & register w pbs_server
       #TODO: check to see if slot is big enough 
       if self.numToRegister <= 0:
-        print "Rejecting slot, no need for more slaves"
+        driverlog.debug("Rejecting slot, no need for more slaves")
         continue
       if offer.host in self.servers.values():
-        print "Rejecting slot, already registered node " + offer.host
+        driverlog.debug("Rejecting slot, already registered node " + offer.host)
         continue
       if len(self.servers) >= SAFE_ALLOCATION["cpus"]:
-        print "Rejecting slot, already at safe allocation (i.e. %d CPUS)" % SAFE_ALLOCATION["cpus"]
+        driverlog.debug("Rejecting slot, already at safe allocation (i.e. %d CPUS)" % SAFE_ALLOCATION["cpus"])
         continue
-      print "Need %d more nodes, so accepting slot, setting up params for it..." % self.numToRegister
+      driverlog.info("Need %d more nodes, so accepting slot, setting up params for it..." % self.numToRegister)
       params = {"cpus": "1", "mem": "1073741824"}
       td = nexus.TaskDescription(
           self.id, offer.slaveId, "task %d" % self.id, params, "")
@@ -66,59 +82,64 @@ class MyScheduler(nexus.Scheduler):
       self.regComputeNode(offer.host)
       self.numToRegister -= 1
       self.id += 1
-      print "self.id now set to " + str(self.id)
-    print "---"
+      driverlog.info("writing logfile")
+      eventlog.info(len(self.servers))
+      driverlog.info("done writing logfile")
+      driverlog.info("self.id now set to " + str(self.id))
+    #print "---"
     driver.replyToOffer(oid, tasks, {"timeout": "1"})
     self.lock.release()
-    print "resourceOffer() finished, released lock"
-    print "\n"
+    driverlog.debug("resourceOffer() finished, released lock\n\n")
 
   def statusUpdate(self, driver, status):
-    print "got status update from TID %s, state is: %s, data is: %s" %(status.taskId,status.state,status.data)
+    driverlog.info("got status update from TID %s, state is: %s, data is: %s" %(status.taskId,status.state,status.data))
 
   def regComputeNode(self, new_node):
-    print "registering new compute node, "+new_node+", with pbs_server"
-    print "checking to see if node is registered with server already"
+    driverlog.info("registering new compute node, "+new_node+", with pbs_server")
+    driverlog.info("checking to see if node is registered with server already")
     #nodes = Popen("qmgr -c 'list node'", shell=True, stdout=PIPE).stdout
     nodes = Popen("pbsnodes", shell=True, stdout=PIPE).stdout
-    print "output of pbsnodes command is: "
+    driverlog.info("output of pbsnodes command is: ")
     for line in nodes: 
-      print line
+      driverlog.info(line)
       if line.find(new_node) != -1:
-        print "Warn: tried to register node that's already registered, skipping"
+        driverlog.info("Warn: tried to register node that's already registered, skipping")
         return
     #add node to server
-    print "registering node with command: qmgr -c create node " + new_node
+    driverlog.info("registering node with command: qmgr -c create node " + new_node)
     qmgr_add = Popen("qmgr -c \"create node " + new_node + "\"", shell=True, stdout=PIPE).stdout
-    print "output of qmgr:"
-    for line in qmgr_add: print line
+    driverlog.info("output of qmgr:")
+    for line in qmgr_add: driverlog.info(line)
 
   def unregComputeNode(self, node_name):
     #remove node from server
-    print("removing node from pbs_server: qmgr -c delete node " + node_name)
-    print Popen('qmgr -c "delete node ' + node_name + '"', shell=True, stdout=PIPE).stdout
+    monitorlog.info("removing node from pbs_server: qmgr -c delete node " + node_name)
+    monitorlog.info(Popen('qmgr -c "delete node ' + node_name + '"', shell=True, stdout=PIPE).stdout)
   
   #unreg up to N random compute nodes, leave at least one
-  def unregNNodes(self, num_nodes):
-    print "unregNNodes called with arg %d" % num_nodes
-    if num_nodes > len(self.servers)-1:
-      print "... however, only unregistering %d nodes, leaving one alive" % (len(self.servers)-1)
-    print "getting and filtering list of nodes using torquelib"
+  def unregNNodes(self, numNodes):
+    monitorlog.debug("unregNNodes called with arg %d" % numNodes)
+    if numNodes > len(self.servers)-1:
+      monitorlog.debug("... however, only unregistering %d nodes, leaving one alive" % (len(self.servers)-1))
+    toKill = (len(self.servers)-1)
+    
+    monitorlog.debug("getting and filtering list of nodes using torquelib")
     noJobs = lambda x: x.status.has_key("jobs") == False or (x.status.has_key("jobs") == True and x.status["jobs"] == "")
     inactiveNodes = map(lambda x: x.name,filter(noJobs, torquelib.getNodes()))
-    print "victim pool of inactive nodes:"
+    monitorlog.debug("victim pool of inactive nodes:")
     for inode in inactiveNodes:
-      print inode
+      monitorlog.debug(inode)
     for tid, hostname in self.servers.items():
-      if len(self.servers) > 1 and num_nodes > 0 and hostname in inactiveNodes:
-        print "we still have to kill %d of the %d compute nodes which master is tracking" % (num_nodes, len(self.servers))
-        print "unregistering node " + str(hostname)
+      if len(self.servers) > 1 and toKill > 0 and hostname in inactiveNodes:
+        monitorlog.info("We still have to kill %d of the %d compute nodes which master is tracking" % (toKill, len(self.servers)))
+        monitorlog.info("unregistering node " + str(hostname))
         self.unregComputeNode(hostname)
         self.servers.pop(tid)
-        num_nodes = num_nodes - 1
-        print "killing corresponding task with tid %d" % tid
+        toKill = toKill - 1
+        monitorlog.info("killing corresponding task with tid %d" % tid)
         self.driver.killTask(tid)
-    #handle the case where we didn't kill enough nodes 
+    if toKill > 0: 
+      monitorlog.warn("Done killing. We were supposed to kill %d nodes, but only found and killed %d free nodes" % (numNodes, numNodes-toKill))
 
   def getFrameworkName(self, driver):
     return "Nexus Torque Framework"
@@ -127,29 +148,29 @@ class MyScheduler(nexus.Scheduler):
 def monitor(sched):
   while True:
     time.sleep(1)
-    print "monitor thread acquiring lock"
+    monitorlog.debug("monitor thread acquiring lock")
     sched.lock.acquire()
-    print "computing num nodes needed to satisfy eligable jobs in queue"
+    monitorlog.debug("computing num nodes needed to satisfy eligable jobs in queue")
     needed = 0
     jobs = torquelib.getActiveJobs()
-    print "retreived jobs in queue, count: %d" % len(jobs)
+    monitorlog.debug("retreived jobs in queue, count: %d" % len(jobs))
     for j in jobs:
       #WARNING: this check should only be used if torque is using fifo queue
       #if needed + j.needsnodes <= SAFE_ALLOCATION:
-      print "job resource list is: " + str(j.resourceList)
+      monitorlog.debug("job resource list is: " + str(j.resourceList))
       needed += int(j.resourceList["nodect"])
-    print "number of nodes needed by jobs in queue: %d" % needed
+    monitorlog.debug("number of nodes needed by jobs in queue: %d" % needed)
     numToRelease = len(sched.servers) - needed
-    print "number of nodes to release is %d - %d" % (len(sched.servers),needed)
+    monitorlog.debug("number of nodes to release is %d - %d" % (len(sched.servers),needed))
     if numToRelease > 0:
       sched.unregNNodes(numToRelease)
       sched.numToRegister = 0
     else:
-      print "monitor updating sched.numToRegister from %d to %d" % (sched.numToRegister, numToRelease * -1)
+      monitorlog.debug("monitor updating sched.numToRegister from %d to %d" % (sched.numToRegister, numToRelease * -1))
       sched.numToRegister = numToRelease * -1
     sched.lock.release()
-    print "monitor thread releasing lock"
-    print "\n"
+    monitorlog.debug("monitor thread releasing lock")
+    monitorlog.debug("\n")
 
 if __name__ == "__main__":
   parser = OptionParser(usage = "Usage: %prog nexus_master")
@@ -163,22 +184,21 @@ if __name__ == "__main__":
   fqdn = socket.getfqdn()
   ip = socket.gethostbyname(gethostname())
 
-  print "running killall pbs_server"
+  monitorlog.info("running killall pbs_server")
   Popen("killall pbs_server", shell=True)
   time.sleep(1)
 
-  print "writing $(TORQUECFG)/server_name file with fqdn of pbs_server: " + fqdn
+  monitorlog.info("writing $(TORQUECFG)/server_name file with fqdn of pbs_server: " + fqdn)
   FILE = open(PBS_SERVER_FILE,'w')
   FILE.write(fqdn)
   FILE.close()
 
-  print "starting pbs_server"
+  monitorlog.info("starting pbs_server")
   #Popen("/etc/init.d/pbs_server start", shell=True)
   Popen("pbs_server", shell=True)
   time.sleep(2)
-  print "\n"
 
-  print "running command: qmgr -c \"set queue batch resources_available.nodes=%s\"" % SAFE_ALLOCATION["cpus"]
+  monitorlog.info("running command: qmgr -c \"set queue batch resources_available.nodes=%s\"" % SAFE_ALLOCATION["cpus"])
   Popen("qmgr -c \"set queue batch resources_available.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
   Popen("qmgr -c \"set server resources_available.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
 
@@ -188,40 +208,37 @@ if __name__ == "__main__":
 
   outp = Popen("qmgr -c \"l queue batch\"", shell=True, stdout=PIPE).stdout
   for l in outp:
-    print l
+    monitorlog.info(l)
 
-  print "RE-killing pbs_server for resources_available setting to take effect"
+  monitorlog.info("RE-killing pbs_server for resources_available setting to take effect")
   #Popen("/etc/init.d/pbs_server start", shell=True)
   Popen("qterm", shell=True)
   #time.sleep(1)
-  print("\n")
 
-  print "RE-starting pbs_server for resources_available setting to take effect"
+  monitorlog.info("RE-starting pbs_server for resources_available setting to take effect")
   Popen("pbs_server", shell=True)
-  "qmgr list queue settings: "
+  monitorlog.debug("qmgr list queue settings: ")
   output = Popen("qmgr -c 'l q batch'", shell=True, stdout=PIPE).stdout
   for line in output:
-    print line
-  print("\n")
+    monitorlog.debug(line)
 
-  print "running killall pbs_sched"
+  monitorlog.info("running killall pbs_sched")
   Popen("killall pbs_sched", shell=True)
   #time.sleep(2)
-  print("\n")
 
-  print "starting pbs_scheduler"
+  monitorlog.info("starting pbs_scheduler")
   #Popen("/etc/init.d/pbs_sched start", shell=True)
   Popen("pbs_sched", shell=True)
 
   #ip = Popen("hostname -i", shell=True, stdout=PIPE).stdout.readline().rstrip() #linux
   #ip = Popen("ifconfig en1 | awk '/inet / { print $2 }'", shell=True, stdout=PIPE).stdout.readline().rstrip() # os x
-  print "Remembering IP address of scheduler (" + ip + "), and fqdn: " + fqdn
+  monitorlog.info("Remembering IP address of scheduler (" + ip + "), and fqdn: " + fqdn)
 
-  print "Connecting to nexus master %s" % args[0]
+  monitorlog.info("Connecting to nexus master %s" % args[0])
 
   sched = MyScheduler(fqdn)
   threading.Thread(target = monitor, args=[sched]).start()
 
   nexus.NexusSchedulerDriver(sched, args[0]).run()
 
-  print "Finished!"
+  monitorlog.info("Finished!")