You are viewing a plain text version of this content. The canonical link to it is on the original mail-archive page (the hyperlink did not survive the conversion to plain text).
Posted to commits@uima.apache.org by ch...@apache.org on 2013/09/30 20:29:31 UTC

svn commit: r1527698 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts: runducc start_sim stop_sim

Author: challngr
Date: Mon Sep 30 18:29:31 2013
New Revision: 1527698

URL: http://svn.apache.org/r1527698
Log:
UIMA-3301 Multithread the simulation scripting.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc
    uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim
    uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc?rev=1527698&r1=1527697&r2=1527698&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc Mon Sep 30 18:29:31 2013
@@ -192,7 +192,7 @@ class DuccProcess(Thread):
             jvm_classpath = jvm_classpath + ':' + self.DUCC_HOME + '/examples/simple/resources/service'
         
         plain_broker_url = self.runner.broker_protocol + '://' + self.runner.broker_host + ':' + self.runner.broker_port
-        cr_parms         = 'jobfile:' + self.jobfile + ',compression:' + self.runner.compression + ',error_rate:' + str(self.runner.error_rate)
+        cr_parms         = '"jobfile=' + self.jobfile + ' compression=' + self.runner.compression + ' error_rate=' + str(self.runner.error_rate) + '"'
         process_args.append(process_xmx)
         process_args.append('-DdefaultBrokerURL=' + plain_broker_url )
 
@@ -222,9 +222,6 @@ class DuccProcess(Thread):
         CMD = CMD + ' --driver_descriptor_CR '                + cr
         CMD = CMD + ' --driver_descriptor_CR_overrides '      + cr_parms
         CMD = CMD + ' --driver_jvm_args '                     + jvm_driver_args
-        CMD = CMD + ' --driver_classpath '                    + jvm_classpath
-        CMD = CMD + ' --driver_environment '                  + 'LD_LIBRARY_PATH=/a/other/bogus/path'
-        # CMD = CMD + ' --driver_exception_handler '            + 'org.apache.uima.ducc.test.handler.TimeoutHandler'
         
         if ( self.runner.style == 'DD' ):
             CMD = CMD + ' --process_DD '                      + dd
@@ -237,7 +234,7 @@ class DuccProcess(Thread):
             #CMD = CMD + ' --working_directory '               + working_dir
 
         CMD = CMD + ' --process_memory_size '                 + memory
-        CMD = CMD + ' --process_classpath '                   + jvm_classpath
+        CMD = CMD + ' --classpath '                   + jvm_classpath
         
         CMD = CMD + ' --process_jvm_args '                    + jvm_process_args
         CMD = CMD + ' --process_thread_count '                + nthreads
@@ -249,7 +246,7 @@ class DuccProcess(Thread):
         if ( self.runner.init_timeout > 0 ):
             CMD = CMD + ' --process_initialization_time_max ' + str(self.runner.init_timeout)
 
-        CMD = CMD + ' --process_environment ' \
+        CMD = CMD + ' --environment ' \
                   + '"' \
                   + ' AE_INIT_TIME='  + str(self.runner.init_time) \
                   + ' AE_INIT_RANGE=' + str(self.runner.init_range) \
@@ -258,8 +255,11 @@ class DuccProcess(Thread):
                   + ' LD_LIBRARY_PATH=/a/bogus/path' \
                   + '"'
 
-        if ( machines != None ):
-            CMD = CMD + ' --process_deployments_max '  + machines
+        if ( self.runner.max_machines == 0 ):
+            if ( machines != None ):
+                CMD = CMD + ' --process_deployments_max '  + machines
+        elif (self.runner.max_machines != -1 ):
+            CMD = CMD + ' --process_deployments_max '  + self.runner.max_machines
 
         if ( self.runner.watch ):
             CMD = CMD + ' --wait_for_completion true'     
@@ -620,6 +620,9 @@ class RunDucc(DuccUtil):
         print '   -m, --memory_override mem-in-GB'
         print '       Use this instead of what is in the props file. Default: None'
         print ''
+        print '   -n, --nmachines_override process_deployments_max'
+        print '       Override the preconfigured max machines. Use -1 to fully inhibit max machines'
+        print ''
         print '   -p, --process_timeout sec'
         print '       Process timeout, in seconds. Default:', self.process_timeout
         print ''
@@ -670,13 +673,14 @@ class RunDucc(DuccUtil):
         self.use_apache_package = True
         self.use_ibm_package = False
         self.submit_package = 'org.apache.uima.ducc'
+        self.max_machines = 0
 
         try:
