You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/09 20:45:33 UTC
svn commit: r1069054 -
/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
Author: cwiklik
Date: Wed Feb 9 19:45:32 2011
New Revision: 1069054
URL: http://svn.apache.org/viewvc?rev=1069054&view=rev
Log:
UIMA-2038 Modified to support clean shutdown
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1069054&r1=1069053&r2=1069054&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Wed Feb 9 19:45:32 2011
@@ -106,6 +106,7 @@ public abstract class BaseAnalysisEngine
private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
+ public static final boolean NO_RECOVERY = true;
protected ServiceState currentState = ServiceState.INITIALIZING;
@@ -246,7 +247,6 @@ public abstract class BaseAnalysisEngine
private static final UimaAsVersion uimaAsVersion = new UimaAsVersion();
// Holds destination names of clients known to be dead
protected ConcurrentHashMap<String,String> deadClientDestinationMap = new ConcurrentHashMap<String, String>();
-
public BaseAnalysisEngineController() {
@@ -1880,6 +1880,8 @@ public abstract class BaseAnalysisEngine
// then wait until all CASes still in play are processed. When all CASes are processed
// we proceed with the shutdown of delegates and finally of the top level service.
if (isTopLevelComponent()) {
+ getInputChannel().setTerminating();
+
// Stops all input channels of this service, but keep temp reply queue input channels open
// to process replies.
stopInputChannels(InputChannel.InputChannels);
@@ -1916,6 +1918,7 @@ public abstract class BaseAnalysisEngine
"quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAEE_onEmpty_callback_received__INFO", new Object[] { getComponentName() });
}
+ getInputChannel().terminate();
stop();
}
}
@@ -1958,8 +1961,13 @@ public abstract class BaseAnalysisEngine
} else if (!isStopped()) {
stopDelegateTimers();
getOutputChannel().cancelTimers();
+ InputChannel iC = getInputChannel(endpointName);
+ if ( iC != null) {
+ iC.setTerminating();
+ }
// Stop the inflow of new input CASes
stopInputChannel();
+ iC.terminate();
stopCasMultipliers();
stopTransportLayer();
if (cause != null && aCasReferenceId != null) {
@@ -2103,54 +2111,61 @@ public abstract class BaseAnalysisEngine
}
}
}
-
+ private void setInputChannelForNoRecovery() { // Flags this controller's input channel as terminating; called at the start of stopInputChannels().
+ if ( inputChannelMap.size() > 0 ) { // nothing to flag when no input channel is registered
+ InputChannel iC = getInputChannel(); // NOTE(review): result is used without a null check - confirm it cannot be null when the map is non-empty
+ iC.setTerminating(); // NOTE(review): presumably stops the channel from recovering its listeners during shutdown - confirm in InputChannel
+ }
+ }
+ protected void stopInputChannels( int channelsToStop) { // Stops the input channels registered in inputChannelMap; channelsToStop is passed through to InputChannel.stop().
+ InputChannel iC = null; // declared outside the loop so the catch block can report the channel being processed
+ setInputChannelForNoRecovery(); // flag the input channel as terminating before any channel is stopped
+ Iterator it = inputChannelMap.keySet().iterator();
+ int i = 1; // NOTE(review): incremented below but never read in this method
+ while (it.hasNext()) {
+ try {
+ String key = (String) it.next();
+ if (key != null && key.trim().length() > 0) { // skip null or blank keys
+ iC = (InputChannel) inputChannelMap.get(key);
+ if (iC != null) {
+ if (channelsToStop == InputChannel.InputChannels && iC.getServiceInfo() != null
+ && iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
+ // This closes both listeners on the input queue: Process Listener and GetMeta
+ // Listener
+ iC.stop(channelsToStop);
+ return; // Just closed input channels. Keep the others open
+ }
+ iC.stop(channelsToStop); // any other channel: stop it and keep iterating
+ }
+ }
+ i++;
+ } catch (Exception e) { // a failure on one channel is logged and the loop moves on to the next key
+ if (iC != null) { // NOTE(review): iC may still reference the previous iteration's channel if the failure happened before the lookup
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+ "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_unable_to_stop_inputchannel__INFO",
+ new Object[] { getComponentName(), iC.getInputQueueName() }); // the exception itself is not passed to this log call
+ }
+ } else { // no channel reference available: log the component name and the exception at WARNING
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", getComponentName());
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
+ }
+ }
+ }
+
+ }
/**
* Aggregates have more than one Listener channel. This method stops all configured input channels
* this service is configured with.
*
*/
- protected void stopInputChannels(int channelsToStop) {
- InputChannel iC = null;
- Iterator it = inputChannelMap.keySet().iterator();
- int i = 1;
- while (it.hasNext()) {
- try {
- String key = (String) it.next();
- if (key != null && key.trim().length() > 0) {
- iC = (InputChannel) inputChannelMap.get(key);
- if (iC != null) {
- if (channelsToStop == InputChannel.InputChannels && iC.getServiceInfo() != null
- && iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
- // This closes both listeners on the input queue: Process Listener and GetMeta
- // Listener
- iC.stop(channelsToStop);
- return; // Just closed input channels. Keep the others open
- }
- iC.stop(channelsToStop);
- }
- }
- i++;
- } catch (Exception e) {
- if (iC != null) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
- "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_unable_to_stop_inputchannel__INFO",
- new Object[] { getComponentName(), iC.getInputQueueName() });
- }
- } else {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", getComponentName());
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
- "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_exception__WARNING", e);
- }
- }
- }
- }
- }
public AnalysisEngineController getCasMultiplierController(String cmKey) {
List<AnalysisEngineController> colocatedControllerList =