You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2015/02/02 19:50:38 UTC

svn commit: r1656539 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main: java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java java/org/apache/uima/ducc/test/randomsleep/FixedSleepCR.java scripts/runducc

Author: challngr
Date: Mon Feb  2 18:50:37 2015
New Revision: 1656539

URL: http://svn.apache.org/r1656539
Log:
UIMA-4195 AE error processing: AE_INIT_EXIT, AE_INIT_ERROR, AE_RUNTIME_EXIT, AE_RUNTIME_ERROR

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepCR.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java?rev=1656539&r1=1656538&r2=1656539&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepAE.java Mon Feb  2 18:50:37 2015
@@ -119,32 +119,31 @@ public class FixedSleepAE extends CasAnn
             }
         }
 
-
         long sleep;
         if ( !initComplete ) {                                    // longer init only the first tim
             initComplete = true;
         } 
 
-        String itime = System.getenv("AE_INIT_TIME");        // the minimum time to sleep
-        String irange = System.getenv("AE_INIT_RANGE");      // the range of a random amount added to the minimum time
-        String ierror = System.getenv("INIT_ERROR");         // probability of init error, int, 0:100
-
-        if ( itime == null ) {
-            throw new IllegalArgumentException("Missing AE_INIT_TIME");
-        }
+        int i_error  = getIntFromEnv("AE_INIT_ERROR", false);      // probability of init error, int, 0:100
+        int i_exit   = getIntFromEnv("AE_INIT_EXIT" , false);
+        int i_itime  = getIntFromEnv("AE_INIT_TIME" , true );
+        int i_irange = getIntFromEnv("AE_INIT_RANGE", true );
 