-            opts, args  = getopt.getopt(argv, 'b:d:i:r:m:p:wx:y:?h', ['AE', 'DD', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
-                                                                      'init_fail_cap=', 'range=', 'memory_override=', 'process_timeout=', 'init_timeout=',
-                                                                      'watch',
-                                                                      'jd_uima_log=', 'jp_uima_log=', 
-                                                                      'use_jar'
+            opts, args  = getopt.getopt(argv, 'b:d:i:r:m:n:p:wx:y:?h', ['AE', 'DD', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
+                                                                        'init_fail_cap=', 'range=', 'memory_override=', 'nmachines=', 'process_timeout=', 
+                                                                        'init_timeout=','watch',
+                                                                        'jd_uima_log=', 'jp_uima_log=', 
+                                                                        'use_jar'
                                                                       ])
         except:
             print "Unknown option"
@@ -695,6 +699,8 @@ class RunDucc(DuccUtil):
                 self.init_range = int(a) * 1000
             elif o in ('-m', '--memory_override'):
                 self.memory_override = a
+            elif o in ('-n', '--nmachines'):
+                self.max_machines = int(a)
             elif o in ('-p', '--process_timeout'):
                 self.process_timeout = a
             elif o in ('-w', '--watch' ):
@@ -748,6 +754,7 @@ class RunDucc(DuccUtil):
         print '    init_error         :', self.init_error
         print '    process_timeout    :', self.process_timeout
         print '    memory_override    :', self.memory_override
+        print '    max_machines       :', self.max_machines
         print '    jd_uima_log        :', self.jd_uima_log
         print '    jp_uima_log        :', self.jp_uima_log
         print '    use_jar            :', self.use_jar

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim?rev=1527698&r1=1527697&r2=1527698&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim Mon Sep 30 18:29:31 2013
@@ -32,6 +32,9 @@ import sys
 import time
 import getopt
 
+from threading import *
+import Queue
+
 #designed to run only here, relative to ducc_runtime
 os.environ['DUCC_HOME'] = os.path.abspath(sys.argv[0] + '/../../..')
 sys.path.append(os.environ['DUCC_HOME'] + '/admin')
@@ -39,6 +42,8 @@ sys.path.append(os.environ['DUCC_HOME'] 
 from ducc_util import DuccUtil
 from ducc_util import DuccProperties
 from ducc import Ducc
+from ducc_util import ThreadPool
+
 
 class StartSim(DuccUtil):
 
@@ -96,39 +101,40 @@ class StartSim(DuccUtil):
     #
     # Start admin components rm pm sm ws or, on local node using Ducc.py
     #        
-    def startComponents(self, components, or_parms):
+    def startComponent(self, args):
 
-        for (com, com) in components.items():
+        msgs = []
+        com, or_parms = args
 
-            if ( com == 'broker' ):
-                continue
-
-            if ( com in ('ws', 'viz') ):
-                node = self.webserver_node
-            else:
-                node = self.localhost
+        if ( com in ('ws', 'viz') ):
+            node = self.webserver_node
+        else:
+            node = self.localhost
+
+        if ( com == 'or' ):
+            lines = self.ssh(node, True, "'", 
+                             self.DUCC_HOME + '/admin/ducc.py', '-c', 'or', '-b', 
+                             '--or_parms', or_parms, "'")
+        else:
+            lines = self.ssh(node, True, "'", 
+                             self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', "'")
 
-            if ( com == 'or' ):
-                lines = self.ssh(node, True, "'", 
-                                 self.DUCC_HOME + '/admin/ducc.py', '-c', 'or', '-b', 
-                                 '--or_parms', or_parms, "'")
-            else:
-                lines = self.ssh(node, True, "'", 
-                                 self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', "'")
-
-            print 'Start', com, 'on', node,
-            while 1:
-                line = lines.readline().strip()
-                if ( not line ):
-                    break
-            # print '[] ' + line
-                if ( line.startswith('PID') ):
-                    toks = line.split(' ')    # get the PID
-                    self.pids.put(com, self.localhost + ' ' + toks[1] + ' ' + self.localhost)
-                    lines.close()
-                    print 'PID', toks[1]
-                    break
+        msgs.append(('Start', com, 'on', node))
+        while 1:
+            line = lines.readline().strip()
+            if ( not line ):
+                break
+        # print '[] ' + line
+            if ( line.startswith('PID') ):
+                toks = line.split(' ')    # get the PID
+                self.pidlock.acquire();
+                self.pids.put(com, self.localhost + ' ' + toks[1] + ' ' + self.localhost)
+                self.pidlock.release();
+                lines.close()
+                msgs.append(('    PID', toks[1]))
+                break
 
+        return msgs
 
     #
     # Read the special nodelist and start "special" agents
@@ -144,10 +150,31 @@ class StartSim(DuccUtil):
     # We use Jerry's memory override for each agent to get it to falsely report the memory
     # instead of reading from the real machine.
     #
-  
-    def startAgents(self, nodelist, instances):
 
-        
+    def startOneAgent(self, args):
+        response = []
+        node, cmd, mem, ip, pnode, index = args
+        response.append(('Starting agent on', node, 'instance', index, 'as pseudo-node', pnode, 'IP', ip, 'memory', mem))
+        lines = self.ssh(node, True, "'", cmd, '--agent', '--memory', mem, '--addr', ip, '--pseudoname', pnode, "'")
+        while 1:
+            line = lines.readline().strip()
+            if ( not line ):
+                break
+            #print '[]', line
+            if ( line.startswith('PID')):
+                toks = line.split(' ')    # get the PID
+                lines.close()
+                response.append(('  ... Started,  PID', toks[1]))
+
+                # Gottoa run on old python that doesn't know 'with'
+                self.pidlock.acquire()
+                self.pids.put(index, node + ' ' + toks[1] + ' ' + pnode)
+                self.pidlock.release()
+
+                break
+        return response
+  
+    def startAgents(self, nodelist, instances):        
         do_all = True
         if ( len(instances) > 0 ):
             do_all = False
@@ -176,30 +203,16 @@ class StartSim(DuccUtil):
                         allnodes.append( (str(ndx), node, mem) )
                         ndx = ndx + 1
 
+        here = os.getcwd()
+        cmd = os.path.abspath(sys.argv[0])   
         for (index, node, mem) in allnodes:
             if ( not do_all ):
                 if ( not instances.has_key(index) ):
                     continue
 
-            print 'Starting', index, node, mem
             ip = '192.168.4.' + index
             pnode = node + '-' + index
-            print 'Starting agent on', node, 'instance', index, 'as pseudo-node', pnode, 'IP', ip, 'memory', mem,
-
-            here = os.getcwd()
-            cmd = os.path.abspath(sys.argv[0])   
-            lines = self.ssh(node, True, "'", cmd, '--agent', '--memory', mem, '--addr', ip, '--pseudoname', pnode, "'")
-            while 1:
-                line = lines.readline().strip()
-                if ( not line ):
-                    break
-                #print '[1]', line
-                if ( line.startswith('PID')):
-                    toks = line.split(' ')    # get the PID
-                    lines.close()
-                    print 'Started,  PID', toks[1]
-                    self.pids.put(index, node + ' ' + toks[1] + ' ' + pnode)
-                    break
+            self.threadpool.invoke(self.startOneAgent, node, cmd, mem, ip, pnode, index)
 
 
     def usage(self, msg):
@@ -267,6 +280,7 @@ class StartSim(DuccUtil):
         sys.exit(1)
 
     def main(self, argv):
+        print "Running as", os.getpid()
 
         if ( len(argv) == 0 ):
             self.usage(None)
@@ -341,6 +355,9 @@ class StartSim(DuccUtil):
             self. run_local_agent(pseudoname, IP, memory)
             sys.exit(0)
         else:         
+            self.pidlock = Lock()
+            self.threadpool = ThreadPool(50)
+
             if ( (IP != None) or (memory != None) or ( pseudoname != None )) :
                 self.invalid("Running with a nodelist is not compatible with running a single agent.");
 
@@ -350,9 +367,12 @@ class StartSim(DuccUtil):
             if ( node_config != None ):
                 self.startAgents(node_config, instances)
 
-            self.startComponents(components, or_parms)
+            for (com, com) in components.items():
+                if ( com != 'broker' ):    # started separately
+                    self.threadpool.invoke(self.startComponent, com, or_parms)
 
-        self.pids.write('sim.pids')            
+            self.threadpool.quit()
+            self.pids.write('sim.pids')            
 
 if __name__ == "__main__":
     starter = StartSim()

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim?rev=1527698&r1=1527697&r2=1527698&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim Mon Sep 30 18:29:31 2013
@@ -36,20 +36,25 @@ import getopt
 from ducc_util import DuccUtil
 from ducc_util import DuccProperties
 from ducc import Ducc
+from ducc_util import ThreadPool
 
 class StopSim(DuccUtil):
 
-    def signal_process(self, inst, data, signal):
+    def signal_process(self, args):
+        inst, data, signal = args
         (node, pid, pname) = data.split(' ')
+
+        msgs = []
+        msgs.append(('Stopping process', inst, 'on', node, pid, pname, 'with signal', signal))
         if ( not (inst in self.default_components )):
             cmp = 'agent'
         else:
             cmp = inst
-        print 'Stopping process', inst, 'on', node, pid, pname, 'with signal', signal
         self.ssh(node, False, 'kill', signal, pid)
 
         if ( inst == 'or' ):
             self.remove_orchestrator_lock()
+        return msgs
 
     def usage(self, msg):
 
@@ -129,19 +134,24 @@ class StopSim(DuccUtil):
         pids = DuccProperties()
         pids.load('sim.pids')
 
+        self.threadpool = ThreadPool(50)
         if ( (len(components) + len(instances)) == 0 ):
             for (inst, data) in pids.items():
-                self.signal_process(inst, data, signal)
+                self.threadpool.invoke(self.signal_process, inst, data, signal)
+                # self.signal_process(inst, data, signal)
                 if ( signal in ('-KILL', '-INT') ):
                     pids.delete(inst)
 
         else:
             for inst in instances:
                 data = pids.get(inst)
-                self.signal_process(inst, pids.get(inst), signal)
+                self.threadpool.invoke(self.signal_process, inst, pids.get(inst), signal)
+                #self.signal_process(inst, pids.get(inst), signal)
                 if ( signal in ('-KILL', '-INT') ):
                     pids.delete(inst)
 
+        self.threadpool.quit();
+
         pids.write('sim.pids')
 
         sleeptime = 5