You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/04/30 22:07:39 UTC

svn commit: r1477805 - in /uima/sandbox/uima-ducc/trunk: uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/ uima-ducc-common/src/main/java/org/apache/uima/ducc/common/ uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/

Author: challngr
Date: Tue Apr 30 20:07:39 2013
New Revision: 1477805

URL: http://svn.apache.org/r1477805
Log:
UIMA-2856
SM Must clean up AMQ queues when services exit.

Removed:
    uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/AServicePing.java
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java Tue Apr 30 20:07:39 2013
@@ -37,5 +37,14 @@ public abstract class AServicePing
      * Returns the object with application-derived health and statistics.
      */
     public abstract ServiceStatistics getStatistics();
-
+    
+    /**
+     * Clears lingering service state.  In the case of UIMA-AS services, this will delete the
+     * queue, if nobody is using it any more.  Other types of services do service-specific things.
+     */
+    public void clearQueues()
+    	throws Throwable
+    {
+        // default, do nothing
+    }
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java Tue Apr 30 20:07:39 2013
@@ -30,14 +30,18 @@ import javax.management.remote.JMXServic
 
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.uima.ducc.common.utils.DuccLogger;
 
 public class UimaAsServiceMonitor
     extends AServicePing
 {
+	private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), "SM");	
+
     private String qname;
     private String broker_url;
 
     private JMXConnector jmxc;
+    BrokerViewMBean brokerMBean;
     private QueueViewMBean monitoredQueue;
     private ServiceStatistics qstats;
 
@@ -89,7 +93,9 @@ public class UimaAsServiceMonitor
     public void init(String parm /* parm not used in this impl */)
         throws Exception
     {
-        
+        String methodName = "init";
+        logger.info(methodName, null, "INIT");
+
         JMXServiceURL url = new JMXServiceURL(broker_url);
         jmxc = JMXConnectorFactory.connect(url);
         MBeanServerConnection conn = jmxc.getMBeanServerConnection();        
@@ -112,8 +118,8 @@ public class UimaAsServiceMonitor
 
         //ObjectName activeMQ = new ObjectName("org.apache.activemq:BrokerName=" + broker_name +",Type=Broker");
 
-        BrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, brokerObjectName ,BrokerViewMBean.class, true);
-        for (ObjectName name : mbean.getQueues()) {
+        brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, brokerObjectName ,BrokerViewMBean.class, true);
+        for (ObjectName name : brokerMBean.getQueues()) {
             QueueViewMBean qView = (QueueViewMBean)
                 MBeanServerInvocationHandler.newProxyInstance(conn, name, QueueViewMBean.class, true);
             
@@ -129,8 +135,31 @@ public class UimaAsServiceMonitor
         }        
     }
 
