Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 06:57:35 UTC
svn commit: r1131686 - in /incubator/mesos/trunk/frameworks/torque:
torquelib.py torquesched.py
Author: benh
Date: Sun Jun 5 04:57:35 2011
New Revision: 1131686
URL: http://svn.apache.org/viewvc?rev=1131686&view=rev
Log:
Added Python logging with separate named loggers (driver, monitor, event) so that subsets of the logging output can be turned on and off independently.
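For illustration only (not part of this commit): the named loggers introduced below ("driver_logger", "monitor_logger", "event_logger") can each be tuned independently by anything that embeds the scheduler, roughly as in the sketch that follows. The logger names are taken from torquesched.py; the level choices are assumptions.

import logging

# Quiet the chatty per-offer driver callbacks, make the monitor thread verbose,
# and keep the utilization event log at INFO (logger names from torquesched.py).
logging.getLogger("driver_logger").setLevel(logging.WARNING)
logging.getLogger("monitor_logger").setLevel(logging.DEBUG)
logging.getLogger("event_logger").setLevel(logging.INFO)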
Modified:
incubator/mesos/trunk/frameworks/torque/torquelib.py
incubator/mesos/trunk/frameworks/torque/torquesched.py
Modified: incubator/mesos/trunk/frameworks/torque/torquelib.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/torque/torquelib.py?rev=1131686&r1=1131685&r2=1131686&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/torque/torquelib.py (original)
+++ incubator/mesos/trunk/frameworks/torque/torquelib.py Sun Jun 5 04:57:35 2011
@@ -1,12 +1,15 @@
import re
import tempfile
+import logging
import xml.dom.minidom
from subprocess import *
+logging.basicConfig()
+
class Job:
def __init__(self, xmlJobElt):
- #print "creating a new Job obj out of xml"
+ #logging.debug("creating a new Job obj out of xml")
assert xmlJobElt.nodeName == "Job", "xml element passed to Job constructor was not named 'Job'"
self.resourceList = {}
for res in xmlJobElt.getElementsByTagName("Resource_List")[0].childNodes:
@@ -14,37 +17,37 @@ class Job:
self.jobState = xmlJobElt.getElementsByTagName("job_state")[0].childNodes[0].data
def getActiveJobs():
- print "in getJobs, grabbing xml output from qstat"
+ logging.debug("in getJobs, grabbing xml output from qstat")
qstat_out = tempfile.TemporaryFile()
qstat_obj = Popen("qstat -x", shell=True, stdout=qstat_out)
qstat_obj.wait()
- print "output of qstat: "
+ logging.debug("output of qstat: ")
jobs = []
jobsExist = False
qstat_out.seek(0)
for line in qstat_out:
if re.match(".*Job.*", line):
- #print "#############" + line + "#############"
+ #logging.debug("#############" + line + "#############")
jobsExist = True
break
if jobsExist:
qstat_out.seek(0)
dom_doc = xml.dom.minidom.parse(qstat_out)
- print "grabbing the Job elements from the xml dom doc"
+ logging.debug("grabbing the Job elements from the xml dom doc")
xmljobs = dom_doc.getElementsByTagName("Job")
- print "creating a new job object for each job dom elt"
+ logging.debug("creating a new job object for each job dom elt")
for j in xmljobs:
#make sure job's state is not complete
newJob = Job(j)
if newJob.jobState != "C":
jobs.append(newJob)
if len(jobs) == 0:
- print "the string \"Job\" was not found in the qstat -x command output"
+ logging.debug("the string \"Job\" was not found in the qstat -x command output")
return jobs
class Node:
def __init__(self, xmlHostElt):
- #print "creating a new Host obj out of xml"
+ #logging.debug("creating a new Host obj out of xml")
assert xmlHostElt.nodeName == "Node", "xml element passed to Node constructor was not named 'Node'"
self.name = xmlHostElt.getElementsByTagName("name")[0].childNodes[0].data
@@ -66,7 +69,7 @@ class Node:
#returns nodes whose state is not marked as "down"
def getNodes():
- print "in getNodes, grabbing xml output from pbsnodes"
+ logging.debug("in getNodes, grabbing xml output from pbsnodes")
pbsnodes_out = tempfile.TemporaryFile()
pbsnodes_obj = Popen("pbsnodes -x", shell=True, stdout=pbsnodes_out)
pbsnodes_obj.wait()
@@ -74,26 +77,26 @@ def getNodes():
nodesExist = False
pbsnodes_out.seek(0)
for line in pbsnodes_out:
- #print "node line is %s" % line
+ #logging.debug("node line is %s" % line)
if re.match(".*Node.*", line):
nodesExist = True
break
if nodesExist:
pbsnodes_out.seek(0)
dom_doc = xml.dom.minidom.parse(pbsnodes_out)
- print "grabbing the Job elements from the xml dom doc"
+ logging.debug("grabbing the Job elements from the xml dom doc")
xmlnodes = dom_doc.getElementsByTagName("Node")
- print "creating a new node object for each node dom elt"
+ logging.debug("creating a new node object for each node dom elt")
for n in xmlnodes:
#make sure node's state is online
nodes.append(Node(n))
if len(nodes) == 0:
- print "the string \"Node\" was not found in the pbsnodes -x command output"
+ logging.debug("the string \"Node\" was not found in the pbsnodes -x command output")
return nodes
#TODO: DELETE THIS? Might not be used eventually
def getQueueLength():
- #print "computing the number of active jobs in the queue"
+ #logging.debug("computing the number of active jobs in the queue")
qstat = Popen("qstat -Q",shell=True,stdout=PIPE).stdout
jobcount = 0
for line in qstat:
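An aside on the torquelib.py change above, not part of the commit: logging.basicConfig() installs a default StreamHandler on the root logger, and the module-level logging.debug(...) calls go through that root logger, whose default level is WARNING, so the converted messages stay silent until a caller opts in. A minimal sketch of how an embedding script might enable them:

import logging
import torquelib

# Lower the root logger's level so torquelib's logging.debug() output becomes visible.
logging.getLogger().setLevel(logging.DEBUG)
jobs = torquelib.getActiveJobs()
nodes = torquelib.getNodes()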
Modified: incubator/mesos/trunk/frameworks/torque/torquesched.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/torque/torquesched.py?rev=1131686&r1=1131685&r2=1131686&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/torque/torquesched.py (original)
+++ incubator/mesos/trunk/frameworks/torque/torquesched.py Sun Jun 5 04:57:35 2011
@@ -10,16 +10,32 @@ import threading
import re
import socket
import torquelib
+import datetime
+import logging
+import logging.handlers
from optparse import OptionParser
from subprocess import *
from socket import gethostname
PBS_SERVER_FILE = "/var/spool/torque/server_name"
+EVENT_LOG_FILE = "log_fw_utilization.txt"
SAFE_ALLOCATION = {"cpus":5,"mem":134217728} #just set statically for now, 128MB
MIN_SLOT_SIZE = {"cpus":"1","mem":1073741824} #1GB
+eventlog = logging.getLogger("event_logger")
+eventlog.setLevel(logging.DEBUG)
+fh = logging.FileHandler(EVENT_LOG_FILE,'w') #create handler
+fh.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
+eventlog.addHandler(fh)
+
+driverlog = logging.getLogger("driver_logger")
+driverlog.setLevel(logging.INFO)
+
+monitorlog = logging.getLogger("monitor_logger")
+monitorlog.setLevel(logging.INFO)
+
class MyScheduler(nexus.Scheduler):
def __init__(self, ip):
nexus.Scheduler.__init__(self)
@@ -29,35 +45,35 @@ class MyScheduler(nexus.Scheduler):
self.servers = {}
self.overloaded = False
self.numToRegister = 1
-
+
def getExecutorInfo(self, driver):
execPath = os.path.join(os.getcwd(), "start_pbs_mom.sh")
initArg = self.ip # tell executor which node the pbs_server is running on
- print "in getExecutorInfo, setting execPath = " + execPath + " and initArg = " + initArg
+ driverlog.info("in getExecutorInfo, setting execPath = " + execPath + " and initArg = " + initArg)
return nexus.ExecutorInfo(execPath, initArg)
def registered(self, driver, fid):
- print "Nexus torque+pbs scheduler registered as framework #%s" % fid
+ driverlog.info("Nexus torque framwork registered with frameworkID %s" % fid)
def resourceOffer(self, driver, oid, slave_offers):
self.driver = driver
- print "Got slot offer %d" % oid
+ driverlog.debug("Got slot offer %d" % oid)
self.lock.acquire()
- print "resourceOffer() acquired lock"
+ driverlog.debug("resourceOffer() acquired lock")
tasks = []
for offer in slave_offers:
# if we haven't registered this node, accept slot & register w pbs_server
#TODO: check to see if slot is big enough
if self.numToRegister <= 0:
- print "Rejecting slot, no need for more slaves"
+ driverlog.debug("Rejecting slot, no need for more slaves")
continue
if offer.host in self.servers.values():
- print "Rejecting slot, already registered node " + offer.host
+ driverlog.debug("Rejecting slot, already registered node " + offer.host)
continue
if len(self.servers) >= SAFE_ALLOCATION["cpus"]:
- print "Rejecting slot, already at safe allocation (i.e. %d CPUS)" % SAFE_ALLOCATION["cpus"]
+ driverlog.debug("Rejecting slot, already at safe allocation (i.e. %d CPUS)" % SAFE_ALLOCATION["cpus"])
continue
- print "Need %d more nodes, so accepting slot, setting up params for it..." % self.numToRegister
+ driverlog.info("Need %d more nodes, so accepting slot, setting up params for it..." % self.numToRegister)
params = {"cpus": "1", "mem": "1073741824"}
td = nexus.TaskDescription(
self.id, offer.slaveId, "task %d" % self.id, params, "")
@@ -66,59 +82,64 @@ class MyScheduler(nexus.Scheduler):
self.regComputeNode(offer.host)
self.numToRegister -= 1
self.id += 1
- print "self.id now set to " + str(self.id)
- print "---"
+ driverlog.info("writing logfile")
+ eventlog.info(len(self.servers))
+ driverlog.info("done writing logfile")
+ driverlog.info("self.id now set to " + str(self.id))
+ #print "---"
driver.replyToOffer(oid, tasks, {"timeout": "1"})
self.lock.release()
- print "resourceOffer() finished, released lock"
- print "\n"
+ driverlog.debug("resourceOffer() finished, released lock\n\n")
def statusUpdate(self, driver, status):
- print "got status update from TID %s, state is: %s, data is: %s" %(status.taskId,status.state,status.data)
+ driverlog.info("got status update from TID %s, state is: %s, data is: %s" %(status.taskId,status.state,status.data))
def regComputeNode(self, new_node):
- print "registering new compute node, "+new_node+", with pbs_server"
- print "checking to see if node is registered with server already"
+ driverlog.info("registering new compute node, "+new_node+", with pbs_server")
+ driverlog.info("checking to see if node is registered with server already")
#nodes = Popen("qmgr -c 'list node'", shell=True, stdout=PIPE).stdout
nodes = Popen("pbsnodes", shell=True, stdout=PIPE).stdout
- print "output of pbsnodes command is: "
+ driverlog.info("output of pbsnodes command is: ")
for line in nodes:
- print line
+ driverlog.info(line)
if line.find(new_node) != -1:
- print "Warn: tried to register node that's already registered, skipping"
+ driverlog.info("Warn: tried to register node that's already registered, skipping")
return
#add node to server
- print "registering node with command: qmgr -c create node " + new_node
+ driverlog.info("registering node with command: qmgr -c create node " + new_node)
qmgr_add = Popen("qmgr -c \"create node " + new_node + "\"", shell=True, stdout=PIPE).stdout
- print "output of qmgr:"
- for line in qmgr_add: print line
+ driverlog.info("output of qmgr:")
+ for line in qmgr_add: driverlog.info(line)
def unregComputeNode(self, node_name):
#remove node from server
- print("removing node from pbs_server: qmgr -c delete node " + node_name)
- print Popen('qmgr -c "delete node ' + node_name + '"', shell=True, stdout=PIPE).stdout
+ monitorlog.info("removing node from pbs_server: qmgr -c delete node " + node_name)
+ monitorlog.info(Popen('qmgr -c "delete node ' + node_name + '"', shell=True, stdout=PIPE).stdout)
#unreg up to N random compute nodes, leave at least one
- def unregNNodes(self, num_nodes):
- print "unregNNodes called with arg %d" % num_nodes
- if num_nodes > len(self.servers)-1:
- print "... however, only unregistering %d nodes, leaving one alive" % (len(self.servers)-1)
- print "getting and filtering list of nodes using torquelib"
+ def unregNNodes(self, numNodes):
+ monitorlog.debug("unregNNodes called with arg %d" % numNodes)
+ toKill = numNodes
+ if numNodes > len(self.servers)-1:
+ monitorlog.debug("... however, only unregistering %d nodes, leaving one alive" % (len(self.servers)-1))
+ toKill = (len(self.servers)-1)
+
+ monitorlog.debug("getting and filtering list of nodes using torquelib")
noJobs = lambda x: x.status.has_key("jobs") == False or (x.status.has_key("jobs") == True and x.status["jobs"] == "")
inactiveNodes = map(lambda x: x.name,filter(noJobs, torquelib.getNodes()))
- print "victim pool of inactive nodes:"
+ monitorlog.debug("victim pool of inactive nodes:")
for inode in inactiveNodes:
- print inode
+ monitorlog.debug(inode)
for tid, hostname in self.servers.items():
- if len(self.servers) > 1 and num_nodes > 0 and hostname in inactiveNodes:
- print "we still have to kill %d of the %d compute nodes which master is tracking" % (num_nodes, len(self.servers))
- print "unregistering node " + str(hostname)
+ if len(self.servers) > 1 and toKill > 0 and hostname in inactiveNodes:
+ monitorlog.info("We still have to kill %d of the %d compute nodes which master is tracking" % (toKill, len(self.servers)))
+ monitorlog.info("unregistering node " + str(hostname))
self.unregComputeNode(hostname)
self.servers.pop(tid)
- num_nodes = num_nodes - 1
- print "killing corresponding task with tid %d" % tid
+ toKill = toKill - 1
+ monitorlog.info("killing corresponding task with tid %d" % tid)
self.driver.killTask(tid)
- #handle the case where we didn't kill enough nodes
+ if toKill > 0:
+ monitorlog.warn("Done killing. We were supposed to kill %d nodes, but only found and killed %d free nodes" % (numNodes, numNodes-toKill))
def getFrameworkName(self, driver):
return "Nexus Torque Framework"
@@ -127,29 +148,29 @@ class MyScheduler(nexus.Scheduler):
def monitor(sched):
while True:
time.sleep(1)
- print "monitor thread acquiring lock"
+ monitorlog.debug("monitor thread acquiring lock")
sched.lock.acquire()
- print "computing num nodes needed to satisfy eligable jobs in queue"
+ monitorlog.debug("computing num nodes needed to satisfy eligable jobs in queue")
needed = 0
jobs = torquelib.getActiveJobs()
- print "retreived jobs in queue, count: %d" % len(jobs)
+ monitorlog.debug("retreived jobs in queue, count: %d" % len(jobs))
for j in jobs:
#WARNING: this check should only be used if torque is using fifo queue
#if needed + j.needsnodes <= SAFE_ALLOCATION:
- print "job resource list is: " + str(j.resourceList)
+ monitorlog.debug("job resource list is: " + str(j.resourceList))
needed += int(j.resourceList["nodect"])
- print "number of nodes needed by jobs in queue: %d" % needed
+ monitorlog.debug("number of nodes needed by jobs in queue: %d" % needed)
numToRelease = len(sched.servers) - needed
- print "number of nodes to release is %d - %d" % (len(sched.servers),needed)
+ monitorlog.debug("number of nodes to release is %d - %d" % (len(sched.servers),needed))
if numToRelease > 0:
sched.unregNNodes(numToRelease)
sched.numToRegister = 0
else:
- print "monitor updating sched.numToRegister from %d to %d" % (sched.numToRegister, numToRelease * -1)
+ monitorlog.debug("monitor updating sched.numToRegister from %d to %d" % (sched.numToRegister, numToRelease * -1))
sched.numToRegister = numToRelease * -1
sched.lock.release()
- print "monitor thread releasing lock"
- print "\n"
+ monitorlog.debug("monitor thread releasing lock")
+ monitorlog.debug("\n")
if __name__ == "__main__":
parser = OptionParser(usage = "Usage: %prog nexus_master")
@@ -163,22 +184,21 @@ if __name__ == "__main__":
fqdn = socket.getfqdn()
ip = socket.gethostbyname(gethostname())
- print "running killall pbs_server"
+ monitorlog.info("running killall pbs_server")
Popen("killall pbs_server", shell=True)
time.sleep(1)
- print "writing $(TORQUECFG)/server_name file with fqdn of pbs_server: " + fqdn
+ monitorlog.info("writing $(TORQUECFG)/server_name file with fqdn of pbs_server: " + fqdn)
FILE = open(PBS_SERVER_FILE,'w')
FILE.write(fqdn)
FILE.close()
- print "starting pbs_server"
+ monitorlog.info("starting pbs_server")
#Popen("/etc/init.d/pbs_server start", shell=True)
Popen("pbs_server", shell=True)
time.sleep(2)
- print "\n"
- print "running command: qmgr -c \"set queue batch resources_available.nodes=%s\"" % SAFE_ALLOCATION["cpus"]
+ monitorlog.info("running command: qmgr -c \"set queue batch resources_available.nodes=%s\"" % SAFE_ALLOCATION["cpus"])
Popen("qmgr -c \"set queue batch resources_available.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
Popen("qmgr -c \"set server resources_available.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
@@ -188,40 +208,37 @@ if __name__ == "__main__":
outp = Popen("qmgr -c \"l queue batch\"", shell=True, stdout=PIPE).stdout
for l in outp:
- print l
+ monitorlog.info(l)
- print "RE-killing pbs_server for resources_available setting to take effect"
+ monitorlog.info("RE-killing pbs_server for resources_available setting to take effect")
#Popen("/etc/init.d/pbs_server start", shell=True)
Popen("qterm", shell=True)
#time.sleep(1)
- print("\n")
- print "RE-starting pbs_server for resources_available setting to take effect"
+ monitorlog.info("RE-starting pbs_server for resources_available setting to take effect")
Popen("pbs_server", shell=True)
- "qmgr list queue settings: "
+ monitorlog.debug("qmgr list queue settings: ")
output = Popen("qmgr -c 'l q batch'", shell=True, stdout=PIPE).stdout
for line in output:
- print line
- print("\n")
+ monitorlog.debug(line)
- print "running killall pbs_sched"
+ monitorlog.info("running killall pbs_sched")
Popen("killall pbs_sched", shell=True)
#time.sleep(2)
- print("\n")
- print "starting pbs_scheduler"
+ monitorlog.info("starting pbs_scheduler")
#Popen("/etc/init.d/pbs_sched start", shell=True)
Popen("pbs_sched", shell=True)
#ip = Popen("hostname -i", shell=True, stdout=PIPE).stdout.readline().rstrip() #linux
#ip = Popen("ifconfig en1 | awk '/inet / { print $2 }'", shell=True, stdout=PIPE).stdout.readline().rstrip() # os x
- print "Remembering IP address of scheduler (" + ip + "), and fqdn: " + fqdn
+ monitorlog.info("Remembering IP address of scheduler (" + ip + "), and fqdn: " + fqdn)
- print "Connecting to nexus master %s" % args[0]
+ monitorlog.info("Connecting to nexus master %s" % args[0])
sched = MyScheduler(fqdn)
threading.Thread(target = monitor, args=[sched]).start()
nexus.NexusSchedulerDriver(sched, args[0]).run()
- print "Finished!"
+ monitorlog.info("Finished!")