You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/09 20:45:33 UTC

svn commit: r1069054 - /uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java

Author: cwiklik
Date: Wed Feb  9 19:45:32 2011
New Revision: 1069054

URL: http://svn.apache.org/viewvc?rev=1069054&view=rev
Log:
UIMA-2038 Modified to support clean shutdown

Modified:
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1069054&r1=1069053&r2=1069054&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Wed Feb  9 19:45:32 2011
@@ -106,6 +106,7 @@ public abstract class BaseAnalysisEngine
   private static final Class CLASS_NAME = BaseAnalysisEngineController.class;
   private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
   public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, FAILED };
+  public static final boolean NO_RECOVERY = true;
   
   protected ServiceState currentState = ServiceState.INITIALIZING;
   
@@ -246,7 +247,6 @@ public abstract class BaseAnalysisEngine
   private static final UimaAsVersion uimaAsVersion = new UimaAsVersion();
   // Holds destination names of clients known to be dead
   protected ConcurrentHashMap<String,String> deadClientDestinationMap = new ConcurrentHashMap<String, String>();
-
   
   public BaseAnalysisEngineController() {
 
@@ -1880,6 +1880,8 @@ public abstract class BaseAnalysisEngine
       // then wait until all CASes still in play are processed. When all CASes are processed
       // we proceed with the shutdown of delegates and finally of the top level service.
       if (isTopLevelComponent()) {
+          getInputChannel().setTerminating();
+
         // Stops all input channels of this service, but keep temp reply queue input channels open
         // to process replies.
         stopInputChannels(InputChannel.InputChannels);
@@ -1916,6 +1918,7 @@ public abstract class BaseAnalysisEngine
                   "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_onEmpty_callback_received__INFO", new Object[] { getComponentName() });
         }
+        getInputChannel().terminate();
         stop();
       }
     }
@@ -1958,8 +1961,13 @@ public abstract class BaseAnalysisEngine
     } else if (!isStopped()) {
       stopDelegateTimers();
       getOutputChannel().cancelTimers();
+      InputChannel iC = getInputChannel(endpointName);
+      if ( iC != null) {
+          iC.setTerminating();
+      }
       // Stop the inflow of new input CASes
       stopInputChannel();
+      iC.terminate();
       stopCasMultipliers();
       stopTransportLayer();
       if (cause != null && aCasReferenceId != null) {
@@ -2103,54 +2111,61 @@ public abstract class BaseAnalysisEngine
       }
     }
   }
-
+  private void setInputChannelForNoRecovery() {
+	  if ( inputChannelMap.size() > 0 ) {
+		  InputChannel iC = getInputChannel();
+		  iC.setTerminating();
+	  }
+  }
+  protected void stopInputChannels( int channelsToStop) {   //, boolean norecovery) {
+	    InputChannel iC = null;
+	    setInputChannelForNoRecovery();
+	    Iterator it = inputChannelMap.keySet().iterator();
+	    int i = 1;
+	    while (it.hasNext()) {
+	      try {
+	        String key = (String) it.next();
+	        if (key != null && key.trim().length() > 0) {
+	          iC = (InputChannel) inputChannelMap.get(key);
+	          if (iC != null) {
+	            if (channelsToStop == InputChannel.InputChannels && iC.getServiceInfo() != null
+	                    && iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
+	              // This closes both listeners on the input queue: Process Listener and GetMeta
+	              // Listener
+	            	iC.stop(channelsToStop);
+	              return; // Just closed input channels. Keep the others open
+	            }
+	            iC.stop(channelsToStop);
+	          }
+	        }
+	        i++;
+	      } catch (Exception e) {
+	        if (iC != null) {
+	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+	                    "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAEE_unable_to_stop_inputchannel__INFO",
+	                    new Object[] { getComponentName(), iC.getInputQueueName() });
+	          }
+	        } else {
+	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                    "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAEE_service_exception_WARNING", getComponentName());
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+	                    "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAEE_exception__WARNING", e);
+	          }
+	        }
+	      }
+	    }
+	  
+  }
   /**
    * Aggregates have more than one Listener channel. This method stops all configured input channels
    * this service is configured with.
    * 
    */
-  protected void stopInputChannels(int channelsToStop) {
-    InputChannel iC = null;
-    Iterator it = inputChannelMap.keySet().iterator();
-    int i = 1;
-    while (it.hasNext()) {
-      try {
-        String key = (String) it.next();
-        if (key != null && key.trim().length() > 0) {
-          iC = (InputChannel) inputChannelMap.get(key);
-          if (iC != null) {
-            if (channelsToStop == InputChannel.InputChannels && iC.getServiceInfo() != null
-                    && iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
-              // This closes both listeners on the input queue: Process Listener and GetMeta
-              // Listener
-              iC.stop(channelsToStop);
-              return; // Just closed input channels. Keep the others open
-            }
-            iC.stop(channelsToStop);
-          }
-        }
-        i++;
-      } catch (Exception e) {
-        if (iC != null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
-                    "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_unable_to_stop_inputchannel__INFO",
-                    new Object[] { getComponentName(), iC.getInputQueueName() });
-          }
-        } else {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                    "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_service_exception_WARNING", getComponentName());
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
-                    "stopInputChannels", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_exception__WARNING", e);
-          }
-        }
-      }
-    }
-  }
 
   public AnalysisEngineController getCasMultiplierController(String cmKey) {
     List<AnalysisEngineController> colocatedControllerList =