You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2015/12/11 15:18:06 UTC
svn commit: r1719426 -
/uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy
Author: challngr
Date: Fri Dec 11 14:18:06 2015
New Revision: 1719426
URL: http://svn.apache.org/viewvc?rev=1719426&view=rev
Log:
UIMA-4577 Redo rm_qoccupancy using cqlsh because it's so much faster than
spawning java!
Modified:
uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy
Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy?rev=1719426&r1=1719425&r2=1719426&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy Fri Dec 11 14:18:06 2015
@@ -37,12 +37,12 @@ class DuccRmQOccupancy(DuccUtil):
status = 'up'
else:
status = 'down'
- print "%20s %11s %6s %6s %15s %10s %3s(Q) %6s %6s %8s %7s %10s %8s" % (n['name'], n['blacklisted'], n['online'], status, n['nodepool'],
+ print "%20s %11s %6s %6s %15s %10s %3s(Q) %6s %6s %8s %7s %10s %-8s" % (n['name'], n['blacklisted'], n['online'], status, n['nodepool'],
n['memory'], n['share_order'], n['shares_left'], n['assignments'],
n['np_assignments'], n['quantum'], n['reservable'], n['classes'])
if ( shares.has_key(n['name']) ):
for s in shares[n['name']]:
- fmt = '%19s ' + s['jobtype'] +'[%8d] S[%8d] O[%d] II[%8d] IR[%8d] E[%5s] P[%5s] F[%5s] S[%10s]'
+ fmt = '%19s ' + s['jobtype'] +'[%8s] S[%8s] O[%s] II[%8s] IR[%8s] E[%5s] P[%5s] F[%5s] S[%10s]'
state = s['state']
if ( state == 'null' ):
state = "Assigned"
@@ -50,13 +50,66 @@ class DuccRmQOccupancy(DuccUtil):
print ''
-
- # Given DUCC_HOME, a directory, and part of the name of a jar, find the actual name of the jar which will
- # likeley be versioned
- def resolve_jar(self, DH, dirname, basename):
- partial = DH + '/' + dirname + '/'+ basename + '*'
- ret = glob.glob(partial)
- return ret[0]
+
+ def parse_header(self, header):
+ ret = []
+ parts = header.split('|')
+ for p in parts:
+ ret.append(p.strip())
+ return ret
+
+ def parse_node(self, header, line):
+ parts = line.split('|')
+ ret = {}
+ for k, v in zip(header, parts):
+ ret[k] = v.strip()
+ return ret
+
+ def parse_share(self, header, line):
+ parts = line.split('|')
+ ret = {}
+ for k, v in zip(header, parts):
+ ret[k] = v.strip()
+ return ret
+
+
+ def rmnodes(self, lines):
+ nodes = []
+ shares = {}
+ header = []
+ for l in lines:
+ l = l.strip()
+ # print '[]', l
+ if ( l == '' ):
+ continue
+ if ( '---' in l ):
+ continue;
+ if ( 'rows)' in l ):
+ continue;
+ if ( 'assignments' in l ):
+ doing_nodes = True
+ doing_shares = False
+ header = self.parse_header(l)
+ continue
+ if ( 'investment' in l ):
+ doing_nodes = False
+ doing_shares = True
+ header = self.parse_header(l)
+ continue
+ if ( doing_nodes ):
+ nodes.append(self.parse_node(header, l))
+ continue
+ if ( doing_shares ):
+ s = self.parse_share(header, l)
+ k = s['node']
+ if ( shares.has_key(k) ):
+ share_list = shares[k]
+ else:
+ share_list = []
+ shares[k] = share_list
+ share_list.append(s)
+ continue
+ return nodes, shares
def main(self, argv):
@@ -65,53 +118,20 @@ class DuccRmQOccupancy(DuccUtil):
sys.exit(1);
DH = self.DUCC_HOME
- CP = [self.resolve_jar(DH, '/lib/uima-ducc', 'uima-ducc-database'),
- self.resolve_jar(DH, '/lib/uima-ducc', 'uima-ducc-common'),
- DH + '/lib/cassandra/*',
- DH + '/lib/apache-log4j/*',
- DH + '/lib/guava/*',
- self.resolve_jar(DH, '/cassandra-server/lib', 'slf4j-api'),
- self.resolve_jar(DH, '/apache-uima/apache-activemq/lib', 'slf4j-log4j12'),
- ]
- os.environ['CLASSPATH'] = ':'.join(CP)
-
- DUCC_JVM_OPTS = ''
- DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME
- DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.rm.persistence.impl=org.apache.uima.ducc.database.RmStatePersistence'
-
dbn = self.ducc_properties.get('ducc.database.host')
- CMD = [self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.database.RmShareState', dbn]
- CMD = ' '.join(CMD)
- lines = ''
- proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True)
- for line in proc.stdout:
- lines = lines + line
- shares = eval(lines)
-
- shares_by_machine = {}
- for share in shares:
- k = share['node']
- if ( shares_by_machine.has_key(k) ):
- share_list = shares_by_machine[k]
- else:
- share_list = []
- shares_by_machine[k] = share_list
- share_list.append(share)
-
- CMD = [self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.database.RmNodeState', dbn]
+ CMD = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', 'guest', '-e', '"select * from ducc.rmnodes; select * from ducc.rmshares;"']
CMD = ' '.join(CMD)
- lines = ''
+
+ lines = []
proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True)
for line in proc.stdout:
- lines = lines + line
+ # print line.strip()
+ lines.append(line)
+
+ nodes, shares = self.rmnodes(lines)
+ self.format(nodes, shares)
- nodes = eval(lines)
- nodes = sorted(nodes, key=lambda n: n["name"])
-
- self.format(nodes, shares_by_machine)
-
- return
if __name__ == "__main__":
stopper = DuccRmQOccupancy()