You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:26:43 UTC
svn commit: r1131588 - in /incubator/mesos/trunk:
frameworks/torque/torquesched.py
src/ec2/deploy.karmic64/root/nexus-ec2/setup-torque
Author: benh
Date: Sun Jun 5 03:26:43 2011
New Revision: 1131588
URL: http://svn.apache.org/viewvc?rev=1131588&view=rev
Log:
Updates to the torque setup script and framework.
Modified:
incubator/mesos/trunk/frameworks/torque/torquesched.py
incubator/mesos/trunk/src/ec2/deploy.karmic64/root/nexus-ec2/setup-torque
Modified: incubator/mesos/trunk/frameworks/torque/torquesched.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/frameworks/torque/torquesched.py?rev=1131588&r1=1131587&r2=1131588&view=diff
==============================================================================
--- incubator/mesos/trunk/frameworks/torque/torquesched.py (original)
+++ incubator/mesos/trunk/frameworks/torque/torquesched.py Sun Jun 5 03:26:43 2011
@@ -6,6 +6,7 @@ import sys
import time
import httplib
import Queue
+import threading
from optparse import OptionParser
from subprocess import *
@@ -14,18 +15,6 @@ from socket import gethostname
SAFE_ALLOCATION = {"cpus":5,"mem":134217728} #just set statically for now, 128MB
MIN_SLOT_SIZE = {"cpus":"1","mem":1073741824} #1GB
-TORQUE_DL_URL = "http://www.clusterresources.com/downloads/torque/torque-2.4.6.tar.gz"
-TORQUE_UNCOMPRESSED_DIR = "torque-2.4.6"
-
-TORQUE_CFG = "/var/spool/torque"
-PBS_SERVER_CONF_FILE = TORQUE_CFG + "/server_priv/nodes" #hopefully can use qmgr and not edit this
-
-TORQUE_INSTALL_DIR = "/usr/local/sbin"
-PBS_SERVER_EXE = TORQUE_INSTALL_DIR + "/pbs_server"
-SCHEDULER_EXE = TORQUE_INSTALL_DIR + "/pbs_sched"
-QMGR_EXE = "/usr/local/bin/qmgr"
-
-
class MyScheduler(nexus.Scheduler):
def __init__(self, ip):
nexus.Scheduler.__init__(self)
@@ -45,38 +34,26 @@ class MyScheduler(nexus.Scheduler):
print "Nexus torque+pbs scheduler registered as framework #%s" % fid
- #DESIGN PLAN:
- #
- #for each slot in the offer
- # if we are at safe allocation, don't accept any more of these slot offers
- # else if the slot is not >= min-slot size, reject
- # else if we have already set up some resources on this machine and started
- # pbs_mom on it, don't accept more resources.
- # TODO: Eventually, set up max and min resources per compute node and
- # accept resources on an existing compute node and just update
- # the config settings for the torque daemon on that node to use
- # more resources on it.
- # else accept the offer and launch pbs_mom on the node
def resourceOffer(self, driver, oid, slave_offers):
print "Got slot offer %d" % oid
tasks = []
for offer in slave_offers:
- # for a first step, just keep track of whether we have started a pbs_mom
- # on this node before or not, if not then accept the slot and launch one
- if not offer.host in self.servers.values():
- #TODO: check to see if slot is big enough
- print "setting up params"
- #print "params = cpus: "# + MIN_SLOT_SIZE["cpus"]# + ", mem: " + MIN_SLOT_SIZE["mem"]
- params = {"cpus": "%d" % 1, "mem": "%d" % 1073741824}
- td = nexus.TaskDescription(
- self.id, offer.slaveId, "task %d" % self.id, params, "")
- tasks.append(td)
- self.servers[self.id] = offer.host
- regComputeNode(offer.host)
- self.id += 1
- print "self.id now set to " + str(self.id)
- else:
- print "Rejecting slot because we've already launched pbs_mom on that node"
+ # if we haven't registered this node, accept slot & register with pbs_server -- TODO: check to see if slot is big enough
+ if not offer.host in self.servers.values():
+ print "Rejecting slot, already registered node " + offer.host
+ continue
+ if len(self.servers) >= SAFE_ALLOCATION:
+ print "Rejecting slot, already at safe allocation"
+ continue
+ print "Accepting slot, setting up params for it..."
+ params = {"cpus": "%d" % 1, "mem": "%d" % 1073741824}
+ td = nexus.TaskDescription(
+ self.id, offer.slaveId, "task %d" % self.id, params, "")
+ tasks.append(td)
+ self.servers[self.id] = offer.host
+ regComputeNode(offer.host)
+ self.id += 1
+ print "self.id now set to " + str(self.id)
driver.replyToOffer(oid, tasks, {"timeout": "-1"})
# def statusUpdate(self, status):
@@ -88,35 +65,38 @@ class MyScheduler(nexus.Scheduler):
# self.reviveOffers()
def regComputeNode(new_node):
- print "in reg"
- print "chdir"
- os.chdir(TORQUE_CFG)
- # first grep the server conf file to be sure this node isn't already
- # registered with the server
- f = open(PBS_SERVER_CONF_FILE, 'r')
- for line in f:
- if line.find(new_node) != -1:
- print "ERROR! Trying to register compute node which "\
- "is already registered, aborting node register"
- return
- f.close()
+ print "registering new compute node, "+new_node+", with pbs_server"
- #add node to server
- print("adding a node to pbs_server using: qmgr -c create node " + new_node)
- Popen(QMGR_EXE + ' "-c create node ' + new_node + '"',shell=True, stdout=PIPE).stdout
+ print "checking to see if node is registered with server already"
+ nodes = Popen("pbsnodes", shell=True, stdout=PIPE).stdout.readline()
+ print "output of pbsnodes command is: " + nodes
+
+ if nodes.find(new_node) != -1:
+ print "Warn: tried to register a node that is already registered, skipping"
+ return
-# #add line to server_priv/nodes file for new_node
-# f = open(PBS_SERVER_CONF_FILE,'a')
-# f.write(new_node+"\n")
-# f.close
+ #add node to server
+ print "registering node w/ pbs_server using: qmgr -c create node " + new_node
+ Popen("qmgr -c \"create node " + new_node + "\"", shell=True, stdout=PIPE)
def unregComputeNode(node_name):
#remove node from server
- print("removing a node from pbs_server using: qmgr -c delete node " + node)
+ print("removing node from pbs_server using: qmgr -c delete node " + node)
print Popen(QMGR_EXE + ' "-c delete node ' + node + '"').stdout
+
def getFrameworkName(self, driver):
return "Nexus torque Framework"
+def monitor(sched):
+ while True:
+ time.sleep(1)
+ try:
+ print "checking to see if queue is empty"
+ print Popen("qstat",shell=True,stdout=PIPE).stdout.readline()
+ except Exception, e:
+ print "error running/parsing qstat"
+ continue
+
if __name__ == "__main__":
parser = OptionParser(usage = "Usage: %prog nexus_master")
@@ -126,31 +106,14 @@ if __name__ == "__main__":
print >> sys.stderr, "Use --help to show usage."
exit(2)
- print "setting up pbs_server"
-
- #if torque isn't installed, install our own
- #print "Torque doesn't seem to be installed. Downloading and installing it"
- #Popen("curl " + TORQUE_DL_URL + " > torque.tgz)
- #Popen("tar xzf torque.tgz")
- #os.chdir(TORQUE_UNCOMPRESSED_DIR)
- #Popen(os.getcwd()+"/configure --prefix=/usr/local")
- #Popen("make")
- #Popen("make install")
- #Popen(os.getcwd()+"torque.setup root")
-
- try:
- pbs_server = Popen(PBS_SERVER_EXE)
- except OSError,e:
- print >>sys.stderr, "Error starting pbs server"
- print >>sys.stderr, e
- exit(2)
+ print "running qterm"
+ Popen("qterm", shell=True).wait()
- try:
- pbs_scheduler = Popen(SCHEDULER_EXE)
- except OSError,e:
- print >>sys.stderr, "Error starting scheduler"
- print >>sys.stderr, e
- exit(2)
+ print "starting pbs_server"
+ Popen("/etc/init.d/pbs_server start", shell=True)
+
+ print "starting pbs_scheduler"
+ Popen("/etc/init.d/pbs_sched start", shell=True)
ip = Popen("hostname -i", shell=True, stdout=PIPE).stdout.readline().rstrip()
print "Remembering IP address of scheduler (" + ip + ")"
@@ -158,9 +121,8 @@ if __name__ == "__main__":
print "Connecting to nexus master %s" % args[0]
sched = MyScheduler(ip)
- nexus.NexusSchedulerDriver(sched, args[0]).run()
+ threading.Thread(target = monitor, args=[sched]).start()
- #WARNING: this is new in python 2.6
- pbs_server.terminate() #don't leave pbs_server running, not sure if this works
+ nexus.NexusSchedulerDriver(sched, args[0]).run()
print "Finished!"
Modified: incubator/mesos/trunk/src/ec2/deploy.karmic64/root/nexus-ec2/setup-torque
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ec2/deploy.karmic64/root/nexus-ec2/setup-torque?rev=1131588&r1=1131587&r2=1131588&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ec2/deploy.karmic64/root/nexus-ec2/setup-torque (original)
+++ incubator/mesos/trunk/src/ec2/deploy.karmic64/root/nexus-ec2/setup-torque Sun Jun 5 03:26:43 2011
@@ -14,25 +14,48 @@ SLAVES="`cat $SLAVES_FILE`"
#install torque: download/unzip torque
function installmaster {
pushd ~
+ echo "downloading and installing torque on master"
wget http://www.clusterresources.com/downloads/torque/torque-2.4.7.tar.gz
tar xzf torque-2.4.7.tar.gz
cd torque-2.4.7
./configure --prefix=/usr/local
make -j8
make install
- make packages
+ ./torque.setup `whoami` #or should this just be root
+
+ echo "copying init.d control scripts to master"
+ cp contrib/init.d/debian.pbs_mom /nfs/torque/init.d/pbs_mom
+ cp contrib/init.d/debian.pbs_sched /nfs/torque/init.d/pbs_sched
+ cp contrib/init.d/debian.pbs_server /nfs/torque/init.d/pbs_server
- #initialize/configure torque
+ echo "running ldconfig on master"
ldconfig
popd
}
function installslaves {
pushd ~/torque-2.4.7
+ echo "building packages for slave"
+ make packages
#install torque-mom on slave nodes
apt-get install -y dsh
- cp torque-package-mom-linux-x86_64.sh /nfs/torque-package-mom-linux-x86_64.sh
- dsh -f $SLAVES_FILE /nfs/torque-package-mom-linux-x86_64.sh --install
+
+ echo "copying slave install packages to nfs"
+ mkdir /nfs/torque
+ cp torque-package-mom-linux-x86_64.sh /nfs/torque/torque-package-mom-linux-x86_64.sh
+ cp torque-package-mom-linux-x86_64.sh /nfs/torque/torque-package-clients-linux-x86_64.sh
+
+ echo "installing torque mom and clients package on slaves"
+ dsh -f $SLAVES_FILE /nfs/torque/torque-package-mom-linux-x86_64.sh --install
+ dsh -f $SLAVES_FILE /nfs/torque/torque-package-clients-linux-x86_64.sh --install
+
+ echo "copying pbs_mom init.d control script to slaves"
+ mkdir /nfs/torque/init.d
+ cp contrib/init.d/debian.pbs_mom /nfs/torque/init.d/debian.pbs_mom
+ dsh -f $SLAVES_FILE cp /nfs/torque/init.d/debian.pbs_mom /etc/init.d/pbs_mom
+
+ echo "Running ldconfig on slaves"
+ dsh -f $SLAVES_FILE ldconfig
popd
}