You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ch...@apache.org on 2013/04/30 22:07:39 UTC
svn commit: r1477805 - in /uima/sandbox/uima-ducc/trunk:
uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/
uima-ducc-common/src/main/java/org/apache/uima/ducc/common/
uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/
Author: challngr
Date: Tue Apr 30 20:07:39 2013
New Revision: 1477805
URL: http://svn.apache.org/r1477805
Log:
UIMA-2856
SM Must clean up AMQ queues when services exit.
Removed:
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/AServicePing.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/AServicePing.java Tue Apr 30 20:07:39 2013
@@ -37,5 +37,14 @@ public abstract class AServicePing
* Returns the object with application-derived health and statistics.
*/
public abstract ServiceStatistics getStatistics();
-
+
+ /**
+ * Clears lingering service state. In the case of UIMA-AS services, this will delete the
+ * queue, if nobody is using it any more. Other types of services do service-specific things.y
+ */
+ public void clearQueues()
+ throws Throwable
+ {
+ // default, do nothing
+ }
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/UimaAsServiceMonitor.java Tue Apr 30 20:07:39 2013
@@ -30,14 +30,18 @@ import javax.management.remote.JMXServic
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.uima.ducc.common.utils.DuccLogger;
public class UimaAsServiceMonitor
extends AServicePing
{
+ private DuccLogger logger = DuccLogger.getLogger(this.getClass().getName(), "SM");
+
private String qname;
private String broker_url;
private JMXConnector jmxc;
+ BrokerViewMBean brokerMBean;
private QueueViewMBean monitoredQueue;
private ServiceStatistics qstats;
@@ -89,7 +93,9 @@ public class UimaAsServiceMonitor
public void init(String parm /* parm not used in this impl */)
throws Exception
{
-
+ String methodName = "init";
+ logger.info(methodName, null, "INIT");
+
JMXServiceURL url = new JMXServiceURL(broker_url);
jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
@@ -112,8 +118,8 @@ public class UimaAsServiceMonitor
//ObjectName activeMQ = new ObjectName("org.apache.activemq:BrokerName=" + broker_name +",Type=Broker");
- BrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, brokerObjectName ,BrokerViewMBean.class, true);
- for (ObjectName name : mbean.getQueues()) {
+ brokerMBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, brokerObjectName ,BrokerViewMBean.class, true);
+ for (ObjectName name : brokerMBean.getQueues()) {
QueueViewMBean qView = (QueueViewMBean)
MBeanServerInvocationHandler.newProxyInstance(conn, name, QueueViewMBean.class, true);
@@ -129,8 +135,31 @@ public class UimaAsServiceMonitor
}
}
+ public void clearQueues()
+ throws Throwable
+ {
+ String methodName = "clearQueues";
+ init(null);
+
+ logger.info(methodName, null, "Clear queues starts.");
+ if ( ( qname != null ) && ( brokerMBean != null ) ) {
+ consumerCount = monitoredQueue.getConsumerCount();
+ producerCount = monitoredQueue.getProducerCount();
+ logger.info(methodName, null, "Trying to clear: cousumerCount[", consumerCount, "] producerCount[", producerCount, "]");
+ if ( (consumerCount == 0) && (producerCount == 0) ) {
+ brokerMBean.removeQueue(qname);
+ }
+ }
+ stop();
+ logger.info(methodName, null, "Clear queues returns.");
+
+ }
+
public void stop()
{
+ String methodName = "stop";
+ logger.info(methodName, null, "STOP");
+
try {
if ( jmxc != null ) {
jmxc.close();
@@ -162,6 +191,10 @@ public class UimaAsServiceMonitor
private void collect()
throws Throwable
{
+ String methodName = "collect";
+ init(null);
+ logger.info(methodName, null, "Collect stats", monitoredQueue);
+ if ( monitoredQueue != null ) {
enqueueTime = monitoredQueue.getAverageEnqueueTime();
consumerCount = monitoredQueue.getConsumerCount();
producerCount = monitoredQueue.getProducerCount();
@@ -173,7 +206,21 @@ public class UimaAsServiceMonitor
enqueueCount = monitoredQueue.getEnqueueCount();
dispatchCount = monitoredQueue.getDispatchCount();
expiredCount = monitoredQueue.getExpiredCount();
+ } else {
+ enqueueTime = 0;
+ consumerCount = 0;
+ producerCount = 0;
+ queueSize = 0;
+ minEnqueueTime = 0;
+ maxEnqueueTime = 0;
+ inFlightCount = 0;
+ dequeueCount = 0;
+ enqueueCount = 0;
+ dispatchCount = 0;
+ expiredCount = 0;
+ }
+ stop();
}
public static void main(String[] args)
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/IServiceMeta.java Tue Apr 30 20:07:39 2013
@@ -28,4 +28,5 @@ interface IServiceMeta
public void run();
public void stop();
public void reference();
+ public void clearQueues();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/PingDriver.java Tue Apr 30 20:07:39 2013
@@ -28,6 +28,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
+import org.apache.uima.ducc.common.AServicePing;
import org.apache.uima.ducc.common.ServiceStatistics;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.DuccProperties;
@@ -93,6 +94,8 @@ class PingDriver
boolean shutdown = false;
+ AServicePing internal_pinger = null;
+
PingDriver(ServiceSet sset)
{
this.sset = sset;
@@ -156,7 +159,21 @@ class PingDriver
ping_thread.interrupt();
}
}
-
+
+ public void clearQueues()
+ {
+ String methodName = "clearQueues";
+ if ( internal_pinger != null ) {
+ try {
+ internal_pinger.clearQueues();
+ } catch (Throwable e) {
+ logger.warn(methodName, sset.getId(), "Error clearing queues: ", e.toString());
+ }
+ } else {
+ // external pinger
+ }
+ }
+
synchronized int getMetaPingRate()
{
return meta_ping_rate;
@@ -212,18 +229,18 @@ class PingDriver
public void runAsThread()
{
String methodName = "runAsThread";
- UimaAsPing uap = new UimaAsPing(logger);
+ internal_pinger = new UimaAsPing(logger);
try {
- uap.init(endpoint);
+ internal_pinger.init(endpoint);
} catch ( Throwable t ) {
logger.warn(methodName, sset.getId(), t);
sset.pingExited();
}
while ( ! shutdown ) {
- handleStatistics(uap.getStatistics());
+ handleStatistics(internal_pinger.getStatistics());
if ( errors > error_threshold ) {
- uap.stop();
+ internal_pinger.stop();
logger.warn(methodName, sset.getId(), "Ping exited because of excess errors: ", errors);
sset.pingExited();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Tue Apr 30 20:07:39 2013
@@ -660,31 +660,33 @@ public class ServiceHandler
// if it hadn't been before.
// sset.resetRunFailures();
} else {
- sset.removeImplementor(id);
-
- JobCompletionType jct = w.getCompletionType();
JobState state = w.getJobState();
- logger.info(methodName, id, "Removing stopped instance from maps: state[", state, "] completion[", jct, "]");
- switch ( jct ) {
- case EndOfJob:
- case CanceledByUser:
- case CanceledByAdministrator:
- case Undefined:
- break;
- default:
- logger.debug(methodName, id, "RECORDING FAILURE");
- // all other cases are errors that contribute to the error count
- if ( sset.excessiveRunFailures() ) { // if true, the count is exceeeded, but reset
- logger.warn(methodName, null, "Process Failure: " + jct + " Maximum consecutive failures[" + sset.failure_run + "] max [" + sset.failure_max + "]");
- } else {
- sset.start();
- }
- break;
- }
+ if ( state == JobState.Completed ) {
+ sset.removeImplementor(id);
+ JobCompletionType jct = w.getCompletionType();
+
+ logger.info(methodName, id, "Removing stopped instance from maps: state[", state, "] completion[", jct, "]");
+ switch ( jct ) {
+ case EndOfJob:
+ case CanceledByUser:
+ case CanceledByAdministrator:
+ case Undefined:
+ break;
+ default:
+ logger.debug(methodName, id, "RECORDING FAILURE");
+ // all other cases are errors that contribute to the error count
+ if ( sset.excessiveRunFailures() ) { // if true, the count is exceeeded, but reset
+ logger.warn(methodName, null, "Process Failure: " + jct + " Maximum consecutive failures[" + sset.failure_run + "] max [" + sset.failure_max + "]");
+ } else {
+ sset.start();
+ }
+ break;
+ }
+ }
}
- if ( (sset.getServiceState() == ServiceState.NotAvailable) && (sset.countReferences() == 0) ) {
+ if ( (sset.getServiceState() == ServiceState.NotAvailable) && (sset.countReferences() == 0) && (sset.countImplementors() == 0) ) {
// this service is now toast. remove from our maps asap to avoid clashes if it gets
// resubmitted before the OR can purge it.
if ( ! sset.isRegistered() ) {
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Tue Apr 30 20:07:39 2013
@@ -104,9 +104,14 @@ public class ServiceSet
// Registered services, the number of instances to maintain
int instances = 1;
- // UIMA-AS pinger
+ // Service pinger
IServiceMeta serviceMeta = null;
+ // After stopping a pinger we need to discard it,and we usually need to stop it well before
+ // it is ok to delete residual state such as UIMA-AS queues. Instead of discarding it, we
+ // stash it here to use once the last implementor seems to be dead.
+ IServiceMeta residualMeta = null;
+
// registered services state files
DuccProperties job_props = null;
DuccProperties meta_props = null;
@@ -729,12 +734,19 @@ public class ServiceSet
logger.debug(methodName, this.id, "Removing implementor", id);
implementors.remove(id);
friendly_ids.remove(id.getFriendly());
+ if ( implementors.size() == 0 ) {
+ stopPingThread();
+ }
String history = meta_props.getStringProperty(history_key, "");
history = history + " " + id.toString();
meta_props.put(history_key, history);
persistImplementors();
- if ( implementors.size() == 0 ) {
- stopPingThread();
+
+ if ( (implementors.size() == 0) && (residualMeta != null) ) { // Went to 0 and there was a pinger?
+ if ( isRegistered() || isSubmitted() ) { // Is one of our happy cases?
+ residualMeta.clearQueues(); // Try to clear residal state.
+ residualMeta = null; // All done now.
+ }
}
}
@@ -1074,8 +1086,10 @@ public class ServiceSet
if ( serviceMeta != null ) {
logger.warn(methodName, id, "Pinger exited voluntarily, setting state to Undefined. Endpoint", endpoint);
setServiceState(ServiceState.Undefined); // not really sure what state is. it will be
+
// checked and updated next run through the
// main state machine, and maybe ping restarted.
+ residualMeta = serviceMeta;
serviceMeta = null;
} else {
setServiceState(ServiceState.NotAvailable);
@@ -1085,15 +1099,17 @@ public class ServiceSet
deleteProperties();
} else {
saveMetaProperties();
- }
+ }
}
public synchronized void stopPingThread()
{
String methodName = "stopPingThread";
+
if ( serviceMeta != null ) {
logger.debug(methodName, id, "Stopping ping thread, endpoint", endpoint);
serviceMeta.stop();
+ residualMeta = serviceMeta;
serviceMeta = null;
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java?rev=1477805&r1=1477804&r2=1477805&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/UimaAsPing.java Tue Apr 30 20:07:39 2013
@@ -74,7 +74,6 @@ public class UimaAsPing
broker_jmx_port = SystemPropertyResolver.getIntProperty("ducc.sm.meta.jmx.port", 1099);
this.monitor = new UimaAsServiceMonitor(endpoint, broker_host, broker_jmx_port);
- init_monitor();
//UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
//UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
@@ -88,6 +87,12 @@ public class UimaAsPing
if ( monitor != null ) monitor.stop();
}
+ public void clearQueues()
+ throws Throwable
+ {
+ monitor.clearQueues();
+ }
+
private void doLog(String methodName, String msg)
{
if ( logger == null ) {
@@ -97,23 +102,6 @@ public class UimaAsPing
}
}
- private synchronized void init_monitor()
- {
- String methodName = "init_monitor";
- if ( ! connected ) {
- try {
- doLog(methodName, "Initializing monitor");
- monitor.init(ep);
- connected = true;
- doLog(methodName, "Monitor initialized");
- } catch (Throwable t ) {
- connected = false;
- // t.printStackTrace();
- doLog(methodName, "Cannot initialize monitor: " + t.toString());
- }
- }
- }
-
public ServiceStatistics getStatistics()
{
String methodName = "getStatistics";
@@ -128,12 +116,7 @@ public class UimaAsPing
try {
// this sends GetMeta request and blocks waiting for a reply
- init_monitor();
- if ( connected ) {
- statistics = monitor.getStatistics();
- } else {
- return statistics;
- }
+ statistics = monitor.getStatistics();
uimaAsEngine.initialize(appCtx);
statistics.setAlive(true);
@@ -142,8 +125,14 @@ public class UimaAsPing
} catch( ResourceInitializationException e) {
doLog(methodName, "Cannot issue getMeta to: " + endpoint + ":" + broker);
+ statistics.setHealthy(false);
+ statistics.setAlive(false);
} finally {
- uimaAsEngine.stop();
+ try {
+ uimaAsEngine.stop();
+ } catch (Throwable e) {
+ doLog(methodName, "Exception on UIMA-AS connection stop:" + e.toString());
+ }
}
return statistics;