You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2015/11/09 20:31:59 UTC
svn commit: r1713517 -
/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java
Author: challngr
Date: Mon Nov 9 19:31:59 2015
New Revision: 1713517
URL: http://svn.apache.org/viewvc?rev=1713517&view=rev
Log:
UIMA-4690 Use async I/O spawning service instances to avoid deadlocks.
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java?rev=1713517&r1=1713516&r2=1713517&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceInstance.java Mon Nov 9 19:31:59 2015
@@ -19,6 +19,7 @@
package org.apache.uima.ducc.sm;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
@@ -193,6 +194,8 @@ class ServiceInstance
return args.toArray(new String[args.size()]);
}
+ ArrayList<String> stdout_lines = new ArrayList<String>();
+ ArrayList<String> stderr_lines = new ArrayList<String>();
long start(DuccProperties svc_props, DuccProperties meta_props)
{
String methodName = "start";
@@ -216,34 +219,33 @@ class ServiceInstance
}
ProcessBuilder pb = new ProcessBuilder(args);
+ StdioListener sin_listener = null;
+ StdioListener ser_listener = null;
+
Map<String, String> env = pb.environment();
env.put("DUCC_HOME", System.getProperty("DUCC_HOME"));
env.put("DUCC_SERVICE_INSTANCE", Integer.toString(instance_id)); // UIMA-4258
- ArrayList<String> stdout_lines = new ArrayList<String>();
- ArrayList<String> stderr_lines = new ArrayList<String>();
try {
Process p = pb.start();
- int rc = p.waitFor();
- logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
-
// TODO: we should attach these streams to readers in threads because too much output
// can cause blocking, deadlock, ugliness.
InputStream stdout = p.getInputStream();
InputStream stderr = p.getErrorStream();
- BufferedReader stdout_reader = new BufferedReader(new InputStreamReader(stdout));
- BufferedReader stderr_reader = new BufferedReader(new InputStreamReader(stderr));
- String line = null;
- while ( (line = stdout_reader.readLine()) != null ) {
- stdout_lines.add(line);
- }
-
- line = null;
- while ( (line = stderr_reader.readLine()) != null ) {
- stderr_lines.add(line);
- }
+ sin_listener = new StdioListener(1, stdout);
+ ser_listener = new StdioListener(2, stderr);
+ Thread sol = new Thread(sin_listener);
+ Thread sel = new Thread(ser_listener);
+ sol.start();
+ sel.start();
+
+ int rc = p.waitFor();
+ logger.debug(methodName, null, "DuccServiceSubmit returns with rc", rc);
+
+ sin_listener.stop();
+ ser_listener.stop();
} catch (Throwable t) {
logger.error(methodName, sset.getId(), t);
try {
@@ -407,4 +409,65 @@ class ServiceInstance
}
}
+ /**
+  * Reads one stdio stream of a spawned process line by line on its own thread
+  * and appends each line to the enclosing instance's stdout_lines (which == 1)
+  * or stderr_lines (which == 2).
+  *
+  * The listener ends when the stream reports end-of-file, when any exception
+  * is thrown while reading, or when stop() has been called and the flag is
+  * observed before the next read.  stop() only sets a flag: it does not
+  * interrupt a readLine() that is already blocked, and any lines not yet read
+  * when the flag is observed are not collected.
+  */
+ class StdioListener
+     implements Runnable
+ {
+     InputStream in;
+     String tag;
+     // Written by the thread that calls stop() and read by the thread that
+     // executes run(); volatile so the stop request is guaranteed to become
+     // visible to the listener thread.
+     volatile boolean done = false;
+     int which = 0;
+
+     /**
+      * @param which 1 to collect into stdout_lines, 2 to collect into
+      *              stderr_lines; any other value leaves the tag unset and
+      *              the lines are read but not stored.
+      * @param in    the stream to drain.
+      */
+     StdioListener(int which, InputStream in)
+     {
+         this.in = in;
+         this.which = which;
+         switch ( which ) {
+             case 1: tag = "STDOUT: "; break;
+             case 2: tag = "STDERR: "; break;
+         }
+     }
+
+     /**
+      * Ask the listener to return before its next read.
+      */
+     void stop()
+     {
+         this.done = true;
+     }
+
+     /**
+      * Read lines until end-of-file, an exception, or a stop request.
+      */
+     public void run()
+     {
+         long tid = Thread.currentThread().getId();
+         String methodName = "SvcSubmit[" + tid + "]";
+
+         BufferedReader br = new BufferedReader(new InputStreamReader(in));
+         while ( true ) {
+             try {
+                 if ( done ) return;
+                 String s = br.readLine();
+                 if ( logger.isTrace() ) {
+                     logger.trace(methodName, sset.getId(), "[]", tag, s);
+                 }
+                 if ( s == null ) {
+                     // readLine() returns null at end of stream.
+                     String msg = tag + "closed, listener returns";
+                     logger.info(methodName, sset.getId(), msg);
+                     return;
+                 }
+                 switch ( which ) {
+                     case 1:
+                         stdout_lines.add(s);
+                         break;
+                     case 2:
+                         stderr_lines.add(s);
+                         break;
+                 }
+
+             } catch (Exception e) {
+                 // if anything goes wrong this guy is toast.
+                 logger.error(methodName, sset.getId(), e);
+                 return;
+             }
+         }
+
+     }
+ }
+
}