-        if ( irange == null ) {
-            throw new IllegalArgumentException("Missing AE_INIT_RANGE");
+        if ( i_error > 0 ) {
+            int toss = nextrand(100);
+            logger.log(Level.INFO, "Init errors: probability[" + i_error + "] toss[" + toss + "]");
+            if ( i_error > toss ) {
+                throwAnException("Random Error in Initialization");
+            }
         }
 
-        int i_itime;
-        int i_irange;
-        try {
-            i_itime = Integer.parseInt(itime);
-            i_irange = Integer.parseInt(irange);
-        } catch (NumberFormatException e) {
-            logger.log(Level.INFO, "Invalid AE_INIT_TIME[" + itime + "] or AE_INIT_RANGE[" + irange + "] - must be numbers.");
-            throw e;
+        if ( i_exit > 0 ) {
+            int toss = nextrand(100);
+            logger.log(Level.INFO, "Init hard exit: probability[" + i_exit + "] toss[" + toss + "]");
+            if ( i_exit > toss ) {
+                logger.log(Level.INFO, "Init hard exit: croaking hard now.");
+                Runtime.getRuntime().halt(19);
+            }
         }
 
         if ( i_itime < 0 ) {
@@ -154,15 +153,6 @@ public class FixedSleepAE extends CasAnn
         if ( i_irange <= 0 ) {
             throw new IllegalArgumentException("Invalid AE_INIT_RANGE, must be > 0");
         }
-
-        if ( ierror != null ) {
-            int probability = Integer.parseInt(ierror);
-            int toss = nextrand(100);
-            logger.log(Level.INFO, "Init errors: probability[" + probability + "] toss[" + toss + "]");
-            if ( probability > toss ) {
-                throwAnException("Random Error in Initialization");
-            }
-        }
         
         sleep = i_itime + nextrand(i_irange);  // pick off some random number of milliseconds, min of 5 minutes init sleep
 
@@ -185,7 +175,40 @@ public class FixedSleepAE extends CasAnn
         logger.log(Level.INFO, "^^-------> AE process " + pid + " TID " + tid + " initialization " + ok);
         return;
     }
-        
+
+    int getIntFromEnv(String key, boolean fail)      // read environment variable 'key' and parse it as an int
+    {
+        String s = System.getenv(key);
+        if ( s == null ) {                            // variable is not set in the environment
+            if ( fail ) throw new IllegalArgumentException("Missing " + key);   // 'fail' marks the key as required
+            else        return 0;                     // optional key: unset is reported as 0
+        }
+
+        try {
+            return Integer.parseInt(s);
+        } catch (NumberFormatException e) {
+            logger.log(Level.INFO, "Invalid " + key + "[" + s + "].  Must be integer.");
+            throw e;                                  // log the offending value, then rethrow the same exception
+        }
+    }
+
+    double getDoubleFromEnv(String key, boolean fail)   // read environment variable 'key' and parse it as a double
+    {
+        String s = System.getenv(key);
+        if ( s == null ) {                            // variable is not set in the environment
+            if ( fail ) throw new IllegalArgumentException("Missing " + key);   // 'fail' marks the key as required
+            else        return 0.0;                   // optional key: unset is reported as 0.0
+        }
+
+        try {
+            return Double.parseDouble(s);
+        } catch (NumberFormatException e) {
+            logger.log(Level.INFO, "Invalid " + key + "[" + s + "].  Must be double.");
+            throw e;                                  // log the offending value, then rethrow the same exception
+        }
+    }
+
+
     /**
      * Need to simulate a process that leaks.  We just allocate stuff until we die somehow.  
      * Careful, this can be pretty nasty if not contained by the infrastructure.  
@@ -228,7 +251,7 @@ public class FixedSleepAE extends CasAnn
      * This thows all kinds of stuff.
      */
     @SuppressWarnings("null")
-        void throwAnException(String msgheader)
+    void throwAnException(String msgheader)
     {
         int MAX_EXCEPTIONS = 7;        // deliberately wrong, this is a foul-up simulator after all!
 
@@ -280,7 +303,7 @@ public class FixedSleepAE extends CasAnn
         return ( ((int) r.nextLong()) & Integer.MAX_VALUE) % max;
     }
 
-    void randomError(double error_rate, String msgheader)
+    void randomError(double error_rate, String msgheader, boolean do_exit)
     //throws Exception
     {
         //
@@ -298,9 +321,13 @@ public class FixedSleepAE extends CasAnn
         String msg = msgheader + " simulated error.";        
         
         int check = (int) Math.round(RANGE * (error_rate / 100.0));
-        dolog("**-------> AE Error Coin toss " + cointoss + " vs " + check + ": " + (cointoss < check));
+        dolog("**-------> AE Error Coin toss " + cointoss + " vs " + check + ": " + (cointoss < check), do_exit ? "Exiting." : "Throwing.");
         if ( cointoss < check ) {
-            throwAnException(msg);
+            if ( do_exit ) {
+                Runtime.getRuntime().halt(19);
+            } else {
+                throwAnException(msg);
+            }
         }
         //throw new AnalysisEngineProcessException(msg);
     }
@@ -352,7 +379,8 @@ public class FixedSleepAE extends CasAnn
         long          elapsed    = Long.parseLong(tok.nextToken());
         int           qid        = Integer.parseInt(tok.nextToken());
         int           total      = Integer.parseInt(tok.nextToken());
-        double        error_rate = Double.parseDouble(tok.nextToken());
+        double        error_rate = getDoubleFromEnv("AE_RUNTIME_ERROR", false);
+        double        exit_rate  = getDoubleFromEnv("AE_RUNTIME_EXIT", false);
         String        logid      = tok.nextToken();
 
         RuntimeMXBean rmxb       = ManagementFactory.getRuntimeMXBean();
@@ -386,7 +414,9 @@ public class FixedSleepAE extends CasAnn
                 bl.start();
             }
 
-            randomError(error_rate, msgheader);           
+            randomError(error_rate, msgheader, false);           
+            randomError(exit_rate, msgheader, true);
+
             Thread.sleep(elapsed);
             completion = "OK";
             dolog(msgheader + " returns after " + elapsed + " MS completion " + completion);

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepCR.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepCR.java?rev=1656539&r1=1656538&r2=1656539&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepCR.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/test/randomsleep/FixedSleepCR.java Mon Feb  2 18:50:37 2015
@@ -48,8 +48,6 @@ import org.apache.uima.util.ProgressImpl
  *   - compression - this is a number used to adjust each sleep time and hence the duration 
  *                   of the test.  The sleep time is divided by this number, so a larger
  *                   compression produces a shorter sleep and a faster run.
- *   - error_rate - this is passed to the JP for error injection.  It is a float, percentage and
- *                  indicates the expected rate of errors processing work items to be simulated.
  */
 
 public class FixedSleepCR extends CollectionReader_ImplBase {
@@ -57,7 +55,6 @@ public class FixedSleepCR extends Collec
     private volatile Logger logger;
     private volatile ArrayList<Long> workitems;
     private volatile int index = 0;
-    private volatile String error_rate = "0";
     private volatile String logdir = "None";
     private volatile String jobid;
     PrintStream jdmark;
@@ -80,9 +77,6 @@ public class FixedSleepCR extends Collec
         String comp = ((String) getConfigParameterValue("compression"));
         logger.log(Level.INFO, " ****** BB compression " + comp);
 
-        error_rate = ((String) getConfigParameterValue("error_rate"));
-        logger.log(Level.INFO, " ****** BB error_rate " + error_rate);
-
         Map<String, String> env = System.getenv();
         for ( String k : env.keySet() ) {
             System.out.println(String.format("Environment[%s] = %s", k, env.get(k)));
@@ -146,7 +140,7 @@ public class FixedSleepCR extends Collec
     public synchronized void getNext(CAS cas) throws IOException, CollectionException 
     {
         logger.log(Level.INFO, " ****** getNext[" + index + "]: " + workitems.get(index) + " getNext invocation " + get_next_counter++);
-        String parm = "" + workitems.get(index) + " " + (index+1) + " " + workitems.size() + " " + error_rate + " " + logdir;
+        String parm = "" + workitems.get(index) + " " + (index+1) + " " + workitems.size() + " " + logdir;
 
         if ( jdmark != null ) {
             jdmark.println("" + System.currentTimeMillis() + " " + parm);

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc?rev=1656539&r1=1656538&r2=1656539&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/scripts/runducc Mon Feb  2 18:50:37 2015
@@ -138,7 +138,7 @@ class DuccProcess(Thread):
         #        return                 
         #    print 'VERIFY ' + realid + ':', line
 
-        if ( self.runner.watch ):
+        if ( self.runner.observe ):
             self.runner.queue.get()            # remove marker so main() can eventually exit
             self.runner.queue.task_done()
 
@@ -200,7 +200,7 @@ class DuccProcess(Thread):
         else:
             plain_broker_url = self.runner.broker_protocol + '://' + self.runner.broker_host + ':' + self.runner.broker_port
 
-        cr_parms         = '"jobfile=' + self.jobfile + ' compression=' + self.runner.compression + ' error_rate=' + str(self.runner.error_rate) + '"'
+        cr_parms         = '"jobfile=' + self.jobfile + ' compression=' + self.runner.compression + '"'
         process_args.append(process_xmx)
         process_args.append('-DdefaultBrokerURL=' + plain_broker_url )
 
@@ -211,8 +211,6 @@ class DuccProcess(Thread):
         if ( self.runner.jp_uima_log != None ):
             driver_args.append(' -Djava.util.logging.config.file=' + self.runner.jp_uima_log)
 
-        description      = '"' + self.jobfile + '"'
-
         jvm_driver_args = self.mkargs(driver_args)
         jvm_process_args = self.mkargs(process_args)
 
@@ -222,7 +220,7 @@ class DuccProcess(Thread):
         CMD = os.environ['JAVA_HOME'] + '/bin/java'
         CMD = CMD + ' ' + self.runner.submit_package + '.cli.DuccJobSubmit'
 
-        CMD = CMD + ' --description '                         + description + '[' + self.runner.style + ']'
+        CMD = CMD + ' --description '                         + '"' + self.jobfile + '[' + self.runner.style + ']"'
         CMD = CMD + ' --driver_descriptor_CR '                + cr
         CMD = CMD + ' --driver_descriptor_CR_overrides '      + cr_parms
         CMD = CMD + ' --driver_jvm_args '                     + jvm_driver_args
@@ -252,9 +250,16 @@ class DuccProcess(Thread):
 
         CMD = CMD + ' --environment ' \
                   + '"' \
-                  + ' AE_INIT_TIME='  + str(self.runner.init_time) \
-                  + ' AE_INIT_RANGE=' + str(self.runner.init_range) \
-                  + ' INIT_ERROR='    + str(self.runner.init_error) \
+                  + ' AE_INIT_TIME='     + str(self.runner.init_time) \
+                  + ' AE_INIT_RANGE='    + str(self.runner.init_range) \
+                  + ' AE_INIT_EXIT='     + str(self.runner.ae_init_exit) \
+                  + ' AE_INIT_ERROR='    + str(self.runner.ae_init_error) \
+                  + ' AE_RUNTIME_EXIT='  + str(self.runner.ae_runtime_exit) \
+                  + ' AE_RUNTIME_ERROR=' + str(self.runner.ae_runtime_error) \
+                  + ' CR_INIT_EXIT='     + str(self.runner.cr_init_exit) \
+                  + ' CR_INIT_ERROR='    + str(self.runner.cr_init_error) \
+                  + ' CR_RUNTIME_EXIT='  + str(self.runner.cr_runtime_exit) \
+                  + ' CR_RUNTIME_ERROR=' + str(self.runner.cr_runtime_error) \
                   +  bloat_parms \
                   + ' LD_LIBRARY_PATH=/a/bogus/path' \
                   + '"'
@@ -265,7 +270,7 @@ class DuccProcess(Thread):
         elif (self.runner.max_machines != -1 ):
             CMD = CMD + ' --process_deployments_max '  + self.runner.max_machines
 
-        if ( self.runner.watch ):
+        if ( self.runner.observe ):
             CMD = CMD + ' --wait_for_completion'     
 
         self.user = user
@@ -454,7 +459,7 @@ class RunDucc(DuccUtil):
                 ducc_process = DuccProcess(self, jobfile)
                 ducc_process.submit()
 
-                if ( self.watch ) :
+                if ( self.observe ) :
                     self.queue.put(jobfile)         # any old marker will do
                     ducc_process.start()
                 else:
@@ -549,6 +554,10 @@ class RunDucc(DuccUtil):
         print '   -n, --nmachines_override process_deployments_max'
         print '       Override the preconfigured max machines. Use -1 to fully inhibit max machines'
         print ''
+        print '   -o, --observe'
+        print '       Specifies that we submit in keepalive mode and observe(watch) the jobs, creating a dir with outputs. Default:', self.observe
+        print '       If specified, we run verification against the results.'
+        print ''
         print '   -p, --process_timeout sec'
         print '       Process timeout, in seconds. Default:', self.process_timeout
         print ''
@@ -558,15 +567,17 @@ class RunDucc(DuccUtil):
         print '   --jp_uima_log log-properties'
         print '       If specified, use the indicated properties file for JP UIMA/UIMA-AS logging. Default:', self.jp_uima_log
         print ''
-        print '   -w, --watch'
-        print '       Specifies that we submit in keepalive mode and watch the jobs, creating a dir with outputs. Default:', self.watch
-        print '       If specified, we run verification against the results.'
-        print 
-        print '   -x'
-        print '       Error rate (expecting float, 0-100, which is translated into a percentage. Default:', self.error_rate
+        print '   -s'
+        print '       AE Probability that a JP will spontaneously exit during initialization.  Default:', self.ae_init_exit
+        print ''
+        print '   -t'
+        print '       AE Probability that a JP will throw an exception during initialization.  Default:', self.ae_init_error
+        print ''
+        print '   -u'
+        print '       AE Probability that a JP will spontaneously exit in the process method.  Default:', self.ae_runtime_exit
         print ''
-        print '   -y'
-        print '       Error probability during initialation, int, 0:100', self.init_error
+        print '   -v'
+        print '       AE Probability that a JP will throw an exception in the process method.  Default:', self.ae_runtime_error
         print ''
         print 'We run with DUCC_HOME set to', self.DUCC_HOME
         sys.exit(1)
@@ -575,9 +586,15 @@ class RunDucc(DuccUtil):
     
         self.test_dir   = None
         self.batchfile  = None
-        self.watch   = False
-        self.error_rate = 0.0
-        self.init_error = 0
+        self.observe   = False
+        self.ae_init_exit = 0            # -s  int 0-100
+        self.ae_init_error = 0           # -t  int 0-100
+        self.ae_runtime_exit = 0.0       # -u  float
+        self.ae_runtime_error = 0.0      # -v  float
+        self.cr_init_exit = 0            # -w  int 0-100
+        self.cr_init_error = 0           # -x  int 0-100
+        self.cr_runtime_exit = 0.0       # -y  float
+        self.cr_runtime_error = 0.0      # -z  float
         self.init_fail_cap = '99'
         self.memory_override = None
         self.init_time = 10000
@@ -597,11 +614,11 @@ class RunDucc(DuccUtil):
         self.descriptor_as_file = False
 
         try:
-            opts, args  = getopt.getopt(argv, 'b:d:fi:r:m:n:p:wx:y:?h', ['AE', 'DD', 'file', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
+            opts, args  = getopt.getopt(argv, 'b:d:fi:r:m:nop:s:t:u:v:w:x:y:z:?h', ['AE', 'DD', 'file', 'SE=', 'IB=', 'PB=', 'directory=', 'batchfile=', 'init_time=',
                                                                         'init_fail_cap=', 'range=', 'memory_override=', 'nmachines=', 'process_timeout=', 
-                                                                        'init_timeout=','watch',
+                                                                        'init_timeout=', 'observe'
                                                                         'jd_uima_log=', 'jp_uima_log=',
-                                                                        'http'
+                                                                        'http',
                                                                       ])
         except:
             print "Unknown option"
@@ -625,8 +642,8 @@ class RunDucc(DuccUtil):
                 self.max_machines = int(a)
             elif o in ('-p', '--process_timeout'):
                 self.process_timeout = a
-            elif o in ('-w', '--watch' ):
-                self.watch = True        
+            elif o in ('-o', '--observe' ):
+                self.observe = True        
             elif o in ('--init_timeout' ):
                 self.init_timeout = int(a)
             elif o in ('--jd_uima_log' ):
@@ -640,7 +657,7 @@ class RunDucc(DuccUtil):
             elif o in ('--SE'):
                 self.style = 'SE'
                 self.service_startup = a
-                self.watch = True
+                self.observe = True
             elif o in ( '-f', '--file'):
                 self.descriptor_as_file = True
             elif o in ('--http'):
@@ -649,12 +666,22 @@ class RunDucc(DuccUtil):
                 self.init_bloat = a
             elif o in ('--PB'):
                 self.process_bloat = a
-            elif o in ('-w', '--watch' ):
-                self.watch = True        
+            elif ( o == '-s'):
+                self.ae_init_exit = int(a)
+            elif ( o == '-t'):
+                self.ae_init_error = int(a)
+            elif ( o == '-u'):
+                self.ae_runtime_exit = float(a)
+            elif ( o == '-v'):
+                self.ae_runtime_error = float(a)
+            elif ( o == '-w'):
+                self.cr_init_exit = int(a)
             elif ( o == '-x'):
-                self.error_rate = float(a)
+                self.cr_init_error = int(a)
             elif ( o == '-y'):
-                self.init_error = int(a)
+                self.cr_runtime_exit = float(a)
+            elif ( o == '-z'):
+                self.cr_runtime_error = float(a)
             elif ( o == '-?'):
                 self.usage(None)
             else:
@@ -675,9 +702,15 @@ class RunDucc(DuccUtil):
         print '    init-timeout       :', self.init_timeout
         print '    init-bloat         :', self.init_bloat
         print '    process-bloat      :', self.process_bloat
-        print '    watch              :', self.watch           
-        print '    error_rate         :', self.error_rate
-        print '    init_error         :', self.init_error
+        print '    observe            :', self.observe           
+        print '    ae_init_exit       :', self.ae_init_exit
+        print '    ae_init_error      :', self.ae_init_error
+        print '    ae_runtime_exit    :', self.ae_runtime_exit
+        print '    ae_runtime_error   :', self.ae_runtime_error
+        print '    cr_init_exit       :', self.cr_init_exit
+        print '    cr_init_error      :', self.cr_init_error
+        print '    cr_runtime_exit    :', self.cr_runtime_exit
+        print '    cr_runtime_error   :', self.cr_runtime_error
         print '    process_timeout    :', self.process_timeout
         print '    memory_override    :', self.memory_override
         print '    max_machines       :', self.max_machines
@@ -711,12 +744,12 @@ class RunDucc(DuccUtil):
             #return;
 
         os.environ['CLASSPATH'] = self.DUCC_HOME + "/lib/uima-ducc-cli.jar"
-        if ( self.watch ):
+        if ( self.observe ):
             self.queue = Queue.Queue()
 
         self.run_batch()
                 
-        if ( self.watch ):
+        if ( self.observe ):
             self.queue.join()
             print 'All threads returned'
             if ( self.style == 'SE' ):