+    public void clearQueues()
+        throws Throwable
+    {
+        String methodName = "clearQueues";
+        init(null);
+
+        logger.info(methodName, null, "Clear queues starts.");
+        if ( ( qname != null ) && ( brokerMBean != null ) ) {
+            consumerCount  = monitoredQueue.getConsumerCount();
+            producerCount  = monitoredQueue.getProducerCount();
+            logger.info(methodName, null, "Trying to clear: cousumerCount[", consumerCount, "] producerCount[", producerCount, "]");
+            if ( (consumerCount == 0) && (producerCount == 0) ) {
+                brokerMBean.removeQueue(qname);
+            }
+        }
+        stop();
+        logger.info(methodName, null, "Clear queues returns.");
+
+    }
+
     public void stop()
     {
+        String methodName = "stop";
+        logger.info(methodName, null, "STOP");
+
         try {
 			if ( jmxc != null ) {
 			    jmxc.close();
@@ -162,6 +191,10 @@ public class UimaAsServiceMonitor
     private void collect()
         throws Throwable
     {
+    	String methodName = "collect";
+        init(null);
+        logger.info(methodName, null, "Collect stats", monitoredQueue);
+        if ( monitoredQueue != null ) {
             enqueueTime    = monitoredQueue.getAverageEnqueueTime();
             consumerCount  = monitoredQueue.getConsumerCount();
             producerCount  = monitoredQueue.getProducerCount();
@@ -173,7 +206,21 @@ public class UimaAsServiceMonitor
             enqueueCount   = monitoredQueue.getEnqueueCount();
             dispatchCount  = monitoredQueue.getDispatchCount();
             expiredCount   = monitoredQueue.getExpiredCount();
+        } else {
+            enqueueTime    = 0;
+            consumerCount  = 0;
+            producerCount  = 0;
+            queueSize      = 0;
+            minEnqueueTime = 0;
+            maxEnqueueTime = 0;
+            inFlightCount  = 0;
+            dequeueCount   = 0;
+            enqueueCount   = 0;
+            dispatchCount  = 0;
+            expiredCount   = 0;
+        }
 
+        stop();
     }
 
     public static void main(String[] args)

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java Tue Apr 30 20:07:39 2013
@@ -28,4 +28,5 @@ interface IServiceMeta
     public void run();
     public void stop();
     public void reference();
+    public void clearQueues();
 }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java Tue Apr 30 20:07:39 2013
@@ -28,6 +28,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.ArrayList;
 
+import org.apache.uima.ducc.common.AServicePing;
 import org.apache.uima.ducc.common.ServiceStatistics;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccProperties;
@@ -93,6 +94,8 @@ class PingDriver
 
     boolean shutdown = false;
     
+    AServicePing internal_pinger = null;
+    
     PingDriver(ServiceSet sset)
     {        
         this.sset = sset;
@@ -156,7 +159,21 @@ class PingDriver
             ping_thread.interrupt();
         }
     }
-
+    
+    public void clearQueues()
+    {
+    	String methodName = "clearQueues";
+        if ( internal_pinger != null ) {
+            try {
+				internal_pinger.clearQueues();
+			} catch (Throwable e) {
+                logger.warn(methodName, sset.getId(), "Error clearing queues: ", e.toString());
+			}
+        } else {
+            // external pinger
+        }
+    }
+    
     synchronized int getMetaPingRate()
     {
         return meta_ping_rate;
@@ -212,18 +229,18 @@ class PingDriver
     public void runAsThread()
     {
     	String methodName = "runAsThread";
-        UimaAsPing uap = new UimaAsPing(logger);
+        internal_pinger = new UimaAsPing(logger);
         try {
-            uap.init(endpoint);
+            internal_pinger.init(endpoint);
         } catch ( Throwable t ) {
             logger.warn(methodName, sset.getId(), t);
             sset.pingExited();
         }
         while ( ! shutdown ) {
             
-            handleStatistics(uap.getStatistics());
+            handleStatistics(internal_pinger.getStatistics());
             if ( errors > error_threshold ) {
-                uap.stop();
+                internal_pinger.stop();
                 logger.warn(methodName, sset.getId(), "Ping exited because of excess errors: ", errors);
                 sset.pingExited();
             }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Tue Apr 30 20:07:39 2013
@@ -660,31 +660,33 @@ public class ServiceHandler
                 // if it hadn't been before.
                 //  sset.resetRunFailures();
             } else {
-                sset.removeImplementor(id);
-
-                JobCompletionType jct = w.getCompletionType();
                 JobState          state = w.getJobState();
-                logger.info(methodName, id, "Removing stopped instance from maps: state[", state, "] completion[", jct, "]");
-                switch ( jct ) {
-                    case EndOfJob:
-                    case CanceledByUser:
-                    case CanceledByAdministrator:
-                    case Undefined:
-                        break;
-                    default:
-                        logger.debug(methodName, id, "RECORDING FAILURE");
-                        // all other cases are errors that contribute to the error count
-                        if ( sset.excessiveRunFailures() ) {    // if true, the count is exceeeded, but reset
-                            logger.warn(methodName, null, "Process Failure: " + jct + " Maximum consecutive failures[" + sset.failure_run + "] max [" + sset.failure_max + "]");
-                        } else {
-                            sset.start();
-                        }
-                        break;
-                }
                 
+                if ( state == JobState.Completed ) {
+                    sset.removeImplementor(id);
+                    JobCompletionType jct = w.getCompletionType();
+                        
+                    logger.info(methodName, id, "Removing stopped instance from maps: state[", state, "] completion[", jct, "]");
+                    switch ( jct ) {
+                        case EndOfJob:
+                        case CanceledByUser:
+                        case CanceledByAdministrator:
+                        case Undefined:
+                            break;
+                        default:
+                            logger.debug(methodName, id, "RECORDING FAILURE");
+                            // all other cases are errors that contribute to the error count
+                            if ( sset.excessiveRunFailures() ) {    // if true, the count is exceeded, but reset
+                                logger.warn(methodName, null, "Process Failure: " + jct + " Maximum consecutive failures[" + sset.failure_run + "] max [" + sset.failure_max + "]");
+                            } else {
+                                sset.start();
+                            }
+                            break;
+                    }
+                }
             }
 
-            if ( (sset.getServiceState() == ServiceState.NotAvailable) && (sset.countReferences() == 0) ) {
+            if ( (sset.getServiceState() == ServiceState.NotAvailable) && (sset.countReferences() == 0) && (sset.countImplementors() == 0) ) {
                 // this service is now toast.  remove from our maps asap to avoid clashes if it gets
                 // resubmitted before the OR can purge it.
                 if ( ! sset.isRegistered() ) {

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Tue Apr 30 20:07:39 2013
@@ -104,9 +104,14 @@ public class ServiceSet
     // Registered services, the number of instances to maintain
     int instances = 1;
 
-    // UIMA-AS pinger
+    // Service pinger
     IServiceMeta serviceMeta = null;
 
+    // After stopping a pinger we need to discard it, and we usually need to stop it well before
+    // it is ok to delete residual state such as UIMA-AS queues.  Instead of discarding it, we
+    // stash it here to use once the last implementor seems to be dead.
+    IServiceMeta residualMeta = null;
+
     // registered services state files
     DuccProperties job_props  = null;
     DuccProperties meta_props = null;
@@ -729,12 +734,19 @@ public class ServiceSet
         logger.debug(methodName, this.id, "Removing implementor", id);
         implementors.remove(id);
         friendly_ids.remove(id.getFriendly());
+        if ( implementors.size() == 0 ) {
+            stopPingThread();
+        }
         String history = meta_props.getStringProperty(history_key, "");
         history = history + " " + id.toString();
         meta_props.put(history_key, history);
         persistImplementors();
-        if ( implementors.size() == 0 ) {
-            stopPingThread();
+
+        if ( (implementors.size() == 0) && (residualMeta != null) ) {  // Went to 0 and there was a pinger?
+            if ( isRegistered() || isSubmitted() ) {                   // Is one of our happy cases?
+                residualMeta.clearQueues();                            // Try to clear residual state.
+                residualMeta = null;                                   // All done now.
+            }
         }
     }
 
@@ -1074,8 +1086,10 @@ public class ServiceSet
         if ( serviceMeta != null ) {
             logger.warn(methodName, id, "Pinger exited voluntarily, setting state to Undefined. Endpoint", endpoint);
             setServiceState(ServiceState.Undefined);    // not really sure what state is. it will be
+
             // checked and updated next run through the
             // main state machine, and maybe ping restarted.
+            residualMeta = serviceMeta;
             serviceMeta = null;
         } else {
             setServiceState(ServiceState.NotAvailable);
@@ -1085,15 +1099,17 @@ public class ServiceSet
             deleteProperties();
         } else {
             saveMetaProperties();
-        }
+        }        
     }
 
     public synchronized void stopPingThread()
     {
         String methodName = "stopPingThread";
+
         if ( serviceMeta != null ) {
             logger.debug(methodName, id, "Stopping ping thread, endpoint", endpoint);
             serviceMeta.stop();
+            residualMeta = serviceMeta;
             serviceMeta = null;
         }
 

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java Tue Apr 30 20:07:39 2013
@@ -74,7 +74,6 @@ public class UimaAsPing
         
         broker_jmx_port = SystemPropertyResolver.getIntProperty("ducc.sm.meta.jmx.port", 1099);
         this.monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
-        init_monitor();
 
         //UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
         //UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
@@ -88,6 +87,12 @@ public class UimaAsPing
         if ( monitor != null ) monitor.stop();
     }
 
+    public void clearQueues()
+    	throws Throwable
+    {
+        monitor.clearQueues();
+    } 
+
     private void doLog(String methodName, String msg)
     {
         if ( logger == null ) {
@@ -97,23 +102,6 @@ public class UimaAsPing
         }
     }
 
-    private synchronized void init_monitor()
-    {
-        String methodName = "init_monitor";
-        if ( ! connected ) {
-            try {
-                doLog(methodName, "Initializing monitor");
-                monitor.init(ep);
-                connected = true;
-                doLog(methodName, "Monitor initialized");
-            } catch (Throwable t ) {
-                connected = false;
-                // t.printStackTrace();
-                doLog(methodName, "Cannot initialize monitor: " + t.toString());
-            }
-        }
-    }
-
     public ServiceStatistics getStatistics()
     {
         String methodName = "getStatistics";
@@ -128,12 +116,7 @@ public class UimaAsPing
 
         try {
             //	this sends GetMeta request and blocks waiting for a reply
-            init_monitor();
-            if ( connected ) {
-                statistics = monitor.getStatistics();
-            } else {
-                return statistics;
-            }
+            statistics = monitor.getStatistics();
 
             uimaAsEngine.initialize(appCtx);
             statistics.setAlive(true);
@@ -142,8 +125,14 @@ public class UimaAsPing
 
         } catch( ResourceInitializationException e) {
             doLog(methodName, "Cannot issue getMeta to: " + endpoint + ":" + broker);
+            statistics.setHealthy(false);
+            statistics.setAlive(false);
         } finally {
-            uimaAsEngine.stop();
+            try {
+				uimaAsEngine.stop();
+			} catch (Throwable e) {
+				doLog(methodName, "Exception on UIMA-AS connection stop:" + e.toString());
+			}
         }
 
         return statistics;