You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/04/10 22:36:37 UTC

svn commit: r1466660 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm: PingDriver.java ServiceManagerComponent.java ServiceSet.java UimaAsPing.java

Author: challngr
Date: Wed Apr 10 20:36:36 2013
New Revision: 1466660

URL: http://svn.apache.org/r1466660
Log:
UIMA-2807
Make default UIMA-AS pinger run as an internal thread instead of a process.

Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java?rev=1466660&r1=1466659&r2=1466660&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java Wed Apr 10 20:36:36 2013
@@ -63,7 +63,11 @@ class PingDriver
     String ping_class;
     String classpath;
     boolean ping_ok;
-    int missed_pings = 0;
+
+    int missed_pings = 0;            // didn't ping in specified time, but no error thrown
+    int errors = 0;                  // error, no good
+    int error_threshold = 5;         // max errors before we die
+
     ServiceSet sset;
     boolean test_mode = false;
 
@@ -85,9 +89,10 @@ class PingDriver
     String user;
     String working_directory;
     String log_directory;
-
     boolean do_log = true;
 
+    boolean shutdown = false;
+    
     PingDriver(ServiceSet sset)
     {        
         this.sset = sset;
@@ -97,12 +102,14 @@ class PingDriver
         this.endpoint          = meta_props.getStringProperty("endpoint");
         this.user              = meta_props.getStringProperty("user");
         String jvm_args_str    = job_props.getStringProperty("service_ping_jvm_args", "");
-        this.ping_class        = job_props.getStringProperty("service_ping_class");
-        this.meta_ping_timeout = job_props.getStringProperty("service_ping_timeout");
-        this.do_log            = job_props.getBooleanProperty("service_ping_dolog", true);
-        this.classpath         = job_props.getStringProperty("service_ping_classpath");
-        this.working_directory = job_props.getStringProperty("working_directory");
-        this.log_directory     = job_props.getStringProperty("log_directory");
+        this.ping_class        = job_props.getStringProperty("service_ping_class", null);
+        if ( this.ping_class != null ) {     // otherwise it's implicit or submitted and we don't care about any of these
+            this.meta_ping_timeout = job_props.getStringProperty("service_ping_timeout");
+            this.do_log            = job_props.getBooleanProperty("service_ping_dolog", true);
+            this.classpath         = job_props.getStringProperty("service_ping_classpath");
+            this.working_directory = job_props.getStringProperty("working_directory");
+            this.log_directory     = job_props.getStringProperty("log_directory");
+        }
 
         jvm_args_str = jvm_args_str + " -Dducc.sm.meta.ping.timeout=" + meta_ping_timeout;
         jvm_args_str = jvm_args_str.trim();
@@ -153,9 +160,82 @@ class PingDriver
         return meta_ping_rate;
     }
 
