You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/09/30 20:29:31 UTC
svn commit: r1527698 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts: runducc
start_sim stop_sim
Author: challngr
Date: Mon Sep 30 18:29:31 2013
New Revision: 1527698
URL: http://svn.apache.org/r1527698
Log:
UIMA-3301 Multithread the simulation scripting.
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc
uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim
uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc?rev=1527698&r1=1527697&r2=1527698&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc Mon Sep 30 18:29:31 2013
@@ -192,7 +192,7 @@ class DuccProcess(Thread):
jvm_classpath = jvm_classpath + ':' + self.DUCC_HOME + '/examples/simple/resources/service'
plain_broker_url = self.runner.broker_protocol + '://' + self.runner.broker_host + ':' + self.runner.broker_port
- cr_parms = 'jobfile:' + self.jobfile + ',compression:' + self.runner.compression + ',error_rate:' + str(self.runner.error_rate)
+ cr_parms = '"jobfile=' + self.jobfile + ' compression=' + self.runner.compression + ' error_rate=' + str(self.runner.error_rate) + '"'
process_args.append(process_xmx)
process_args.append('-DdefaultBrokerURL=' + plain_broker_url )
@@ -222,9 +222,6 @@ class DuccProcess(Thread):
CMD = CMD + ' --driver_descriptor_CR ' + cr
CMD = CMD + ' --driver_descriptor_CR_overrides ' + cr_parms
CMD = CMD + ' --driver_jvm_args ' + jvm_driver_args
- CMD = CMD + ' --driver_classpath ' + jvm_classpath
- CMD = CMD + ' --driver_environment ' + 'LD_LIBRARY_PATH=/a/other/bogus/path'
- # CMD = CMD + ' --driver_exception_handler ' + 'org.apache.uima.ducc.test.handler.TimeoutHandler'
if ( self.runner.style == 'DD' ):
CMD = CMD + ' --process_DD ' + dd
@@ -237,7 +234,7 @@ class DuccProcess(Thread):
#CMD = CMD + ' --working_directory ' + working_dir
CMD = CMD + ' --process_memory_size ' + memory
- CMD = CMD + ' --process_classpath ' + jvm_classpath
+ CMD = CMD + ' --classpath ' + jvm_classpath
CMD = CMD + ' --process_jvm_args ' + jvm_process_args
CMD = CMD + ' --process_thread_count ' + nthreads
@@ -249,7 +246,7 @@ class DuccProcess(Thread):
if ( self.runner.init_timeout > 0 ):
CMD = CMD + ' --process_initialization_time_max ' + str(self.runner.init_timeout)
- CMD = CMD + ' --process_environment ' \
+ CMD = CMD + ' --environment ' \
+ '"' \
+ ' AE_INIT_TIME=' + str(self.runner.init_time) \
+ ' AE_INIT_RANGE=' + str(self.runner.init_range) \
@@ -258,8 +255,11 @@ class DuccProcess(Thread):
+ ' LD_LIBRARY_PATH=/a/bogus/path' \
+ '"'
- if ( machines != None ):
- CMD = CMD + ' --process_deployments_max ' + machines
+ if ( self.runner.max_machines == 0 ):
+ if ( machines != None ):
+ CMD = CMD + ' --process_deployments_max ' + machines
+ elif (self.runner.max_machines != -1 ):
+ CMD = CMD + ' --process_deployments_max ' + self.runner.max_machines
if ( self.runner.watch ):
CMD = CMD + ' --wait_for_completion true'
@@ -620,6 +620,9 @@ class RunDucc(DuccUtil):
print ' -m, --memory_override mem-in-GB'
print ' Use this instead of what is in the props file. Default: None'
print ''
+ print ' -n, --nmachines_override process_deployments_max'
+ print ' Override the preconfigured max machines. Use -1 to fully inhibit max machines'
+ print ''
print ' -p, --process_timeout sec'
print ' Process timeout, in seconds. Default:', self.process_timeout
print ''
@@ -670,13 +673,14 @@ class RunDucc(DuccUtil):
self.use_apache_package = True
self.use_ibm_package = False
self.submit_package = 'org.apache.uima.ducc'
+ self.max_machines = 0
try:
- opts, args = getopt.getopt(argv, 'b:d:i:r:m:p:wx:y:?h', ['AE', 'DD', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
- 'init_fail_cap=', 'range=', 'memory_override=', 'process_timeout=', 'init_timeout=',
- 'watch',
- 'jd_uima_log=', 'jp_uima_log=',
- 'use_jar'
+ opts, args = getopt.getopt(argv, 'b:d:i:r:m:n:p:wx:y:?h', ['AE', 'DD', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
+ 'init_fail_cap=', 'range=', 'memory_override=', 'nmachines=', 'process_timeout=',
+ 'init_timeout=','watch',
+ 'jd_uima_log=', 'jp_uima_log=',
+ 'use_jar'
])
except:
print "Unknown option"
@@ -695,6 +699,8 @@ class RunDucc(DuccUtil):
self.init_range = int(a) * 1000
elif o in ('-m', '--memory_override'):
self.memory_override = a
+ elif o in ('-n', '--nmachines'):
+ self.max_machines = int(a)
elif o in ('-p', '--process_timeout'):
self.process_timeout = a
elif o in ('-w', '--watch' ):
@@ -748,6 +754,7 @@ class RunDucc(DuccUtil):
print ' init_error :', self.init_error
print ' process_timeout :', self.process_timeout
print ' memory_override :', self.memory_override
+ print ' max_machines :', self.max_machines
print ' jd_uima_log :', self.jd_uima_log
print ' jp_uima_log :', self.jp_uima_log
print ' use_jar :', self.use_jar
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim?rev=1527698&r1=1527697&r2=1527698&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/start_sim Mon Sep 30 18:29:31 2013
@@ -32,6 +32,9 @@ import sys
import time
import getopt
+from threading import *
+import Queue
+
#designed to run only here, relative to ducc_runtime
os.environ['DUCC_HOME'] = os.path.abspath(sys.argv[0] + '/../../..')
sys.path.append(os.environ['DUCC_HOME'] + '/admin')
@@ -39,6 +42,8 @@ sys.path.append(os.environ['DUCC_HOME']
from ducc_util import DuccUtil
from ducc_util import DuccProperties
from ducc import Ducc
+from ducc_util import ThreadPool
+
class StartSim(DuccUtil):
@@ -96,39 +101,40 @@ class StartSim(DuccUtil):
#
# Start admin components rm pm sm ws or, on local node using Ducc.py
#
- def startComponents(self, components, or_parms):
+ def startComponent(self, args):
- for (com, com) in components.items():
+ msgs = []
+ com, or_parms = args
- if ( com == 'broker' ):
- continue
-
- if ( com in ('ws', 'viz') ):
- node = self.webserver_node
- else:
- node = self.localhost
+ if ( com in ('ws', 'viz') ):
+ node = self.webserver_node
+ else:
+ node = self.localhost
+
+ if ( com == 'or' ):
+ lines = self.ssh(node, True, "'",
+ self.DUCC_HOME + '/admin/ducc.py', '-c', 'or', '-b',
+ '--or_parms', or_parms, "'")
+ else:
+ lines = self.ssh(node, True, "'",
+ self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', "'")
- if ( com == 'or' ):
- lines = self.ssh(node, True, "'",
- self.DUCC_HOME + '/admin/ducc.py', '-c', 'or', '-b',
- '--or_parms', or_parms, "'")
- else:
- lines = self.ssh(node, True, "'",
- self.DUCC_HOME + '/admin/ducc.py', '-c', com, '-b', "'")
-
- print 'Start', com, 'on', node,
- while 1:
- line = lines.readline().strip()
- if ( not line ):
- break
- # print '[] ' + line
- if ( line.startswith('PID') ):
- toks = line.split(' ') # get the PID
- self.pids.put(com, self.localhost + ' ' + toks[1] + ' ' + self.localhost)
- lines.close()
- print 'PID', toks[1]
- break
+ msgs.append(('Start', com, 'on', node))
+ while 1:
+ line = lines.readline().strip()
+ if ( not line ):
+ break
+ # print '[] ' + line
+ if ( line.startswith('PID') ):
+ toks = line.split(' ') # get the PID
+ self.pidlock.acquire();
+ self.pids.put(com, self.localhost + ' ' + toks[1] + ' ' + self.localhost)
+ self.pidlock.release();
+ lines.close()
+ msgs.append((' PID', toks[1]))
+ break
+ return msgs
#
# Read the special nodelist and start "special" agents
@@ -144,10 +150,31 @@ class StartSim(DuccUtil):
# We use Jerry's memory override for each agent to get it to falsely report the memory
# instead of reading from the real machine.
#
-
- def startAgents(self, nodelist, instances):
-
+ def startOneAgent(self, args):
+ response = []
+ node, cmd, mem, ip, pnode, index = args
+ response.append(('Starting agent on', node, 'instance', index, 'as pseudo-node', pnode, 'IP', ip, 'memory', mem))
+ lines = self.ssh(node, True, "'", cmd, '--agent', '--memory', mem, '--addr', ip, '--pseudoname', pnode, "'")
+ while 1:
+ line = lines.readline().strip()
+ if ( not line ):
+ break
+ #print '[]', line
+ if ( line.startswith('PID')):
+ toks = line.split(' ') # get the PID
+ lines.close()
+ response.append((' ... Started, PID', toks[1]))
+
+ # Gottoa run on old python that doesn't know 'with'
+ self.pidlock.acquire()
+ self.pids.put(index, node + ' ' + toks[1] + ' ' + pnode)
+ self.pidlock.release()
+
+ break
+ return response
+
+ def startAgents(self, nodelist, instances):
do_all = True
if ( len(instances) > 0 ):
do_all = False
@@ -176,30 +203,16 @@ class StartSim(DuccUtil):
allnodes.append( (str(ndx), node, mem) )
ndx = ndx + 1
+ here = os.getcwd()
+ cmd = os.path.abspath(sys.argv[0])
for (index, node, mem) in allnodes:
if ( not do_all ):
if ( not instances.has_key(index) ):
continue
- print 'Starting', index, node, mem
ip = '192.168.4.' + index
pnode = node + '-' + index
- print 'Starting agent on', node, 'instance', index, 'as pseudo-node', pnode, 'IP', ip, 'memory', mem,
-
- here = os.getcwd()
- cmd = os.path.abspath(sys.argv[0])
- lines = self.ssh(node, True, "'", cmd, '--agent', '--memory', mem, '--addr', ip, '--pseudoname', pnode, "'")
- while 1:
- line = lines.readline().strip()
- if ( not line ):
- break
- #print '[1]', line
- if ( line.startswith('PID')):
- toks = line.split(' ') # get the PID
- lines.close()
- print 'Started, PID', toks[1]
- self.pids.put(index, node + ' ' + toks[1] + ' ' + pnode)
- break
+ self.threadpool.invoke(self.startOneAgent, node, cmd, mem, ip, pnode, index)
def usage(self, msg):
@@ -267,6 +280,7 @@ class StartSim(DuccUtil):
sys.exit(1)
def main(self, argv):
+ print "Running as", os.getpid()
if ( len(argv) == 0 ):
self.usage(None)
@@ -341,6 +355,9 @@ class StartSim(DuccUtil):
self. run_local_agent(pseudoname, IP, memory)
sys.exit(0)
else:
+ self.pidlock = Lock()
+ self.threadpool = ThreadPool(50)
+
if ( (IP != None) or (memory != None) or ( pseudoname != None )) :
self.invalid("Running with a nodelist is not compatible with running a single agent.");
@@ -350,9 +367,12 @@ class StartSim(DuccUtil):
if ( node_config != None ):
self.startAgents(node_config, instances)
- self.startComponents(components, or_parms)
+ for (com, com) in components.items():
+ if ( com != 'broker' ): # started separately
+ self.threadpool.invoke(self.startComponent, com, or_parms)
- self.pids.write('sim.pids')
+ self.threadpool.quit()
+ self.pids.write('sim.pids')
if __name__ == "__main__":
starter = StartSim()
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim?rev=1527698&r1=1527697&r2=1527698&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/stop_sim Mon Sep 30 18:29:31 2013
@@ -36,20 +36,25 @@ import getopt
from ducc_util import DuccUtil
from ducc_util import DuccProperties
from ducc import Ducc
+from ducc_util import ThreadPool
class StopSim(DuccUtil):
- def signal_process(self, inst, data, signal):
+ def signal_process(self, args):
+ inst, data, signal = args
(node, pid, pname) = data.split(' ')
+
+ msgs = []
+ msgs.append(('Stopping process', inst, 'on', node, pid, pname, 'with signal', signal))
if ( not (inst in self.default_components )):
cmp = 'agent'
else:
cmp = inst
- print 'Stopping process', inst, 'on', node, pid, pname, 'with signal', signal
self.ssh(node, False, 'kill', signal, pid)
if ( inst == 'or' ):
self.remove_orchestrator_lock()
+ return msgs
def usage(self, msg):
@@ -129,19 +134,24 @@ class StopSim(DuccUtil):
pids = DuccProperties()
pids.load('sim.pids')
+ self.threadpool = ThreadPool(50)
if ( (len(components) + len(instances)) == 0 ):
for (inst, data) in pids.items():
- self.signal_process(inst, data, signal)
+ self.threadpool.invoke(self.signal_process, inst, data, signal)
+ # self.signal_process(inst, data, signal)
if ( signal in ('-KILL', '-INT') ):
pids.delete(inst)
else:
for inst in instances:
data = pids.get(inst)
- self.signal_process(inst, pids.get(inst), signal)
+ self.threadpool.invoke(self.signal_process, inst, pids.get(inst), signal)
+ #self.signal_process(inst, pids.get(inst), signal)
if ( signal in ('-KILL', '-INT') ):
pids.delete(inst)
+ self.threadpool.quit();
+
pids.write('sim.pids')
sleeptime = 5