You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2015/12/11 15:18:06 UTC

svn commit: r1719426 - /uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy

Author: challngr
Date: Fri Dec 11 14:18:06 2015
New Revision: 1719426

URL: http://svn.apache.org/viewvc?rev=1719426&view=rev
Log:
UIMA-4577 Redo rm_qoccupancy using cqlsh because it's so much faster than 
          spawning java!

Modified:
    uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy

Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy?rev=1719426&r1=1719425&r2=1719426&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/rm_qoccupancy Fri Dec 11 14:18:06 2015
@@ -37,12 +37,12 @@ class DuccRmQOccupancy(DuccUtil):
                 status = 'up'
             else:
                 status = 'down'
-            print "%20s %11s %6s %6s %15s %10s %3s(Q) %6s %6s %8s %7s %10s  %8s" %  (n['name'], n['blacklisted'], n['online'], status, n['nodepool'], 
+            print "%20s %11s %6s %6s %15s %10s %3s(Q) %6s %6s %8s %7s %10s  %-8s" %  (n['name'], n['blacklisted'], n['online'], status, n['nodepool'], 
                                                                              n['memory'], n['share_order'], n['shares_left'], n['assignments'], 
                                                                              n['np_assignments'], n['quantum'], n['reservable'], n['classes'])
             if ( shares.has_key(n['name']) ):
                 for s in shares[n['name']]:
-                    fmt = '%19s ' + s['jobtype'] +'[%8d] S[%8d] O[%d] II[%8d] IR[%8d] E[%5s] P[%5s] F[%5s] S[%10s]'
+                    fmt = '%19s ' + s['jobtype'] +'[%8s] S[%8s] O[%s] II[%8s] IR[%8s] E[%5s] P[%5s] F[%5s] S[%10s]'
                     state = s['state']
                     if ( state == 'null' ):
                         state = "Assigned"
@@ -50,13 +50,66 @@ class DuccRmQOccupancy(DuccUtil):
 
                 print ''
 
-    
-    # Given DUCC_HOME, a directory, and part of the name of a jar, find the actual name of the jar which will
-    # likeley be versioned
-    def resolve_jar(self, DH, dirname, basename):
-        partial = DH + '/' + dirname + '/'+ basename + '*'
-        ret = glob.glob(partial)
-        return ret[0]
+
+    def parse_header(self, header):
+        ret = []
+        parts = header.split('|')
+        for p in parts:
+            ret.append(p.strip())
+        return ret
+
+    def parse_node(self, header, line):
+        parts = line.split('|')
+        ret = {}
+        for k, v in zip(header, parts):
+            ret[k] = v.strip()
+        return ret
+
+    def parse_share(self, header, line):
+        parts = line.split('|')
+        ret = {}
+        for k, v in zip(header, parts):
+            ret[k] = v.strip()
+        return ret
+
+
+    def rmnodes(self, lines):
+        nodes = []
+        shares = {}
+        header = []
+        for l in lines:
+            l = l.strip()
+            # print '[]', l
+            if ( l == '' ):
+                continue
+            if ( '---' in l ):
+                continue;
+            if ( 'rows)' in l ):
+                continue;
+            if ( 'assignments' in l ):
+                doing_nodes = True
+                doing_shares = False
+                header = self.parse_header(l)
+                continue
+            if ( 'investment' in l ):
+                doing_nodes = False
+                doing_shares = True
+                header = self.parse_header(l)
+                continue
+            if ( doing_nodes ):
+                nodes.append(self.parse_node(header, l))
+                continue
+            if ( doing_shares ):
+                s = self.parse_share(header, l)
+                k = s['node']
+                if ( shares.has_key(k) ):
+                    share_list = shares[k]
+                else:
+                    share_list = []
+                    shares[k] = share_list
+                share_list.append(s)
+                continue
+        return nodes, shares
     
     def main(self, argv):
 
@@ -65,53 +118,20 @@ class DuccRmQOccupancy(DuccUtil):
             sys.exit(1);
 
         DH = self.DUCC_HOME
-        CP = [self.resolve_jar(DH, '/lib/uima-ducc', 'uima-ducc-database'),
-              self.resolve_jar(DH, '/lib/uima-ducc', 'uima-ducc-common'),
-              DH + '/lib/cassandra/*',
-              DH + '/lib/apache-log4j/*',
-              DH + '/lib/guava/*',
-              self.resolve_jar(DH, '/cassandra-server/lib', 'slf4j-api'),
-              self.resolve_jar(DH, '/apache-uima/apache-activemq/lib', 'slf4j-log4j12'),
-             ]
-        os.environ['CLASSPATH'] = ':'.join(CP)
-        
-        DUCC_JVM_OPTS = ''
-        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME
-        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.rm.persistence.impl=org.apache.uima.ducc.database.RmStatePersistence'
-
         dbn = self.ducc_properties.get('ducc.database.host')
-        CMD = [self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.database.RmShareState', dbn]
-        CMD = ' '.join(CMD)
-        lines = ''
-        proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True)
-        for line in proc.stdout:
-            lines = lines + line
 
-        shares = eval(lines)
-        
-        shares_by_machine = {}
-        for share in shares:
-            k = share['node']
-            if ( shares_by_machine.has_key(k) ):
-                share_list = shares_by_machine[k]
-            else:
-                share_list = []
-                shares_by_machine[k] = share_list
-            share_list.append(share)
-    
-        CMD = [self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.database.RmNodeState', dbn]
+        CMD = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', 'guest', '-e', '"select * from ducc.rmnodes; select * from ducc.rmshares;"']
         CMD = ' '.join(CMD)
-        lines = ''
+
+        lines = []
         proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True)
         for line in proc.stdout:
-            lines = lines + line
+            # print line.strip()
+            lines.append(line)
+
+        nodes, shares = self.rmnodes(lines)
+        self.format(nodes, shares)
 
-        nodes = eval(lines)
-        nodes = sorted(nodes, key=lambda n: n["name"])
-                                           
-        self.format(nodes, shares_by_machine)
-        
-        return
 
 if __name__ == "__main__":
     stopper = DuccRmQOccupancy()