-    public void run() 
+    public void run()
+    {
+        if ( this.ping_class == null ) {
+            // This is the default ping driver, as configured in ducc.properties, to be run in
+            // an in-process thread
+            runAsThread();
+        } else {
+            // The user specified a pinger; run it as an external process under that user's identity
+            runAsProcess();
+        }
+
+    }
+
+    void handleStatistics(ServiceStatistics stats)
+    {
+        String methodName = "handleStatistics";
+
+        this.service_statistics = stats;
+        if ( stats == null ) {
+            logger.error(methodName, sset.getId(), "Service statics are null!");
+            errors++;
+        } else {
+            if ( service_statistics.isAlive() ) {
+                synchronized(this) {
+                    sset.setResponsive();
+                }
+                logger.info(methodName, sset.getId(), "Ping ok: ", endpoint, stats.toString());
+                missed_pings = 0;
+            } else {
+                logger.error(methodName, sset.getId(), "Missed_pings ", missed_pings, "endpoint", endpoint);
+                if ( ++missed_pings > meta_ping_stability ) {
+                    sset.setUnresponsive();
+                    logger.info(methodName, sset.getId(), "Seting state to unresponsive, endpoint",endpoint);
+                } else {
+                    sset.setWaiting();
+                    logger.info(methodName, sset.getId(), "Seting state to waiting, endpoint,", endpoint);
+                }                
+            }
+        }
+
+    }
+
+    public void runAsThread()
+    {
+    	String methodName = "runAsThread";
+        UimaAsPing uap = new UimaAsPing(logger);
+        logger.info(methodName, sset.getId(), "Starting INTERNAL ping.");
+        try {
+            uap.init(endpoint);
+        } catch ( Throwable t ) {
+            logger.warn(methodName, sset.getId(), t);
+            sset.pingExited();
+        }
+        while ( ! shutdown ) {
+            
+            handleStatistics(uap.getStatistics());
+            if ( errors > error_threshold ) {
+                uap.stop();
+                logger.warn(methodName, sset.getId(), "Ping exited because of excess errors: ", errors);
+                sset.pingExited();
+            }
+            
+            try {
+				Thread.sleep(meta_ping_rate);
+			} catch (InterruptedException e) {
+                // Nothing to do: if we were shut down, the loop condition ends the loop; otherwise just ping again
+			}
+            
+        }
+    }
+
+    public void runAsProcess() 
     {
         String methodName = "run";
+        logger.info(methodName, sset.getId(), "Starting EXTERNAL ping.");
+
         try {
             pinger =  new PingThread();
         } catch ( Throwable t ) {
@@ -244,10 +324,13 @@ class PingDriver
 
     public void stop()
     {
-        pinger.stop();
-        sin_listener.stop();
-        ser_listener.stop();
-        ping_main.destroy();
+        shutdown = true;
+        if ( this.ping_class != null ) {
+            if ( pinger       != null ) pinger.stop();
+            if ( sin_listener != null ) sin_listener.stop();
+            if ( ser_listener != null ) ser_listener.stop();
+            if ( ping_main    != null ) ping_main.destroy();
+        }
     }
 
     class PingThread
@@ -256,8 +339,6 @@ class PingDriver
         ServerSocket server;
         int port = -1;
         boolean done = false;
-        int errors =0;
-        int error_threshold = 5;
 
         PingThread()
             throws IOException
@@ -309,30 +390,7 @@ class PingDriver
                     }
                     
                     // Try to read the response
-                    // TODO: set the socket timeout on this
-                    service_statistics = (ServiceStatistics) ois.readObject();
-                    if ( service_statistics == null ) {
-                        logger.error(methodName, sset.getId(), "Stats are null!");
-                        errors++;
-                    } else {
-                        if ( service_statistics.isAlive() ) {
-                            synchronized(this) {
-                                if ( done ) return;
-                                sset.setResponsive();
-                            }
-                            logger.info(methodName, sset.getId(), "Ping ok: ", endpoint, service_statistics.toString());
-                            missed_pings = 0;
-                        } else {
-                            logger.error(methodName, sset.getId(), "Missed_pings ", missed_pings, "endpoint", endpoint);
-                            if ( ++missed_pings > meta_ping_stability ) {
-                                sset.setUnresponsive();
-                                logger.info(methodName, sset.getId(), "Seting state to unresponsive, endpoint",endpoint);
-                            } else {
-                                sset.setWaiting();
-                                logger.info(methodName, sset.getId(), "Seting state to waiting, endpoint,", endpoint);
-                            }                
-                        }
-                    }
+                    handleStatistics((ServiceStatistics) ois.readObject());
 
                     // This kliudge is required because we have to insure that pings aren't too frequent or ActiveMQ will get OutOfMemory errors.
                     // So we do exponential backoff.  If a new job references the service we set the ping rate to something frequent for a while
@@ -348,9 +406,9 @@ class PingDriver
 
                     // Wait a bit for the next one
                     try {
-                        logger.info(methodName, sset.getId(), "SLEEPING", my_ping_rate, "ms", sset.toString());
+                        // logger.info(methodName, sset.getId(), "SLEEPING", my_ping_rate, "ms", sset.toString());
                         Thread.sleep(my_ping_rate);
-                        logger.info(methodName, sset.getId(), "SLEEP returns", sset.toString());
+                        // logger.info(methodName, sset.getId(), "SLEEP returns", sset.toString());
                     } catch (InterruptedException e) {
                         // nothing
                     }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java?rev=1466660&r1=1466659&r2=1466660&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java Wed Apr 10 20:36:36 2013
@@ -149,8 +149,8 @@ public class ServiceManagerComponent 
                     metaprops.load(meta_filename);
                     
                     String sc = metaprops.getProperty("service-class");
-                    if ( (sc != null) && ( sc.equals("Implicit") ) ) {
-                        logger.info(methodName, null, "Scrubbing implicit service", stem);
+                    if ( (sc != null) && ( sc.equals("Implicit") || sc.equals("Submitted") ) ) {
+                        logger.info(methodName, null, "Scrubbing", sc, "service", stem);
                         try {
                             File mf = new File(meta_filename);
                             mf.delete();

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1466660&r1=1466659&r2=1466660&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Wed Apr 10 20:36:36 2013
@@ -150,16 +150,14 @@ public class ServiceSet
 
         String state_dir = System.getProperty("DUCC_HOME") + "/state";
 
-        // need job props and meta props so pinger works
-        // job props: , service_ping_class, service_ping_classpath, working_directory, log_directory
+        // Need job props and meta props for webserver.
+        // The pinger is always the default configured UIMA-AS pinger.
+        //
+        // job props: working_directory, log_directory
         // meta props: endpoint, user
         job_props = new DuccProperties();
-        job_props.put("service_ping_class", ServiceManagerComponent.default_ping_class);
-        job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
-        job_props.put("service_ping_dolog", "false");
-        job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
-        job_props.put("working_directory", System.getProperty("user.dir")); // whatever my current dir is
-        job_props.put("log_directory", System.getProperty("user.dir") + "/../logs");
+        // job_props.put("working_directory", System.getProperty("user.dir")); // whatever my current dir is
+        // job_props.put("log_directory", System.getProperty("user.dir") + "/../logs");
         //job_props.put("service_ping_jvm_args", "-Xmx50M");
         props_filename = state_dir + "/services/" + id.toString() + ".svc";
         saveServiceProperties();
@@ -194,14 +192,13 @@ public class ServiceSet
         parseEndpoint(key);
 
         String state_dir = System.getProperty("DUCC_HOME") + "/state";
-
+        // Need job props and meta props for webserver.
+        // The pinger is always the default configured UIMA-AS pinger.
+        // Submitted services must always be UIMA-AS services, for now, checked in caller.
+        //
+        // job props: working_directory, log_directory
+        // meta props: endpoint, user
         job_props = new DuccProperties();
-        job_props.put("service_ping_class", ServiceManagerComponent.default_ping_class);
-        job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
-        job_props.put("service_ping_dolog", "false");
-        job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
-        job_props.put("working_directory", System.getProperty("user.dir")); // whatever my current dir is
-        job_props.put("log_directory", System.getProperty("user.dir") + "/../logs");
         //job_props.put("service_ping_jvm_args", "-Xmx50M");
         props_filename = state_dir + "/services/" + id.toString() + ".svc";
         saveServiceProperties();
@@ -254,18 +251,12 @@ public class ServiceSet
 
         parseIndependentServices();
 
-        if ( service_type == ServiceType.UimaAs ) {            
-            if ( ! job_props.containsKey("service_ping_class" ) ) {
-                job_props.put("service_ping_class", ServiceManagerComponent.default_ping_class);
-                job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
-                // this lets us turn on debug logging for the default pinger
-                if ( ! job_props.containsKey("service_ping_dolog")) {
-                    job_props.put("service_ping_dolog", "false");
-                }        
-                // job_props.put("service_ping_jvm_args", "-Xmx0M");
-            }
-        }
-
+        if ( ! job_props.containsKey("service_ping_classpath")) {
+            job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
+        }        
+        if ( ! job_props.containsKey("service_ping_dolog")) {
+            job_props.put("service_ping_dolog", "false");
+        }        
         if ( !job_props.containsKey("service_ping_timeout") ) {
             job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
         }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java?rev=1466660&r1=1466659&r2=1466660&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java Wed Apr 10 20:36:36 2013
@@ -12,6 +12,7 @@ import org.apache.uima.ducc.common.AServ
 import org.apache.uima.ducc.common.ServiceStatistics;
 import org.apache.uima.ducc.common.TcpStreamHandler;
 import org.apache.uima.ducc.common.UimaAsServiceMonitor;
+import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Level;
@@ -30,6 +31,17 @@ public class UimaAsPing
     int    broker_jmx_port;
     boolean connected;
     UimaAsServiceMonitor monitor;
+    DuccLogger logger = null;
+    
+    UimaAsPing()
+    {
+        this.logger = null;
+    }
+
+    UimaAsPing(DuccLogger logger)
+    {
+        this.logger = logger;
+    }
 
     public void init(String ep)
         throws Exception
@@ -55,6 +67,7 @@ public class UimaAsPing
         broker_host = url.getHost();
         // not needed here fyi broker_port = url.getPort();
 
+        
         String to = System.getProperty("ducc.sm.meta.ping.timeout");
         meta_timeout = Integer.parseInt(to);
 
@@ -72,26 +85,38 @@ public class UimaAsPing
 
     public void stop()
     {
+        if ( monitor != null ) monitor.stop();
+    }
+
+    private void doLog(String methodName, String msg)
+    {
+        if ( logger == null ) {
+            System.out.println(msg);
+        } else {
+            logger.info(methodName, null, msg);
+        }
     }
 
     private synchronized void init_monitor()
     {
+        String methodName = "init_monitor";
         if ( ! connected ) {
             try {
-                System.out.println("Initializing monitor");
+                doLog(methodName, "Initializing monitor");
                 monitor.init(ep);
                 connected = true;
-                System.out.println("Monitor initialized");
+                doLog(methodName, "Monitor initialized");
             } catch (Throwable t ) {
                 connected = false;
                 // t.printStackTrace();
-                System.err.println("Cannot initialize monitor: " + t.getMessage());
+                doLog(methodName, "Cannot initialize monitor: " + t.toString());
             }
         }
     }
 
     public ServiceStatistics getStatistics()
     {
+        String methodName = "getStatistics";
         ServiceStatistics statistics = new ServiceStatistics(false, false, "<NA>");
 
         // Instantiate Uima AS Client
@@ -116,8 +141,7 @@ public class UimaAsPing
             // System.out.println("getMeta ok: " + ep);
 
         } catch( ResourceInitializationException e) {
-            System.out.println("Cannot issue getMeta: " + e.getMessage());
-            e.printStackTrace();
+            doLog(methodName, "Cannot issue getMeta: " + e.toString());
         } finally {
             uimaAsEngine.stop();
         }