You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/04/04 16:11:06 UTC

svn commit: r1464561 - /uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java

Author: challngr
Date: Thu Apr  4 14:11:05 2013
New Revision: 1464561

URL: http://svn.apache.org/r1464561
Log:
UIMA-2742
ConsoleListener is blocking on stdin.  It needs to poll stdin instead of
listening on it, to avoid blocking the thread.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java?rev=1464561&r1=1464560&r2=1464561&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java Thu Apr  4 14:11:05 2013
@@ -19,10 +19,8 @@
 */
 package org.apache.uima.ducc.cli;
 
-import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
@@ -115,8 +113,8 @@ class ConsoleListener
         try {
             sock.close();
             for ( Pair<StdioReader, StdioWriter> handler: listeners.values() ) {
-                handler.first().close();
-                handler.second().close();
+                handler.first().shutdown();
+                handler.second().shutdown();
             }
         } catch (Throwable t) {
             t.printStackTrace();
@@ -127,7 +125,16 @@ class ConsoleListener
     {
         int count;
         synchronized(this) {
-            listeners.remove(port);
+            Pair<StdioReader, StdioWriter> listener = listeners.remove(port);
+            if ( listener != null ) {
+                try {
+					listener.first().shutdown();
+					listener.second().shutdown();
+				} catch (Exception e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+            }
             count = listeners.size();
         }
 
@@ -150,7 +157,7 @@ class ConsoleListener
             try {                    
                 Socket s = sock.accept();
                 StdioReader sr = new StdioReader(s, this);
-                StdioWriter sw = new StdioWriter(s, this);
+                StdioWriter sw = new StdioWriter(s);
                 int p = s.getPort();
                 synchronized(this) {
                     listeners.put(p, new Pair<StdioReader, StdioWriter>(sr, sw));
@@ -178,7 +185,7 @@ class ConsoleListener
     {
         Socket sock;
         InputStream is;
-        boolean done = false;
+        boolean shutdown = false;
         ConsoleListener cl;
         String remote_host;
         String logfile = "N/A";
@@ -199,11 +206,12 @@ class ConsoleListener
             if ( debug ) System.out.println("===== Listener starting: " + remote_host + ":" + sock.getPort());
         }
 
-        public void close()
-            throws Throwable
+        public void shutdown()
+            throws Exception
         {
+            if ( shutdown ) return;  // idempotency, things can happen in all sorts of orders
             if ( debug ) System.out.println("===== Listener completing: " + remote_host + ":" + sock.getPort());
-            this.done = true;
+            shutdown = true;
             is.close();
             cl.delete(sock.getPort());
         }
@@ -293,12 +301,12 @@ class ConsoleListener
                 if ( debug ) System.out.println(remote_host + ": EOF:  exiting");
             } catch ( Throwable t ) {
                 t.printStackTrace();
-            }
-            try {
-                close();
-            } catch (Throwable e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
+            } finally {
+                try {
+                    shutdown();
+                } catch (Throwable e) {
+                    // crash here, don't care, can't do anything about it
+                }
             }
         }
     }
@@ -310,19 +318,20 @@ class ConsoleListener
         OutputStream out;
 
         boolean done = false;
-        ConsoleListener cl;
+        boolean is_shutdown = false;
 
-        boolean shutdown = false;
-
-        StdioWriter(Socket sock, ConsoleListener cl)
+        StdioWriter(Socket sock)
         {
             this.sock = sock;
-            this.cl = cl;
         }
 
-        void close()
+        synchronized void shutdown()
+        {
+            is_shutdown = true;
+        }
+
+        private void close()
         {
-            shutdown = true;
             try {
                 if ( out != null ) {
                     out.close();
@@ -345,21 +354,36 @@ class ConsoleListener
 
 
             byte[] buf = new byte[4096];
-            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-            int ch;
-            int ndx = 0;
-            int max = buf.length - 1;
+            int dbg = 0;
             try {
-				while ( ((ch = in.read()) != -1) && (!shutdown) ) {
-				    buf[ndx++] = (byte) ch;
-				    if ( (ch == '\n') || (ndx > max)) {
-				        out.write(buf, 0, ndx);
-				        ndx = 0;
-				    }
-				}
+				while ( true ) {
+                    int cnt = System.in.available();
+                    if ( cnt > 0 ) {
+                        while ( cnt > 0) {
+                            int min = Math.min(cnt, buf.length);
+                            System.in.read(buf, 0, min);
+                            out.write(buf, 0, min);
+                            cnt -= min;
+                        }
+                    } else {
+                        synchronized(this) {
+                            if ( is_shutdown ) break;
+                        }
+                        try {
+                            Thread.sleep(100);
+                            if ( ++dbg % 100 == 0 ) {
+                                if ( debug ) System.out.println("STDIN: Sleep: " + dbg);
+                            }
+                        } catch ( InterruptedException e ) {
+                            break;
+                        }
+                    }
+                }
 			} catch (IOException e) {
                 System.out.println("Error in process stdin redirection - redirection ended. " + e.toString());
-			}
+			} finally {
+                close();
+            }
             if ( debug ) System.out.println("***********STDIN returns");
         }
     }