You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2015/11/09 20:31:59 UTC

svn commit: r1713517 - /uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java

Author: challngr
Date: Mon Nov  9 19:31:59 2015
New Revision: 1713517

URL: http://svn.apache.org/viewvc?rev=1713517&view=rev
Log:
UIMA-4690 Use async I/O when spawning service instances to avoid deadlocks.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java?rev=1713517&r1=1713516&r2=1713517&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java Mon Nov  9 19:31:59 2015
@@ -19,6 +19,7 @@
 package org.apache.uima.ducc.sm;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
@@ -193,6 +194,8 @@ class ServiceInstance
         return args.toArray(new String[args.size()]);
     }
 
+    ArrayList<String> stdout_lines = new ArrayList<String>();
+    ArrayList<String> stderr_lines = new ArrayList<String>();
     long start(DuccProperties svc_props, DuccProperties meta_props)
     {
     	String methodName = "start";
@@ -216,34 +219,33 @@ class ServiceInstance
         }
 
         ProcessBuilder pb = new ProcessBuilder(args);
+        StdioListener sin_listener = null;
+        StdioListener ser_listener = null;
+    
         Map<String, String> env = pb.environment();
         env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));
         env.put("DUCC_SERVICE_INSTANCE", Integer.toString(instance_id));  // UIMA-4258
 
-        ArrayList<String> stdout_lines = new ArrayList<String>();
-        ArrayList<String> stderr_lines = new ArrayList<String>();
 		try {
 			Process p = pb.start();
 
-            int rc = p.waitFor();
-            logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
-
             // TODO: we should attach these streams to readers in threads because too much output
             //       can cause blocking, deadlock, ugliness.
 			InputStream stdout = p.getInputStream();
 			InputStream stderr = p.getErrorStream();
-			BufferedReader stdout_reader = new BufferedReader(new InputStreamReader(stdout));
-			BufferedReader stderr_reader = new BufferedReader(new InputStreamReader(stderr));
-			String line = null;
-			while ( (line = stdout_reader.readLine()) != null ) {
-			    stdout_lines.add(line);
-			}
-
-			line = null;
-			while ( (line = stderr_reader.readLine()) != null ) {
-			    stderr_lines.add(line);
-			}
 
+            sin_listener = new StdioListener(1, stdout);
+            ser_listener = new StdioListener(2, stderr);
+            Thread sol = new Thread(sin_listener);
+            Thread sel = new Thread(ser_listener);
+            sol.start();
+            sel.start();
+
+            int rc = p.waitFor();
+            logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
+
+            sin_listener.stop();
+            ser_listener.stop();
 		} catch (Throwable t) {
             logger.error(methodName, sset.getId(), t);
             try {
@@ -407,4 +409,65 @@ class ServiceInstance
         }
     }
 
+    class StdioListener   // Runnable that reads one InputStream line by line and appends each line to stdout_lines or stderr_lines.
+        implements Runnable
+    {
+        InputStream in;   // stream to read; supplied by the constructor
+        String tag;   // log prefix: "STDOUT: " or "STDERR: "; stays null for any other 'which' value
+        boolean done = false;   // set to true by stop(); checked at the top of every loop pass in run()
+        int which = 0;   // 1 = collect into stdout_lines, 2 = collect into stderr_lines
+
+        StdioListener(int which, InputStream in)   // which: 1 for stdout, 2 for stderr; in: the stream this listener will read
+        {
+            this.in = in;
+            this.which = which;
+            switch ( which ) {   // pick the log prefix matching the stream kind
+               case 1: tag = "STDOUT: "; break;
+               case 2: tag = "STDERR: "; break;
+            }
+        }
+
+        void stop()   // asks run() to exit; only takes effect at run()'s next 'done' check, not while readLine() is in progress
+        {
+            this.done = true;   // NOTE(review): 'done' is not volatile; if run() executes on another thread this write may not become visible to it - confirm
+        }
+
+        public void run()   // read loop: exits on stop(), on end of stream, or on the first exception
+        {
+            long tid = Thread.currentThread().getId();
+            String methodName = "SvcSubmit[" + tid + "]";   // passed as the method-name argument to the logger calls below, so they carry the thread id
+
+            BufferedReader br = new BufferedReader(new InputStreamReader(in));
+            while ( true ) {
+                try {
+                    if ( done ) return;   // NOTE(review): returns without draining; any output still unread when stop() is called appears to be dropped - confirm
+                    String s = br.readLine();
+                    if ( logger.isTrace() ) {   // tracing happens before the null check, so s may be null here
+                        logger.trace(methodName, sset.getId(), "[]", tag, s);
+                    }
+                    if ( s == null ) {   // readLine() returns null at end of stream
+                        String msg = tag + "closed, listener returns";
+                        logger.info(methodName, sset.getId(), msg);
+                        return;
+                    }
+                    switch ( which ) {   // route the line to the matching list; for any other 'which' value the line is discarded
+                        case 1:
+                            stdout_lines.add(s);
+                            break;
+                        case 2:
+                            stderr_lines.add(s);
+                            break;
+                    }
+
+
+				} catch (Exception e) {
+                    // Any failure ends this listener: log the exception and leave run().
+                    logger.error(methodName, sset.getId(), e);
+                    return;
+				} 
+            }
+
+        }
+    }
+
 }