You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/15 18:37:20 UTC

svn commit: r1433548 - in /uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm: PingDriver.java ServiceHandler.java ServiceManagerComponent.java ServicePingMain.java ServiceSet.java TcpStreamHandler.java UimaAsPing.java

Author: cwiklik
Date: Tue Jan 15 17:37:20 2013
New Revision: 1433548

URL: http://svn.apache.org/viewvc?rev=1433548&view=rev
Log:
UIMA-2577 committing for jim

Added:
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java   (with props)
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java   (with props)
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java?rev=1433548&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java Tue Jan 15 17:37:20 2013
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.sm;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+
+import org.apache.uima.ducc.common.ServiceStatistics;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccProperties;
+
+
+/**
+ * This runs the watchdog thread for custom service pingers.
+ *
+ * It spawns a process, as the user, which in turn will instantiate an object which extends
+ * AServicePing to implement the pinger.
+ *
+ * The processes communicate via a pipe: every ping interval the meta puts relevant information onto its
+ * stdout:
+ *     0|1 long long
+ * The first token is 1 if the ping succeeded, 0 otherwise.
+ * The second token is the total cumulative work executed by the service.
+ * The third token is the current queue depth of the service.       
+ */
+
+class PingDriver
+    implements IServiceMeta,
+               SmConstants
+{
+    private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), COMPONENT_NAME);	
+
+    String[] jvm_args;
+    String endpoint;
+    String ping_class;
+    String classpath;
+    boolean ping_ok;
+    int missed_pings = 0;
+    ServiceSet sset;
+    boolean test_mode = false;
+
+    Process ping_main;
+
+    StdioListener sin_listener = null;
+    StdioListener ser_listener = null;
+    PingThread pinger = null;
+
+    int meta_ping_rate;
+    int meta_ping_stability;
+    String meta_ping_timeout;
+    ServiceStatistics service_statistics = null;
+
+    String user;
+    String working_directory;
+    String log_directory;
+
+    boolean do_log = true;
+
+    /**
+     * Builds the driver for one service's pinger, taking its configuration from the
+     * job and meta properties of the given ServiceSet.
+     *
+     * @param sset the service whose pinger this driver runs
+     */
+    PingDriver(ServiceSet sset)
+    {
+        this.sset = sset;
+        DuccProperties jp = sset.getJobProperties();
+        DuccProperties mp = sset.getMetaProperties();
+
+        // Lookups are made in the same order as before.
+        this.endpoint = mp.getStringProperty("endpoint");
+        this.user = mp.getStringProperty("user");
+        String configured_args = jp.getStringProperty("service_ping_jvm_args", "");
+        this.ping_class = jp.getStringProperty("service_ping_class");
+        this.meta_ping_timeout = jp.getStringProperty("service_ping_timeout");
+        this.do_log = jp.getBooleanProperty("service_ping_dolog", true);
+        this.classpath = jp.getStringProperty("service_ping_classpath");
+        this.working_directory = jp.getStringProperty("working_directory");
+        this.log_directory = jp.getStringProperty("log_directory");
+
+        // The ping timeout is appended to the configured JVM args as a system property,
+        // then the whole string is broken into individual arguments on whitespace.
+        String all_args = (configured_args + " -Dducc.sm.meta.ping.timeout=" + this.meta_ping_timeout).trim();
+        this.jvm_args = all_args.split("\\s+");
+
+        // Ping rate and stability come from the Service Manager's current settings.
+        this.meta_ping_stability = ServiceManagerComponent.meta_ping_stability;
+        this.meta_ping_rate = ServiceManagerComponent.meta_ping_rate;
+    }
+
+    /**
+     * Test from main only.
+     *
+     * Loads the test configuration and puts the driver in test mode, in which run()
+     * starts the JVM directly instead of going through ducc_spawn.
+     *
+     * @param props passed to DuccProperties.load() to read the test configuration
+     */
+    PingDriver(String props)
+    {
+        DuccProperties dp = new DuccProperties();
+        try {
+            dp.load(props);
+        } catch (Exception e) {
+            // Best effort for the test driver: report the problem and carry on with
+            // whatever was loaded.
+            e.printStackTrace();
+        }
+
+        this.endpoint = dp.getStringProperty("endpoint");
+        String jvm_args_str = dp.getStringProperty("service_ping_jvm_args", "").trim();
+
+        // The pinger class is configured as "service_ping_class" by the non-test constructor;
+        // prefer that key but keep honoring the older "service_ping_ping" spelling.
+        String ping_key = dp.containsKey("service_ping_class") ? "service_ping_class" : "service_ping_ping";
+        this.ping_class = dp.getStringProperty(ping_key);
+        this.classpath = dp.getStringProperty("service_ping_classpath");
+
+        // "".split(" ") returns a single empty string, which run() would hand to the JVM as
+        // an empty argument.  Use an empty array when no JVM args are configured.
+        if ( jvm_args_str.length() == 0 ) {
+            jvm_args = new String[0];
+        } else {
+            jvm_args = jvm_args_str.split("\\s+");
+        }
+        this.test_mode = true;
+    }
+
+    public ServiceStatistics getServiceStatistics()    // latest statistics read back by the ping thread; initially null
+    {
+        return service_statistics;
+    }
+
+    /**
+     * Starts the ping thread, spawns the external ping process, and then blocks until
+     * that process exits.
+     *
+     * If either the listen socket or the process cannot be set up, the service is marked
+     * unresponsive and the method returns.  When the process exits the ServiceSet is told
+     * via pingExited(), and the ping thread and both stdio listeners are asked to stop.
+     */
+    public void run() 
+    {
+        String methodName = "run";
+
+        // The listen socket has to exist first: its port goes onto the child's command line.
+        try {
+            pinger = new PingThread();
+        } catch ( Throwable t ) {
+            logger.error(methodName, sset.getId(), "Cannot start listen socket, pinger not started.", t);
+            sset.setUnresponsive();
+            return;
+        }
+        int listen_port = pinger.getPort();
+
+        // Start listening before the external process is started.
+        new Thread(pinger).start();
+
+        ArrayList<String> command = new ArrayList<String>();
+        if ( ! test_mode ) {
+            // Outside of test mode the JVM is launched through ducc_spawn with -u <user>.
+            command.add(System.getProperty("ducc.agent.launcher.ducc_spawn_path"));
+            command.add("-u");
+            command.add(user);
+            command.add("-w");
+            command.add(working_directory);
+            if ( do_log ) {
+                command.add("-f");
+                command.add(log_directory + "/services/ping/" + sset.getId());
+            }
+            command.add("--");
+        }
+
+        command.add(System.getProperty("ducc.jvm"));
+        for ( int k = 0; k < jvm_args.length; k++ ) {
+            command.add(jvm_args[k]);
+        }
+        command.add("-cp");
+        command.add(System.getProperty("java.class.path") + ":" + classpath);
+        command.add("org.apache.uima.ducc.sm.ServicePingMain");
+        command.add("--class");
+        command.add(ping_class);
+        command.add("--endpoint");
+        command.add(endpoint);
+        command.add("--port");
+        command.add(Integer.toString(listen_port));
+
+        for ( int n = 0; n < command.size(); n++ ) {
+            logger.debug(methodName, sset.getId(), "Args[", n,"]:  ", command.get(n));
+        }
+
+        ProcessBuilder builder = new ProcessBuilder(command);
+
+        //
+        // Establish our pinger, and copy its stdout and stderr through the listeners
+        //
+        try {
+            ping_main = builder.start();
+            InputStream child_out = ping_main.getInputStream();
+            InputStream child_err = ping_main.getErrorStream();
+
+            sin_listener = new StdioListener(1, child_out);
+            ser_listener = new StdioListener(2, child_err);
+            Thread out_thread = new Thread(sin_listener);
+            Thread err_thread = new Thread(ser_listener);
+            out_thread.start();
+            err_thread.start();
+        } catch (Throwable t) {
+            logger.error(methodName, sset.getId(), "Cannot establish ping process:", t);
+            sset.setUnresponsive();
+            return;
+        }
+
+        // Wait for the ping process to exit; an interrupt just restarts the wait.
+        boolean exited = false;
+        while ( ! exited ) {
+            try {
+                int rc = ping_main.waitFor();
+                logger.debug(methodName, sset.getId(), "Pinger returns rc ", rc);
+                sset.pingExited();
+                exited = true;
+            } catch (InterruptedException e2) {
+                // nothing
+            }
+        }
+
+        pinger.stop();
+        sin_listener.stop();
+        ser_listener.stop();
+    }
+
+    /**
+     * Asks the ping thread and the stdio listeners to stop and destroys the ping process.
+     *
+     * Any of these may still be null if run() has not been called, or if it returned early
+     * because the listen socket or the process could not be set up, so each one is checked
+     * before use.
+     */
+    public void stop()
+    {
+        if ( pinger       != null ) pinger.stop();
+        if ( sin_listener != null ) sin_listener.stop();
+        if ( ser_listener != null ) ser_listener.stop();
+        if ( ping_main    != null ) ping_main.destroy();
+    }
+
+    /**
+     * Runs the ping exchange with the external ping process.
+     *
+     * The constructor opens a listen socket on an ephemeral port.  run() accepts one
+     * connection on it and then, until stopped, repeatedly writes the byte 'P', sleeps for
+     * meta_ping_rate, reads one ServiceStatistics object back, and updates the state of the
+     * ServiceSet from it.  Both sockets are closed when run() returns.
+     */
+    class PingThread
+        implements Runnable
+    {
+        ServerSocket server;
+        int port = -1;
+        boolean done = false;           // read and written only while synchronized on this
+        int errors =0;
+        int error_threshold = 5;
+
+        /**
+         * @throws IOException if the listen socket cannot be opened
+         */
+        PingThread()
+            throws IOException
+        {
+            this.server = new ServerSocket(0);
+            this.port = server.getLocalPort();
+        }
+
+        /** @return the local port the ping process must connect to */
+        int getPort()
+        {
+            return this.port;
+        }
+
+        /**
+         * Marks the thread as done and closes the listen socket, which also releases
+         * a run() that is still waiting in accept().
+         */
+        synchronized void stop()
+        {
+            this.done = true;
+            try {
+                server.close();
+            } catch (IOException e) {
+                // nothing useful to do, we are shutting down
+            }
+        }
+
+        /** @return true once stop() has been called */
+        synchronized boolean isDone()
+        {
+            return done;
+        }
+
+        public void run()
+        {
+            String methodName = "PingThread.run()";
+            Socket sock = null;
+            try {
+
+                sock = server.accept();
+                sock.setSoTimeout(5000);
+                OutputStream out = sock.getOutputStream();
+                InputStream  in =  sock.getInputStream();
+                ObjectInputStream ois = new ObjectInputStream(in);
+
+                ping_ok = false;         // we expect the callback to change this
+                while ( true ) {
+                    if ( isDone() ) return;
+
+                    // Too many failures: give up now instead of running one more cycle.
+                    if ( errors > error_threshold ) {
+                        stop();
+                        return;
+                    }
+
+                    // Ask for the ping
+                    try {
+                        logger.trace(methodName, sset.getId(), "PingDriver: ping OUT");
+                        out.write('P');
+                        out.flush();
+                    } catch (IOException e1) {
+                        logger.error(methodName, sset.getId(), e1);
+                        errors++;
+                    }
+
+                    // Wait a bit
+                    try {
+                        Thread.sleep(meta_ping_rate);
+                    } catch (InterruptedException e) {
+                        // nothing
+                    }
+
+                    // Try to read the response; the read is bounded by the socket timeout set above
+                    service_statistics = (ServiceStatistics) ois.readObject();
+                    if ( service_statistics == null ) {
+                        logger.error(methodName, sset.getId(), "Stats are null!");
+                        errors++;
+                    } else {
+                        if ( service_statistics.isAlive() ) {
+                            // Do not report the service responsive once we have been stopped.
+                            synchronized(this) {
+                                if ( done ) return;
+                                sset.setResponsive();
+                            }
+                            logger.info(methodName, sset.getId(), "Ping ok: ", endpoint, service_statistics.toString());
+                            missed_pings = 0;
+                        } else {
+                            logger.error(methodName, sset.getId(), "Missed_pings ", missed_pings, "endpoint", endpoint);
+                            if ( ++missed_pings > meta_ping_stability ) {
+                                sset.setUnresponsive();
+                                logger.info(methodName, sset.getId(), "Seting state to unresponsive, endpoint",endpoint);
+                            } else {
+                                sset.setWaiting();
+                                logger.info(methodName, sset.getId(), "Seting state to waiting, endpoint,", endpoint);
+                            }
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                if ( isDone() ) {
+                    // Expected after stop(), e.g. when the listen socket is closed under accept().
+                    logger.debug(methodName, sset.getId(), "Ping thread stopped:", e.toString());
+                } else {
+                    logger.error(methodName, sset.getId(), "Error receiving ping", e);
+                    errors++;
+                }
+            } catch (ClassNotFoundException e) {
+                logger.error(methodName, sset.getId(), "Input garbled:", e);
+                errors++;
+            } finally {
+                // Release both sockets however we leave; closing a closed socket is harmless.
+                if ( sock != null ) {
+                    try {
+                        sock.close();
+                    } catch (IOException e) {
+                        // nothing useful to do, we are finished with it
+                    }
+                }
+                try {
+                    server.close();
+                } catch (IOException e) {
+                    // nothing useful to do, we are finished with it
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies one output stream of the ping process, line by line, to the log
+     * (or to System.out in test mode) until the stream reaches end of file.
+     */
+    class StdioListener
+        implements Runnable
+    {
+        InputStream in;
+        String tag;
+        boolean done = false;
+
+        /**
+         * @param which 1 tags the lines as STDOUT, 2 as STDERR
+         * @param in    the stream to copy
+         */
+        StdioListener(int which, InputStream in)
+        {
+            this.in = in;
+            switch ( which ) {
+               case 1: tag = "STDOUT: "; break;
+               case 2: tag = "STDERR: "; break;
+            }
+        }
+
+        void stop()
+        {
+            this.done = true;
+        }
+
+        public void run()
+        {
+            // done is only consulted here: once reading has begun the stream is drained to
+            // end of file so that the last output of the process is not lost.
+            if ( done ) return;
+            String methodName = "StdioListener.run";
+
+            BufferedReader br = new BufferedReader(new InputStreamReader(in));
+            while ( true ) {
+                try {
+                    String s = br.readLine();
+
+                    // Check for end of stream first so that a "null" line is never logged.
+                    if ( s == null ) {
+                        String msg = tag + "closed, listener returns";
+                        if   ( test_mode ) System.out.println(msg);
+                        else logger.info(methodName, sset.getId(), msg);
+                        return;
+                    }
+
+                    if   ( test_mode ) System.out.println(tag + s);
+                    else logger.info(methodName, sset.getId(), tag, s);
+
+                } catch (IOException e) {
+                    // if anything goes wrong this guy is toast.
+                    if   ( test_mode) e.printStackTrace();
+                    else logger.error(methodName, sset.getId(), e);
+                    return;
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Test entry point: builds a test-mode driver and runs it.
+     *
+     * @param args args[0] is handed to the test constructor, which loads the ping
+     *             configuration from it
+     */
+    public static void main(String[] args)
+    {
+        // call ServicePingMain --class org.apache.uima.ducc.sm.PingTester --endpoint FixedSleepAE_1
+        //    make sure test.jar is in the classpath
+        if ( args.length < 1 ) {
+            // Without this check a missing argument surfaces as an ArrayIndexOutOfBoundsException.
+            System.out.println("Usage: PingDriver <properties>");
+            return;
+        }
+        PingDriver csm = new PingDriver(args[0]);
+        csm.run();
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Tue Jan 15 17:37:20 2013
@@ -132,6 +132,7 @@ public class ServiceHandler
             modifiedServices.clear();
         }
         handleModifiedServices(incoming);
+        handleImplicitServices();
 
         incoming = new HashMap<DuccId, IDuccWork>();
         synchronized(newServices) {
@@ -202,6 +203,7 @@ public class ServiceHandler
      */
     protected void resolveState(DuccId id, ServiceDependency dep)
     {        
+        String methodName = "resolveState";
         Map<String, ServiceSet> services = serviceStateHandler.getServicesForJob(id);
         if ( services == null ) {
             dep.setState(ServiceState.NotAvailable);       // says that nothing i need is available
@@ -218,6 +220,7 @@ public class ServiceHandler
         for ( ServiceSet sset : services.values() ) {
             if ( sset.getServiceState().ordinality() < state.ordinality() ) state = sset.getServiceState();
              dep.setIndividualState(sset.getKey(), sset.getServiceState());
+             logger.debug(methodName, id, "Set individual state", sset.getServiceState());
         }
         dep.setState(state);
     }
@@ -266,7 +269,7 @@ public class ServiceHandler
             // we need to bypass any attempt to cope with registered services or updating the sset.
             //
             if ( ! fatal ) {
-                if ( sset.isRegistered() && (sset.countImplementors() == 0) ) {
+                if ( sset.isRegistered() && (sset.countImplementors() == 0) && sset.isStartable() ) {
                     // Registered but not alive, well, we can fix that!
                     int ninstances = sset.getNInstances();
                     logger.debug(methodName, sset.getId(), "Reference-starting registered service, instances =", ninstances);
@@ -419,6 +422,7 @@ public class ServiceHandler
             if ( j.isFinished() ) {
                 stopDependentServices(id);
                 s.setState(ServiceState.NotAvailable);
+                s.clearMessages();
             } else  if ( j.isActive() ) {
                 resolveState(id, s);
             } 
@@ -572,6 +576,22 @@ public class ServiceHandler
                                                                       // from the published map
     }
     
+    /**
+     * Walks every known service and calls establish() on the implicit ones.
+     *
+     * The pinger may have died while we weren't looking.  Registered services are taken
+     * care of from handleModifiedServices, but we know very little about implicit services,
+     * so each implicit ServiceSet is asked to keep itself clean.
+     */
+    protected void handleImplicitServices()
+    {
+        ArrayList<String> names = serviceStateHandler.getServiceNames();
+        for ( String name : names ) {
+            ServiceSet candidate = serviceStateHandler.getServiceByName(name);
+            if ( ! candidate.isImplicit() ) {
+                continue;               // only implicit services are handled here
+            }
+            candidate.establish();
+        }
+    }
+
     protected void handleModifiedServices(HashMap<DuccId, IDuccWork> work)
     {
         String methodName = "handleModifiedServices";        
@@ -767,7 +787,11 @@ public class ServiceHandler
         }
                           
         for ( int i = 0; i < wanted; i++ ) {
-            sset.start();
+            if ( sset.isStartable() ) {
+                sset.start();
+            } else {
+                sset.establish();  // this will just start the ping thread
+            }
         } 
 
 
@@ -918,7 +942,7 @@ public class ServiceHandler
 
         ServiceSet sset = serviceStateHandler.getServiceForApi(friendly, epname);
         
-        if ( instances >= 0 ) {
+        if ( instances > 0 ) {
             sset.setNInstances(instances);                // also persists instances
         }
 

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java Tue Jan 15 17:37:20 2013
@@ -31,13 +31,13 @@ import org.apache.uima.ducc.common.boot.
 import org.apache.uima.ducc.common.component.AbstractDuccComponent;
 import org.apache.uima.ducc.common.main.DuccService;
 import org.apache.uima.ducc.common.utils.DuccCollectionUtils;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
+import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccProperties;
 import org.apache.uima.ducc.common.utils.MissingPropertyException;
 import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
 import org.apache.uima.ducc.common.utils.Version;
-import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapDifference;
-import org.apache.uima.ducc.common.utils.DuccCollectionUtils.DuccMapValueDifference;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
 import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
@@ -52,9 +52,10 @@ import org.apache.uima.ducc.transport.ev
 import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
 import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
 import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
-import org.apache.uima.ducc.transport.event.common.IDuccWork;
 import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
 import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
 import org.apache.uima.ducc.transport.event.sm.ServiceMap;
 
 
@@ -84,6 +85,7 @@ public class ServiceManagerComponent 
     static int meta_ping_rate = 60000;       // interval in ms to ping the service
     static int meta_ping_stability = 5;           // number of missed pings before we mark the service down
     static int meta_ping_timeout = 500;      // timeout on ping 
+    static String default_ping_class;
 
     private String state_dir = null;
     private String state_file = null;
@@ -195,6 +197,7 @@ public class ServiceManagerComponent 
         meta_ping_rate = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.rate", meta_ping_rate);
         meta_ping_timeout = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.timeout", meta_ping_timeout);
         meta_ping_stability = SystemPropertyResolver.getIntProperty("ducc.sm.meta.ping.stability", meta_ping_stability);
+        default_ping_class = SystemPropertyResolver.getStringProperty("ducc.sm.default.uima-as.ping.class", UimaAsPing.class.getName());
 
         logger.info(methodName, null, "------------------------------------------------------------------------------------");
         logger.info(methodName, null, "Service Manager starting:");
@@ -306,7 +309,20 @@ public class ServiceManagerComponent 
 
               case Service:
                   localMap.addDuccWork(w);
-                  newServices.put(w.getDuccId(), w);
+                  // An arbitrary process is **almost** the same as a service in terms of how most of DUCC
+                  // handles it.  In order to transparently reuse all that code it is classified as a
+                  // special type of service, "other", which the SM treats as a regular job.
+                  switch ( ((IDuccWorkService)w).getServiceDeploymentType() ) 
+                  {
+                      case uima:
+                      case custom:
+                          newServices.put(w.getDuccId(), w);
+                          break;
+                      case other:
+                          newJobs.put(w.getDuccId(), w);
+                          break;
+                  }
+
                   break;
 
               default:
@@ -328,7 +344,16 @@ public class ServiceManagerComponent 
 
               case Service:
                   localMap.removeDuccWork(w.getDuccId());
-                  deletedServices.put(w.getDuccId(), w);
+                  switch ( ((IDuccWorkService)w).getServiceDeploymentType() ) 
+                  {
+                      case uima:
+                      case custom:
+                          deletedServices.put(w.getDuccId(), w);
+                          break;
+                      case other:
+                          deletedJobs.put(w.getDuccId(), w);
+                          break;
+                  }
                   break;
 
               default:
@@ -354,8 +379,17 @@ public class ServiceManagerComponent 
                   break;
 
               case Service:
-                  modifiedServices.put(l.getDuccId(), l);
                   localMap.addDuccWork(l);
+                  switch ( ((IDuccWorkService)l).getServiceDeploymentType() ) 
+                  {
+                      case uima:
+                      case custom:
+                          modifiedServices.put(l.getDuccId(), l);
+                          break;
+                      case other:
+                          modifiedJobs.put(l.getDuccId(), l);
+                          break;
+                  }
                   break;
 
               default:

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServicePingMain.java Tue Jan 15 17:37:20 2013
@@ -31,7 +31,7 @@ import org.apache.commons.cli.OptionBuil
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
-import org.apache.uima.ducc.cli.AServicePing;
+import org.apache.uima.ducc.common.AServicePing;
 import org.apache.uima.ducc.common.ServiceStatistics;
 
 
@@ -46,6 +46,11 @@ import org.apache.uima.ducc.common.Servi
 public class ServicePingMain
     implements SmConstants
 {
+
+    boolean debug = false;
+    int error_max = 10;
+    int error_count = 0;
+
     public ServicePingMain()
     {
     	
@@ -126,16 +131,22 @@ public class ServicePingMain
 			Class cls = Class.forName(cl);
 			pinger = (AServicePing) cls.newInstance();
 			pinger.init(ep);
-		} catch (ClassNotFoundException e) {
-            print(e);
-		} catch (InstantiationException e) {
-            print(e);
-		} catch (IllegalAccessException e) {
-            print(e);
-		}
+		} catch (Exception e) {
+            //print(e);         // To the logs
+            e.printStackTrace();
+		} 
         return pinger;
     }
 
+    /**
+     * Prints the stack trace of an error and counts it; once error_max errors have been
+     * counted the process reports that and exits with status 1.
+     */
+    void handleError(Throwable t)
+    {
+        t.printStackTrace();
+        error_count = error_count + 1;
+        if ( error_count < error_max ) {
+            return;                     // still under the limit, keep going
+        }
+        System.out.println("Exceeded error count. Exiting.");
+        System.exit(1);
+    }
+
     //
     // 1. Instantiate the pinger if possible.
     // 2. Read ducc.proeprties to find the ping interval
@@ -153,7 +164,7 @@ public class ServicePingMain
 
         CommandLineParser parser = new PosixParser();
         CommandLine commandLine = null;
-
+        ServiceStatistics default_statistics = new ServiceStatistics(false, false, "<N/A>");
 
 		try {
 			commandLine = parser.parse(options, args);
@@ -190,7 +201,6 @@ public class ServicePingMain
 			return;
 		}
 
-        ServiceStatistics defaultStatistics = new ServiceStatistics();
         ObjectOutputStream oos;
 		try {
 			oos = new ObjectOutputStream(sock_out);
@@ -207,7 +217,7 @@ public class ServicePingMain
         }
 
         while ( true ) {  
-        	print("ServicePingMeta starts ping.");
+        	if ( debug ) print("ServicePingMeta starts ping.");
         	
             byte[] cmd = new byte[1];
             cmd[0] = 0;
@@ -215,10 +225,9 @@ public class ServicePingMain
 			try {
 				eof = sock_in.read(cmd);
 			} catch (IOException e) {
-				// TODO Auto-generated catch block
-				e.printStackTrace();
+                handleError(e);
 			}
-            print("Read cmd", new String(cmd), "eof", eof);
+            if ( debug ) print("Read cmd", new String(cmd), "eof", eof);
 
             if ( eof == -1 ) {
                 print("EOF on input pipe.  Exiting");
@@ -228,13 +237,11 @@ public class ServicePingMain
 
             try {
 				if ( cmd[0] == 'P' ) {
-                    boolean p = custom.ping();
                     ServiceStatistics ss = custom.getStatistics();
                     if ( ss == null ) {
-                        ss = defaultStatistics;
+                        ss = default_statistics;
                     }
-                    ss.setPing(p);
-                    print("Ping is set to " + ss.getPing());
+                    // print("Is alive: " + ss.isAlive());
                     oos.writeObject(ss);
                     oos.flush();
 
@@ -251,8 +258,7 @@ public class ServicePingMain
 				    System.err.println("Invalid command recieved: " +  Byte.toString(cmd[0]));
 				}
 			} catch (Throwable e) {
-				// TODO Auto-generated catch block                
-				e.printStackTrace();   // this is defined to go to stderr
+                handleError(e);
 			}            
         }
     }
@@ -262,5 +268,5 @@ public class ServicePingMain
         ServicePingMain wrapper = new ServicePingMain();
         wrapper.start(args);
     }
-
+    
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1433548&r1=1433547&r2=1433548&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Tue Jan 15 17:37:20 2013
@@ -26,8 +26,6 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.URLConnection;
-import java.net.URLStreamHandler;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -110,12 +108,6 @@ public class ServiceSet
     ServiceClass service_class = ServiceClass.Undefined;
     ServiceState service_state = ServiceState.Undefined;;
 
-    String custom_classpath;
-    String custom_meta_class;
-    String[] custom_meta_parms;
-
-    long meta_ping_timeout = 500;
-
     // structures to manage service linger after it exits
     Timer timer = null;
     LingerTask linger = null;
@@ -132,6 +124,22 @@ public class ServiceSet
 
         this.service_type = ServiceType.UimaAs;
         this.service_class = ServiceClass.Implicit;
+
+        // need job props and meta props so pinger works
+        // job props: service_ping_class, service_ping_classpath, working_directory, log_directory
+        // meta props: endpoint, user
+        job_props = new DuccProperties();
+        job_props.put("service_ping_class", ServiceManagerComponent.default_ping_class);
+        job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
+        job_props.put("service_ping_dolog", "false");
+        job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
+        job_props.put("working_directory", System.getProperty("user.dir")); // whatever my current dir is
+        job_props.put("log_directory", System.getProperty("user.dir") + "/../logs");
+        job_props.put("service_ping_jvm_args", "-Xmx50M");
+
+        meta_props = new DuccProperties();
+        meta_props.put("user", System.getProperty("user.name"));
+        meta_props.put("endpoint", key);
     }
 
     //
@@ -139,12 +147,16 @@ public class ServiceSet
     //
     public ServiceSet(DuccId id, String key, String[] independentServices)
     {
+        // deprecating for now
+        throw new IllegalStateException("Submitted services not supported");
+        /**
         this.key = key;
         this.implementors.put(id, JobState.Undefined);
         this.independentServices = independentServices;
         this.service_class = ServiceClass.Submitted;        
 
         parseEndpoint(key);
+        */
     }
 
     //
@@ -179,6 +191,25 @@ public class ServiceSet
 
         parseIndependentServices();
 
+        if ( service_type == ServiceType.UimaAs ) {            
+            if ( ! job_props.containsKey("service_ping_class" ) ) {
+                job_props.put("service_ping_class", ServiceManagerComponent.default_ping_class);
+                job_props.put("service_ping_classpath", System.getProperty("java.class.path"));
+                // this lets us turn on debug logging for the default pinger
+                if ( ! job_props.containsKey("service_ping_dolog")) {
+                    job_props.put("service_ping_dolog", "false");
+                }        
+                job_props.put("service_ping_jvm_args", "-Xmx50M");
+            }
+        }
+
+        if ( !job_props.containsKey("service_ping_timeout") ) {
+            job_props.put("service_ping_timeout", ""+ServiceManagerComponent.meta_ping_timeout);
+        }
+        if ( ! job_props.containsKey("service_ping_dolog") ) {
+            job_props.put("service_ping_dolog", "true");       // unless we fill in the default pinger        
+        }
+
         //UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
         //UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
         // there are a couple junky messages that slip by the above configurations.  turn the whole danged thing off.
@@ -293,6 +324,21 @@ public class ServiceSet
     }
 
     /**
+     * A service is startable if it's UIMA-AS, or if it's CUSTOM and has a process_executable
+     * associated with it.  A non-startable CUSTOM service may have only a pinger.
+     */
+    boolean isStartable()
+    {
+        switch ( service_type ) {
+            case UimaAs:
+                return true;
+            case Custom: 
+                return job_props.containsKey("process_executable");
+        }
+        return false;  // redundant but needed for compilation
+    }
+
+    /**
      * At boot only ... synchronize my state with published OR state.
      */
     void synchronizeImplementors(Map<DuccId, JobState> work)
@@ -529,10 +575,13 @@ public class ServiceSet
         }
 
         // stop the pinger if no longer needed
-        if ( (references.size() == 0) && (isImplicit() || isCustom()) ) {    // non-implicit are stopped when the implementors go away
-            stopPingThread();
+        if ( (references.size() == 0) &&                          // nothing left
+             ( isImplicit() || ( isCustom() && !isStartable()) )   // implicit UIMA-AS || implicit CUSTOM
+             ) 
+        {
+            stopPingThread();                                     // must stop pinger because there's no state machine
+                                                                  // to do it for us in this situation
         }
-        // If it's registered, and nrefs == 0, and implicit_start is true, we need to stop the service
 
         return references.size();
     }
@@ -611,7 +660,6 @@ public class ServiceSet
                     }
                 } else {
                     if ( service_type == ServiceType.Custom ) {
-                        // ugly, until we're actually starting the custom process as well
                         startPingThread();
                     } else {
                         int needed = Math.max(0, instances - friendly_ids.size());
@@ -784,18 +832,10 @@ public class ServiceSet
         if ( serviceMeta != null ) return;         // don't start multiple times.
 
         try {
-            switch ( service_type ) {
-                case UimaAs:
-                    logger.info(methodName, id, "Starting UIMA-AS monitor.");
-                    serviceMeta = new UimaServiceMeta(this, endpoint, broker_host, broker_jmx_port);
-                    break;               
-                case Custom:
-                    logger.info(methodName, id, "Starting CUSTOM monitor.");
-                    serviceMeta = new CustomServiceMeta(this);
-                    break;       
-            }
+            logger.info(methodName, id, "Starting ping/monitor.");
+            serviceMeta = new PingDriver(this);
         } catch ( Throwable t ) {
-            logger.error(methodName, id, "Cannot start CUSTOM monitor.", t);
+            logger.error(methodName, id, "Cannot instantiate ping/monitor.", t);
             return;
         }
 
@@ -804,6 +844,18 @@ public class ServiceSet
         t.start();
     }
 
+    synchronized void pingExited()
+    {
+        String methodName = "pingExited";
+        if ( serviceMeta != null ) {
+            logger.warn(methodName, id, "Pinger exited voluntarily, setting state to Undefined. Endpoint", endpoint);
+            service_state = ServiceState.Undefined;     // not really sure what state is. it will be
+                                                        // checked and updated next run through the
+                                                        // main state machine, and maybe ping restarted.
+            serviceMeta = null;
+        }
+
+    }
 
     public synchronized void stopPingThread()
     {
@@ -877,10 +929,10 @@ public class ServiceSet
     {
     	String methodName = "start";
 
-        if ( service_type == ServiceType.Custom ) {
-            establish();
-            return;
-        }
+//         if ( service_type == ServiceType.Custom ) { 
+//             establish();
+//             return;
+//         }
 
         this.stopped = false;          // for registered
 
@@ -894,7 +946,7 @@ public class ServiceSet
             System.getProperty("ducc.jvm"),
             "-cp",
             System.getProperty("java.class.path"),
-            "org.apache.uima.ducc.sm.cli.DuccServiceSubmit",
+            "org.apache.uima.ducc.cli.DuccServiceSubmit",
             "--specification",
             props_filename
         };
@@ -989,7 +1041,7 @@ public class ServiceSet
             System.getProperty("ducc.jvm"),
             "-cp",
             System.getProperty("java.class.path"),
-            "org.apache.uima.ducc.sm.cli.DuccServiceCancel",
+            "org.apache.uima.ducc.cli.DuccServiceCancel",
             "--id",
             id.toString()
         };
@@ -1151,17 +1203,6 @@ public class ServiceSet
         return sd;
     }
 
-    class TcpStreamHandler
-        extends URLStreamHandler
-    {
-        TcpStreamHandler() {}
-
-        public URLConnection openConnection(URL u)
-        {
-            //throw new Exception("This protocol handler isn't expected to actually work.");
-        	return null;
-        }
-    }
 
     /**
      * For debugging, so it's easier to identify this guy in the eclipse debugger.

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java?rev=1433548&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java Tue Jan 15 17:37:20 2013
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.sm;
+
+import java.net.URL;
+import java.net.URLConnection;
+import java.net.URLStreamHandler;
+
+/**
+ * Placeholder stream handler that can be handed to the java.net.URL constructor so
+ * that a URL string can be parsed with an explicitly supplied handler (UimaAsPing does
+ * this to extract the host from the broker URL).  It cannot open connections.
+ */
+class TcpStreamHandler
+    extends URLStreamHandler
+{
+    TcpStreamHandler() {}
+
+    /**
+     * This protocol handler isn't expected to actually work: it never opens a connection.
+     *
+     * @param u the URL a caller asked to connect to (ignored)
+     * @return always null, so URLs built with this handler must only be used for parsing
+     */
+    @Override
+    public URLConnection openConnection(URL u)
+    {
+        return null;
+    }
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/TcpStreamHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java?rev=1433548&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java Tue Jan 15 17:37:20 2013
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.sm;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
+import org.apache.uima.ducc.common.AServicePing;
+import org.apache.uima.ducc.common.ServiceStatistics;
+import org.apache.uima.ducc.common.UimaAsServiceMonitor;
+import org.apache.uima.ducc.common.utils.SystemPropertyResolver;
+import org.apache.uima.resource.ResourceInitializationException;
+import org.apache.uima.util.Level;
+
+/**
+ * Pinger for UIMA-AS services.  init() parses the endpoint and sets up a
+ * UimaAsServiceMonitor; each getStatistics() call asks that monitor for statistics
+ * (if it could be initialized) and then initializes a throw-away UIMA-AS client
+ * against the broker and queue to check that the service responds.
+ */
+public class UimaAsPing
+    extends AServicePing
+{
+
+    String ep;                  // full endpoint string exactly as passed to init()
+
+    String endpoint;            // queue name parsed out of ep
+    String broker;              // broker URL parsed out of ep
+    int    meta_timeout;        // value of ducc.sm.meta.ping.timeout, handed to the client as GetMetaTimeout
+
+    String broker_host;         // host part of the broker URL
+    int    broker_jmx_port;     // ducc.sm.meta.jmx.port (1099 is passed as the default)
+    boolean connected;          // true once monitor.init() has succeeded
+    UimaAsServiceMonitor monitor;
+
+    /**
+     * Parses the endpoint, reads the ping configuration from system properties and
+     * creates the service monitor.
+     *
+     * @param ep endpoint of the form UIMA-AS:queuename:broker
+     * @throws IllegalArgumentException if ep does not have that form or the broker is not a valid URL
+     * @throws NumberFormatException if ducc.sm.meta.ping.timeout is missing or not an integer
+     */
+    public void init(String ep)
+        throws Exception
+    {
+        this.ep = ep;
+
+        // Ep is of the form UIMA-AS:queuename:broker
+        int ndx = ep.indexOf(":");
+        ep = ep.substring(ndx+1);
+        ndx = ep.indexOf(":");
+        if ( ndx < 0 ) {
+            // fail with a readable message instead of a StringIndexOutOfBoundsException from substring
+            throw new IllegalArgumentException("Invalid endpoint, expected UIMA-AS:queuename:broker: " + this.ep);
+        }
+
+        this.endpoint = ep.substring(0, ndx).trim();
+        this.broker = ep.substring(ndx+1).trim();
+
+        // broker is a URL that we need to parse in order to get the actual host and port
+        // for jmx
+        URL url = null;
+        try {
+            url = new URL(null, broker, new TcpStreamHandler());
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException("Invalid broker URL: " + broker, e);   // keep the cause
+        }
+        broker_host = url.getHost();
+        // not needed here fyi broker_port = url.getPort();
+
+        String to = System.getProperty("ducc.sm.meta.ping.timeout");
+        if ( to == null ) {
+            // parseInt(null) would only report "null"; name the missing property instead
+            throw new NumberFormatException("System property ducc.sm.meta.ping.timeout is not set");
+        }
+        meta_timeout = Integer.parseInt(to);
+
+        broker_jmx_port = SystemPropertyResolver.getIntProperty("ducc.sm.meta.jmx.port", 1099);
+        this.monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
+        init_monitor();
+
+        //UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
+        //UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
+        // there are a couple junky messages that slip by the above configurations.  turn the whole danged thing off.
+        UIMAFramework.getLogger().setLevel(Level.OFF);
+
+    }
+
+    /**
+     * Does nothing.  The UIMA-AS client is created and stopped inside each getStatistics()
+     * call; NOTE(review): the monitor is not shut down here -- confirm it needs no cleanup.
+     */
+    public void stop()
+    {
+    }
+
+    /**
+     * Initializes the monitor if that has not succeeded yet.  A failure is printed and
+     * leaves connected false, so the next call tries again.
+     */
+    private synchronized void init_monitor()
+    {
+        if ( ! connected ) {
+            try {
+                System.out.println("Initializing monitor");
+                monitor.init(ep);
+                connected = true;
+                System.out.println("Monitor initialized");
+            } catch (Throwable t ) {
+                connected = false;
+                t.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Collects statistics from the monitor and checks that the service responds.
+     *
+     * @return the monitor's statistics if the monitor is connected, otherwise a fresh
+     *         ServiceStatistics(false, false, "<NA>"); alive and healthy are set to true
+     *         when the UIMA-AS client initializes without a ResourceInitializationException
+     */
+    public ServiceStatistics getStatistics()
+    {
+        ServiceStatistics statistics = new ServiceStatistics(false, false, "<NA>");
+
+        // Instantiate Uima AS Client
+        BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+        Map<String, Object> appCtx = new HashMap<String, Object>();
+        appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
+        appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
+        appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, meta_timeout);
+
+        try {
+            // retry the monitor connection if an earlier attempt failed
+            init_monitor();
+            if ( connected ) {
+                statistics = monitor.getStatistics();
+            }
+
+            // this sends GetMeta request and blocks waiting for a reply
+            uimaAsEngine.initialize(appCtx);
+            statistics.setAlive(true);
+            statistics.setHealthy(true);
+            // System.out.println("getMeta ok: " + ep);
+
+        } catch( ResourceInitializationException e) {
+            // NOTE(review): alive/healthy are not reset here; if the monitor's statistics can
+            // already carry true flags this reports a dead service as alive -- confirm.
+            System.out.println("Cannot issue getMeta: " + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            uimaAsEngine.stop();
+        }
+
+        return statistics;
+    }
+
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
------------------------------------------------------------------------------
    svn:eol-style = native