You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/03/13 19:53:54 UTC

svn commit: r1456086 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ uima-ducc-spawn/src/

Author: challngr
Date: Wed Mar 13 18:53:54 2013
New Revision: 1456086

URL: http://svn.apache.org/r1456086
Log:
UIMA-2742
Console redirection.  ducc_ling always redirects stdin from the socket.
The CLI ConsoleListener captures stdin and writes to the socket for ducclets only.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliBase.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobSubmit.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccletSubmit.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-spawn/src/ducc_ling.c

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliBase.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliBase.java?rev=1456086&r1=1456085&r2=1456086&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliBase.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/CliBase.java Wed Mar 13 18:53:54 2013
@@ -601,13 +601,13 @@ public abstract class CliBase
     }
 
     // TODO TODO TODO - do we have to support lots of these for multi-threaded stuff?  Hope not ...
-    protected synchronized void startMonitors()
+    protected synchronized void startMonitors(boolean start_stdin)
     	throws Exception
     {
         int wait_count = 0;
 
         if ( console_listener != null ) {
-            startConsoleListener();
+            startConsoleListener(start_stdin);
             wait_count++;
         }
         
@@ -666,10 +666,11 @@ public abstract class CliBase
     /**
      * Be sure to call this BEFORE submission, to insure the callback address is set in properties.
      */
-    protected synchronized void startConsoleListener()
+    protected synchronized void startConsoleListener(boolean start_stdin)
     	throws Exception
     {        
         if ( console_attach ) {
+            console_listener.startStdin(start_stdin);
             Thread t = new Thread(console_listener);
             t.start();
         } else {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java?rev=1456086&r1=1456085&r2=1456086&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ConsoleListener.java Wed Mar 13 18:53:54 2013
@@ -19,8 +19,11 @@
 */
 package org.apache.uima.ducc.cli;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -28,18 +31,20 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.Pair;
 
 class ConsoleListener
     implements Runnable
 {
     private ServerSocket sock;
     private CliBase submit;
-    private Map<Integer, StdioListener> listeners = new HashMap<Integer, StdioListener>();
+    private Map<Integer, Pair<StdioReader, StdioWriter>> listeners = new HashMap<Integer, Pair<StdioReader, StdioWriter>>();
 
     private int          console_listener_port;
     private String       console_host_address;
 
     private boolean      in_shutdown = false;
+    private boolean      start_stdin = false;
 
     private IConsoleCallback consoleCb;
     // private int          callers;   // number of remote processes we expect to listen for
@@ -105,12 +110,13 @@ class ConsoleListener
 
     void shutdown()
     {
-        if ( debug ) System.out.println("console listener: Shutdown starts");
+        if ( debug ) System.out.println("Console handler: Shutdown starts");
         in_shutdown = true;
         try {
             sock.close();
-            for ( StdioListener sl : listeners.values() ) {
-                sl.close();
+            for ( Pair<StdioReader, StdioWriter> handler: listeners.values() ) {
+                handler.first().close();
+                handler.second().close();
             }
         } catch (Throwable t) {
             t.printStackTrace();
@@ -125,12 +131,17 @@ class ConsoleListener
             count = listeners.size();
         }
 
-        if ( debug ) System.out.println("console listener: Removed listener, size = "  + listeners.size());
+        if ( debug ) System.out.println("Console handler: Removed handler for port " + port + ", size = "  + listeners.size());
         if ( count == 0 ) {
             shutdown();
         }
     }
 
+    void startStdin(boolean start_stdin)
+    {
+        this.start_stdin = start_stdin;
+    }
+
     public void run()
     {
         if ( debug ) System.out.println("Listening on " + console_host_address + " " + console_listener_port);
@@ -138,14 +149,21 @@ class ConsoleListener
         while ( true ) {
             try {                    
                 Socket s = sock.accept();
-                StdioListener sl = new StdioListener(s, this);
+                StdioReader sr = new StdioReader(s, this);
+                StdioWriter sw = new StdioWriter(s, this);
                 int p = s.getPort();
                 synchronized(this) {
-                    listeners.put(p, sl);
+                    listeners.put(p, new Pair<StdioReader, StdioWriter>(sr, sw));
                 }
 
-                Thread t = new Thread(sl);
+                Thread t = new Thread(sr, "STDOUT");
                 t.start();                
+
+                if ( start_stdin ) {
+                    // generally started only for AP (ducclet)
+                    Thread tt = new Thread(sw, "STDIN");
+                    tt.start();             
+                }   
             } catch (Throwable t) {
                 if ( ! in_shutdown ) shutdown();
                 if ( debug ) System.out.println("console listener returns");
@@ -155,7 +173,7 @@ class ConsoleListener
         }
     }
 
-    class StdioListener
+    class StdioReader
         implements Runnable
     {
         Socket sock;
@@ -169,7 +187,7 @@ class ConsoleListener
         int tag_len = 0;
 
 
-        StdioListener(Socket sock, ConsoleListener cl)
+        StdioReader(Socket sock, ConsoleListener cl)
         {
             this.sock = sock;
             this.cl = cl;
@@ -277,5 +295,65 @@ class ConsoleListener
         }
     }
 
+    class StdioWriter
+        implements Runnable
+    {
+        Socket sock;
+        OutputStream out;
+
+        boolean done = false;
+        ConsoleListener cl;
+
+        boolean shutdown = false;
+
+        StdioWriter(Socket sock, ConsoleListener cl)
+        {
+            this.sock = sock;
+            this.cl = cl;
+        }
+
+        void close()
+        {
+            shutdown = true;
+            try {
+                if ( out != null ) {
+                    out.close();
+                }
+			} catch (IOException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+        }
+
+        public void run()
+        {
+        	if ( debug ) System.out.println("STDIN LISTENER STARTS *******");
+            try {
+                out = sock.getOutputStream();
+            } catch (Exception e) {
+                System.out.println("Cannot acquire remote socket for stdin redirection: " + e.toString());
+                return;
+            }
+
+
+            byte[] buf = new byte[4096];
+            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+            int ch;
+            int ndx = 0;
+            int max = buf.length - 1;
+            try {
+				while ( ((ch = in.read()) != -1) && (!shutdown) ) {
+				    buf[ndx++] = (byte) ch;
+				    if ( (ch == '\n') || (ndx > max)) {
+				        out.write(buf, 0, ndx);
+				        ndx = 0;
+				    }
+				}
+			} catch (IOException e) {
+                System.out.println("Error in process stdin redirection - redirection ended. " + e.toString());
+			}
+            if ( debug ) System.out.println("***********STDIN returns");
+        }
+    }
 }
 

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobSubmit.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobSubmit.java?rev=1456086&r1=1456085&r2=1456086&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobSubmit.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobSubmit.java Wed Mar 13 18:53:54 2013
@@ -379,7 +379,7 @@ public class DuccJobSubmit 
 
         if ( rc ) {
             saveSpec("job-specification.properties", jobRequestProperties);
-            startMonitors();       // starts conditionally, based on job spec and console listener present
+            startMonitors(false);       // starts conditionally, based on job spec and console listener present
         }
 
 		return rc;

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccletSubmit.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccletSubmit.java?rev=1456086&r1=1456085&r2=1456086&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccletSubmit.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccletSubmit.java Wed Mar 13 18:53:54 2013
@@ -159,7 +159,7 @@ public class DuccletSubmit 
                     retval = false;
                 } else {
                     saveSpec("process.properties", serviceRequestProperties);
-                    startMonitors();       // starts conditionally, based on job spec and console listener present
+                    startMonitors(true);       // starts conditionally, based on job spec and console listener present
                 }
             }
         }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-spawn/src/ducc_ling.c
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-spawn/src/ducc_ling.c?rev=1456086&r1=1456085&r2=1456086&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-spawn/src/ducc_ling.c (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-spawn/src/ducc_ling.c Wed Mar 13 18:53:54 2013
@@ -482,6 +482,7 @@ void redirect_to_socket(char *sockloc)
     fflush(stdout);
     fflush(stderr);
 
+    int rc0 = dup2(sock, 0);
     int rc1 = dup2(sock, 1);
     int rc2 = dup2(sock, 2);