You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/12/13 15:51:12 UTC

svn commit: r1213723 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-core/src/main/java/org/apache/uima/aae/ uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ uimaj-as-core/src/main/res...

Author: cwiklik
Date: Tue Dec 13 14:51:11 2011
New Revision: 1213723

URL: http://svn.apache.org/viewvc?rev=1213723&view=rev
Log:
UIMA-2309 Refactored the code that handles orderly shutdown of a UIMA AS service. On shutdown, AE.destroy() is called on the same thread that initialized the AE instance.

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
    uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Tue Dec 13 14:51:11 2011
@@ -840,17 +840,20 @@ public class JmsInputChannel implements 
     }
   }
 
-  private void stopChannel(UimaDefaultMessageListenerContainer mL) throws Exception {
+  private void stopChannel(UimaDefaultMessageListenerContainer mL, boolean shutdownNow) throws Exception {
     String eName = mL.getEndpointName();
     if (eName != null) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stopChannel",
                 JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_jms_transport__INFO",
-                new Object[] { eName });
+                new Object[] { eName, shutdownNow });
       }
     }
-    mL.stop();
-
+    mL.delegateStop();
+    if (shutdownNow) {
+        mL.destroy(shutdownNow);
+    }
+   
     String selector = "";
     if (mL.getMessageSelector() != null) {
       selector = " Selector:" + mL.getMessageSelector();
@@ -877,8 +880,8 @@ public class JmsInputChannel implements 
     return true;
   }
 
-  public void stop() throws Exception {
-    stop(InputChannel.CloseAllChannels);
+  public void stop(boolean shutdownNow) throws Exception {
+    stop(InputChannel.CloseAllChannels, shutdownNow);
     listenerContainerList.clear();
     failedListenerMap.clear();
     if ( remoteJMXServer != null ) {
@@ -886,6 +889,12 @@ public class JmsInputChannel implements 
       remoteJMXServer = null;
     }
   }
+  public void disconnectListenersFromQueue() throws Exception {
+	for (Object listenerObject : listenerContainerList) {
+	  final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer) listenerObject;
+	  	stopChannel(mL, false);
+	}	  
+  }
   public void setTerminating() {
   	 if ( listenerContainerList.size() > 0 ) {
   		 //	set a global static flag to stop spring's from automatic recovery on lost connection
@@ -905,12 +914,12 @@ public class JmsInputChannel implements 
 	 }
 	  
   }
-  public synchronized void stop(int channelsToClose) throws Exception {
-    List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
+  public synchronized void stop(int channelsToClose, boolean shutdownNow) throws Exception {
+	  List<UimaDefaultMessageListenerContainer> listenersToRemove = new ArrayList<UimaDefaultMessageListenerContainer>();
     for (Object listenerObject : listenerContainerList) {
       final UimaDefaultMessageListenerContainer mL = (UimaDefaultMessageListenerContainer) listenerObject;
       if (mL != null && doCloseChannel(mL, channelsToClose)) {
-    	  stopChannel(mL);
+    	  stopChannel(mL, shutdownNow);
         // Just in case check if the container still in the list. If so, add it to
         // another list that container listeners that have been stopped and need
         // to be removed from the listenerContainerList. Removing the listener from
@@ -1014,7 +1023,7 @@ public class JmsInputChannel implements 
         }
         newListener.afterPropertiesSet();
         if ( controller != null && controller.isStopped() ) {
-          newListener.stop();
+          newListener.destroy(true);  // shutdownNow
           //  we are aborting, the controller has been stopped
           return;
         }

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Tue Dec 13 14:51:11 2011
@@ -1671,12 +1671,15 @@ public class JmsOutputChannel implements
     long t = System.nanoTime();
     getAnalysisEngineController().saveReplyTime(t, "");
   }
-
   public void stop() {
-    stop(Channel.CloseAllChannels);
+	    stop(Channel.CloseAllChannels, true);
+}
+
+  public void stop(boolean shutdownNow) {
+	    stop(Channel.CloseAllChannels, shutdownNow);
   }
 
-  public void stop(int channelsToClose) {
+  public void stop(int channelsToClose, boolean shutdownNow) {
     aborting = true;
     try {
       // Fetch iterator over all Broker Connections. This service may be connected

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Tue Dec 13 14:51:11 2011
@@ -923,15 +923,22 @@ public class UimaDefaultMessageListenerC
     //  we will start listeners on input queue.
     this.setAutoStartup(false);
   }
-  public void shutdownTaskExecutor(ThreadPoolExecutor tpe) throws InterruptedException {
-    tpe.purge();
-    tpe.shutdownNow();
+  public void shutdownTaskExecutor(ThreadPoolExecutor tpe, boolean stopImmediate) throws InterruptedException {
+    if ( stopImmediate ) {
+  	  tpe.purge();
+      tpe.shutdownNow();
+    } else {
+      tpe.shutdown();
+    }
+  }
+  public void destroy() {
+	  destroy(true); 
   }
   /**
    * Spins a shutdown thread and stops Sprint and ActiveMQ threads.
    * 
    */
-  public void destroy() {
+  public void destroy(final boolean stopImmediate) {
 	  
     if (awaitingShutdown) {
       return;
@@ -948,16 +955,33 @@ public class UimaDefaultMessageListenerC
                 // delegate stop request to Spring 
               __listenerRef.delegateStop();
               if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
-                ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().purge();
-                  ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdownNow();
-                } else if (concurrentListener != null) {
-                  shutdownTaskExecutor(concurrentListener.getTaskExecutor());
+              	//	Modify task executor to terminate idle threads. While the thread terminates
+              	//  it calls destroy() method on the pinned instance of AE
+              	((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().allowCoreThreadTimeOut(true);
+                  ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().setKeepAliveTime(1000, TimeUnit.MILLISECONDS);
+                	((ThreadPoolTaskExecutor) taskExecutor).setWaitForTasksToCompleteOnShutdown(true);
+              	((ThreadPoolTaskExecutor) taskExecutor).shutdown();
+              } else if (concurrentListener != null) {
+                  shutdownTaskExecutor(concurrentListener.getTaskExecutor(), stopImmediate);
                   concurrentListener.stop();
-                } else if ( threadPoolExecutor != null ) {
-                  shutdownTaskExecutor(threadPoolExecutor);
-                }
+              } else if ( threadPoolExecutor != null ) {
+            	  shutdownTaskExecutor(threadPoolExecutor, true);
+              }
         	}
+          // Close Connection to the broker
+          String controllerName = (__listenerRef.controller == null) ? "" :__listenerRef.controller.getComponentName();
+          __listenerRef.getSharedConnection().close();
+          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                       "destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                       "UIMAJMS_listener_shutdown__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector(),__listenerRef.getBrokerUrl()});
+         }
           __listenerRef.shutdown();
+         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                       "destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                       "UIMAJMS_listener_jms_connection_closed__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector()});
+         }
         } catch (Exception e) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
                   "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
@@ -989,7 +1013,7 @@ public class UimaDefaultMessageListenerC
     // 2) ReleaseCAS request
     if ( taskExecutor == null ) {
       UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
-      tf.setDaemon(true);
+      tf.setDaemon(false);
       if ( isFreeCasQueueListener()) {
         tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
       } else if ( isGetMetaListener()  ) {
@@ -1006,6 +1030,20 @@ public class UimaDefaultMessageListenerC
           threadPoolExecutor = (ThreadPoolExecutor)es;
           super.setTaskExecutor(es);
       }
+    } else {
+        UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
+        tf.setDaemon(false);
+        if ( isFreeCasQueueListener()) {
+          tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
+        } else if ( isGetMetaListener()  ) {
+          tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+        } else if ( getDestination() != null && getMessageSelector() != null ) {
+          tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
+        } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
+          tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+        } else { 
+          throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
+        }
     }
   }
 

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/Channel.java Tue Dec 13 14:51:11 2011
@@ -24,9 +24,9 @@ public interface Channel {
 
   public static final int InputChannels = 1;
 
-  public void stop() throws Exception;
+  public void stop(boolean shutdownNow) throws Exception;
 
-  public void stop(int channelsToStop) throws Exception;
+  public void stop(int channelsToStop, boolean shutdownNow) throws Exception;
 
   public String getName();
 

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java Tue Dec 13 14:51:11 2011
@@ -53,4 +53,6 @@ public interface InputChannel extends Ch
   public void setTerminating();
   
   public void terminate();
+  
+  public void disconnectListenersFromQueue() throws Exception;
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaAsThreadFactory.java Tue Dec 13 14:51:11 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
+import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl;
 import org.apache.uima.util.Level;
 
 /**
@@ -44,7 +45,7 @@ public class UimaAsThreadFactory impleme
 
   private String threadNamePrefix=null;
   
-  private boolean isDaemon;
+  private boolean isDaemon=false;
   
   public static AtomicInteger poolIdGenerator = new AtomicInteger();
   
@@ -70,7 +71,7 @@ public class UimaAsThreadFactory impleme
     theThreadGroup = tGroup;
   }
   public void setDaemon(boolean daemon) {
-    isDaemon = daemon;
+ //   isDaemon = daemon;
   }
   public void stop() {
   }
@@ -99,6 +100,9 @@ public class UimaAsThreadFactory impleme
             if (controller != null && !controller.threadAssignedToAE()) {
               // call the controller to initialize next instance of AE. Once initialized this
               // AE instance process() method will only be called from this thread
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+					"UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+					"UIMAEE_calling_ae_initialize__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
               controller.initializeAnalysisEngine();
             }
             // Call given Worker (Runnable) run() method and block. This call block until the
@@ -118,7 +122,18 @@ public class UimaAsThreadFactory impleme
             	  controller.notifyListenersWithInitializationStatus(ex);
         	  } 
             return;
+          } finally {
+              if ( controller instanceof PrimitiveAnalysisEngineController_impl ) {
+       			 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+        					"UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+        					"UIMAEE_process_thread_exiting__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
+            	  ((PrimitiveAnalysisEngineController_impl)controller).destroyAE();
+        			 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+         					"UimaAsThreadFactory.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+         					"UIMAEE_ae_instance_destroy_called__INFO", new Object[] {controller.getComponentName(),Thread.currentThread().getId()});
+              }
           }
+        
         }
       });
     } catch (Exception e) {

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Tue Dec 13 14:51:11 2011
@@ -656,7 +656,7 @@ public class AggregateAnalysisEngineCont
     }
     // Any problems in completeInitialization() is a reason to stop
     notifyListenersWithInitializationStatus(ex);
-    super.stop();
+    super.stop(true);   // shutdown now
   }
 
   private void stopListener(String key, Endpoint endpoint) throws Exception {
@@ -3064,7 +3064,7 @@ public class AggregateAnalysisEngineCont
   }
 
   public void stop() {
-    super.stop();
+    super.stop(true);  // shutdown now
     this.cleanUp();
   }
 

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePool.java Tue Dec 13 14:51:11 2011
@@ -25,16 +25,6 @@ import org.apache.uima.analysis_engine.A
 
 public interface AnalysisEngineInstancePool {
   /**
-   * Creates and initializes the AE Pool with intances of AEs provided in the
-   * anAnalysisEngineInstanceList
-   * 
-   * @param anAnalysisEngineInstanceList
-   *          - list of AnalysisEngine instances
-   * @throws Exception
-   */
-  public void intialize(List anAnalysisEngineInstanceList) throws Exception;
-
-  /**
    * Adds an instance of AnalysisEngine to the pool
    * 
    * @param anAnalysisEngine

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java Tue Dec 13 14:51:11 2011
@@ -22,54 +22,39 @@
 
 package org.apache.uima.aae.controller;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.analysis_engine.AnalysisEngine;
 import org.apache.uima.util.Level;
 
 public class AnalysisEngineInstancePoolWithThreadAffinity implements AnalysisEngineInstancePool {
   private static final Class CLASS_NAME = AnalysisEngineInstancePoolWithThreadAffinity.class;
 
-  private boolean allThreadsAlreadyAssigned = false;
-
-  private Map aeInstanceMap = new HashMap();
-
-  private List aeList = new ArrayList();
-
-  private int analysisEnginePoolSize = 0;
-
-  public AnalysisEngineInstancePoolWithThreadAffinity(int aePoolSize) {
-    analysisEnginePoolSize = aePoolSize;
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#intialize(java.util.List)
-   */
-  public void intialize(List anAnalysisEngineInstanceList) throws Exception {
-    aeList = anAnalysisEngineInstanceList;
-  }
+  private volatile boolean destroyAEInstanceIfFree=false;
+  private Semaphore lock = new Semaphore(1);
+  
+  private Map<Long, AnalysisEngine> aeInstanceMap = new HashMap<Long,AnalysisEngine>();
 
   public int size() {
     return aeInstanceMap.size();
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkin(org.apache.uima.analysis_engine
-   * .AnalysisEngine)
-   */
-  public synchronized void checkin(AnalysisEngine anAnalysisEngine) throws Exception {
-    aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);
+  public void checkin(AnalysisEngine anAnalysisEngine) throws Exception {
+	  try {
+		  lock.acquireUninterruptibly();
+		  aeInstanceMap.put(Thread.currentThread().getId(), anAnalysisEngine);
+	  } catch( Exception e) {
+		  e.printStackTrace();
+		  throw e;
+	  } finally {
+		  lock.release();
+	  }
   }
 
   public boolean exists() {
@@ -83,39 +68,38 @@ public class AnalysisEngineInstancePoolW
    * 
    * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkout()
    **/
-  public synchronized AnalysisEngine checkout() throws Exception {
-    AnalysisEngine ae = null;
-
-    // AEs are instantiated and initialized in the the main thread and placed in the temporary list.
-    // First time in the process() method, each thread will remove AE instance from the temporary
-    // list
-    // and place it in the permanent instanceMap. The key to the instanceMap is the thread name.
-    // Each
-    // thread will always process a CAS using its own and dedicated AE instance.
-    return (AnalysisEngine) aeInstanceMap.remove(Thread.currentThread().getId());
+  public AnalysisEngine checkout() throws Exception {
+	  try {
+		  lock.acquireUninterruptibly();
+		  if ( !exists() ) {
+			  throw new AsynchAEException("AE instance not found in AE pool. Most likely due to service quiescing");
+		  }
+	    // AEs are instantiated and initialized in the the main thread and placed in the temporary list.
+	    // First time in the process() method, each thread will remove AE instance from the temporary
+	    // list
+	    // and place it in the permanent instanceMap. The key to the instanceMap is the thread name.
+	    // Each
+	    // thread will always process a CAS using its own and dedicated AE instance.
+	    return (AnalysisEngine) aeInstanceMap.remove(Thread.currentThread().getId());
+
+	  } catch( Exception e) {
+		  throw e;
+	  } finally {
+		  lock.release();
+	  }
 
+	  
+	  
   }
-
   /*
    * (non-Javadoc)
    * 
    * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#destroy()
    */
   public void destroy() throws Exception {
-
-    Iterator aeInstanceIterator = aeInstanceMap.keySet().iterator();
-    int i = 0;
-    while (aeInstanceIterator.hasNext()) {
-      AnalysisEngine ae = (AnalysisEngine) aeInstanceMap.get((Long) aeInstanceIterator.next());
-      ae.destroy();
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "abort",
-                UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_destroying_ae__INFO",
-                new Object[] { ae.getAnalysisEngineMetaData().getName(), i });
-      }
-      i++;
-    }
-    aeInstanceMap.clear();
+	  //	set the flag so that any AE instance returned from PrimitiveController
+	  //    will be destroyed. 
+	  destroyAEInstanceIfFree = true;
   }
 
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Tue Dec 13 14:51:11 2011
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.ObjectName;
@@ -210,6 +211,8 @@ public abstract class BaseAnalysisEngine
   protected ConcurrentHashMap<String, UimaMessageListener> messageListeners = new ConcurrentHashMap<String, UimaMessageListener>();
 
   private Exception initException = null;
+  
+  private Object lock = new Object();
 
   // Local cache for this controller only. This cache stores state of
   // each CAS. The actual CAS is still stored in the global cache. The
@@ -230,7 +233,9 @@ public abstract class BaseAnalysisEngine
 
   // Monitor used in stop() to await a callback from InProcessCache
   protected Object callbackMonitor = new Object();
-
+  
+  protected Semaphore onEmptyCacheSemaphore = new Semaphore(1);
+  
   protected volatile boolean awaitingCacheCallbackNotification = false;
 
   protected ConcurrentHashMap<String, String> abortedCasesMap = new ConcurrentHashMap<String, String>();
@@ -1768,11 +1773,11 @@ public abstract class BaseAnalysisEngine
    * Stops input channel(s) and initiates a shutdown of all delegates ( if this is an aggregate ).
    * At the end sends an Exception to the client and closes an output channel.
    */
-  public void stop() {
-    this.stop(null, null);
+  public void stop(boolean shutdownNow) {
+    this.stop(null, null,shutdownNow);
   }
 
-  public void stop(Throwable cause, String aCasReferenceId) {
+  public void stop(Throwable cause, String aCasReferenceId, boolean shutdownNow ) {
     if (!isStopped()) {
       setStopped();
     }
@@ -1792,11 +1797,11 @@ public abstract class BaseAnalysisEngine
 
       getControllerLatch().release();
       // Stops the input channel of this service
-      stopInputChannels(InputChannel.CloseAllChannels);
+      stopInputChannels(InputChannel.CloseAllChannels, shutdownNow);
     } else {
       ((AggregateAnalysisEngineController_impl) this).stopTimers();
       // Stops ALL input channels of this service including the reply channels
-      stopInputChannels(InputChannel.CloseAllChannels);
+      stopInputChannels(InputChannel.CloseAllChannels,shutdownNow);
       
       List<AnalysisEngineController> colocatedControllerList = 
         ((AggregateAnalysisEngineController_impl)this).getChildControllerList();
@@ -1843,7 +1848,6 @@ public abstract class BaseAnalysisEngine
         }
       }
     }
-
     getInProcessCache().releaseAllCASes();
 
     releasedAllCASes = true;
@@ -1857,6 +1861,7 @@ public abstract class BaseAnalysisEngine
         jmxManagement.destroy();
       } catch (Exception e) {
       }
+      
       try {
         getInProcessCache().destroy();
       } catch (Exception e) {
@@ -1936,12 +1941,35 @@ public abstract class BaseAnalysisEngine
 
         // Stops all input channels of this service, but keep temp reply queue input channels open
         // to process replies.
-        stopInputChannels(InputChannel.InputChannels);
+        stopReceivingCASes();
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
                   "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_register_onEmpty_callback__INFO", new Object[] { getComponentName() });
         }
+        if ( this instanceof PrimitiveAnalysisEngineController_impl &&
+        		((PrimitiveAnalysisEngineController_impl)this).aeInstancePool != null ) {
+        	//	Since we are quiescing, destroy all AEs that are in AE pool. Those that
+        	//  are still busy processing will be destroyed when they finish.
+        	try {
+        		//	Sleep for 2secs to allow any CASes that just arrived to reach process
+        		//  method. There may be CASes in flight that just came in before we
+        		//  stopped input channel but not yet reached process method. We allow
+        		//  them to be processed before we clean AE pool below.
+        		synchronized(lock) {
+        			lock.wait(2000);
+        		}
+        		//	Set a flag on the AEPool manager to destroy any AE instance being returned
+        		//  to the pool. The AE.destroy() method must be called on the same thread
+        		//  that initialized the AE instance. Any AE instances already in the pool
+        		//  will be destroyed when a thread pool is shutdown
+        		((PrimitiveAnalysisEngineController_impl)this).aeInstancePool.destroy();
+        	} catch( Exception e) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                        "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAEE_exception__WARNING", e);
+        	}
+        }
         // Register a callback with the cache. The callback will be made when the cache becomes
         // empty
         getInProcessCache().registerCallbackWhenCacheEmpty(this,
@@ -1970,8 +1998,9 @@ public abstract class BaseAnalysisEngine
                   "quiesceAndStop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_onEmpty_callback_received__INFO", new Object[] { getComponentName() });
         }
-        getInputChannel().terminate();
-        stop();
+        stopInputChannels(InputChannel.InputChannels, true);  
+        // close JMS connection 
+        stop(false); // wait for any remaining CASes in flight to finish
       }
     }
   }
@@ -2018,14 +2047,14 @@ public abstract class BaseAnalysisEngine
           iC.setTerminating();
       }
       // Stop the inflow of new input CASes
-      stopInputChannel();
-      if ( iC != null ) {
+      stopInputChannel(true);  // shutdownNow
+       if ( iC != null ) {
         iC.terminate();
       }
       stopCasMultipliers();
       stopTransportLayer();
       if (cause != null && aCasReferenceId != null) {
-        this.stop(cause, aCasReferenceId);
+        this.stop(cause, aCasReferenceId, true);  // shutdownNow
       } else {
         this.stop();
       }
@@ -2150,11 +2179,11 @@ public abstract class BaseAnalysisEngine
    * Stops a listener on the main input channel
    * 
    */
-  protected void stopInputChannel() {
+  protected void stopInputChannel(boolean shutdownNow) {
     InputChannel iC = getInputChannel(endpointName);
     if (iC != null && !iC.isStopped()) {
       try {
-        iC.stop();
+        iC.stop(shutdownNow);
       } catch (Exception e) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "terminate",
@@ -2171,7 +2200,43 @@ public abstract class BaseAnalysisEngine
 		  iC.setTerminating();
 	  }
   }
-  protected void stopInputChannels( int channelsToStop) {   //, boolean norecovery) {
+  protected void stopReceivingCASes()  {
+	  
+	    InputChannel iC = null;
+	    setInputChannelForNoRecovery();
+	    Iterator<String> it = inputChannelMap.keySet().iterator();
+	    while (it.hasNext()) {
+	      try {
+	        String key = it.next();
+	        if (key != null && key.trim().length() > 0) {
+	          iC = (InputChannel) inputChannelMap.get(key);
+	          if (iC != null) {
+	        	  iC.disconnectListenersFromQueue();
+               }
+	        }
+	      } catch (Exception e) {
+	        if (iC != null) {
+	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(),
+	                    "stopReceivingCASes", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAEE_unable_to_stop_inputchannel__INFO",
+	                    new Object[] { getComponentName(), iC.getInputQueueName() });
+	          }
+	        } else {
+	          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+	                    "stopReceivingCASes", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAEE_service_exception_WARNING", getComponentName());
+	            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+	                    "stopReceivingCASes", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+	                    "UIMAEE_exception__WARNING", e);
+	          }
+	        }
+	      }
+	    }
+	  
+  }
+  protected void stopInputChannels( int channelsToStop, boolean shutdownNow) {   //, boolean norecovery) {
 	    InputChannel iC = null;
 	    setInputChannelForNoRecovery();
 	    Iterator it = inputChannelMap.keySet().iterator();
@@ -2186,10 +2251,10 @@ public abstract class BaseAnalysisEngine
 	                    && iC.getServiceInfo().getInputQueueName().startsWith("top_level_input_queue")) {
 	              // This closes both listeners on the input queue: Process Listener and GetMeta
 	              // Listener
-	            	iC.stop(channelsToStop);
+	            	iC.stop(channelsToStop,shutdownNow);
 	              return; // Just closed input channels. Keep the others open
 	            }
-	            iC.stop(channelsToStop);
+	            iC.stop(channelsToStop,shutdownNow);
 	          }
 	        }
 	        i++;

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Tue Dec 13 14:51:11 2011
@@ -86,7 +86,7 @@ public class PrimitiveAnalysisEngineCont
 
   // Pool containing instances of AE. The default implementation provides Thread affinity
   // meaning each thread executes the same AE instance.
-  private AnalysisEngineInstancePool aeInstancePool = null;
+  protected AnalysisEngineInstancePool aeInstancePool = null;
 
   private String abortedCASReferenceId = null;
   // Create a shared semaphore to serialize creation of AE instances.
@@ -182,7 +182,7 @@ public class PrimitiveAnalysisEngineCont
           return;
         }
         if (aeInstancePool == null) {
-          aeInstancePool = new AnalysisEngineInstancePoolWithThreadAffinity(analysisEnginePoolSize);
+          aeInstancePool = new AnalysisEngineInstancePoolWithThreadAffinity();//analysisEnginePoolSize);
         }
         if (analysisEngineMetadata == null) {
           analysisEngineMetadata = ae.getAnalysisEngineMetaData();
@@ -283,6 +283,10 @@ public class PrimitiveAnalysisEngineCont
         // Initialize Cas Manager
         if (getCasManagerWrapper() != null) {
           try {
+        	  // The check below should always be true. In the Spring context file,
+        	  // AsynchAECasManager_impl is instantiated and its setCasPoolSize() method is
+        	  // called, which sets the initialized state to true. isInitialized() returning
+        	  // true just means that setCasPoolSize() was called.
             if (getCasManagerWrapper().isInitialized()) {
               getCasManagerWrapper().addMetadata(getAnalysisEngineMetadata());
               if (isTopLevelComponent()) {
@@ -477,7 +481,15 @@ public class PrimitiveAnalysisEngineCont
       }
     }
   }
+  public void destroyAE()  {
+	  try {
+		  AnalysisEngine ae = aeInstancePool.checkout();
+		  ae.destroy();
+	  } catch( Exception e) {
+		  e.printStackTrace();
+	  }
 
+  }
   /**
    * This is called when a Stop request is received from a client. Add the provided Cas id to the
    * list of aborted CASes. The process() method checks this list to determine if it should continue
@@ -1017,7 +1029,7 @@ public class PrimitiveAnalysisEngineCont
   }
 
   public void stop() {
-    super.stop();
+    super.stop(true);  // shutdown now
     if (aeInstancePool != null) {
       try {
         aeInstancePool.destroy();

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Tue Dec 13 14:51:11 2011
@@ -245,4 +245,10 @@ UIMAEE_service_sending_release_cas_reque
 UIMAEE_service_sending_stop_request__FINE = Controller: {0} Sending STOP Request to Delegate: {1}
 UIMAEE_service_delivery_exception__WARNING = Controller: {0} Failed To Send Message to Delegates {1} Queue: {2}
 UIMAEE_service_delivery_to_client_exception__WARNING = Controller: {0} Failed To Send Message to Clients Queue: {1}
-UIMAEE_service_state__INFO={0}
\ No newline at end of file
+UIMAEE_service_state__INFO={0}
+UIMAEE_service_returned_reply__INFO=Controller: {0} Returned Input CAS: {1}
+UIMAEE_checking_in_ae_to_pool__INFO=Controller: {0} Checking in AE instance to AePoolManager. Thread Id:{1}
+UIMAEE_checked_in_ae_to_pool__INFO=Controller: {0} Checked in AE instance to AePoolManager. Thread Id:{1}
+UIMAEE_calling_ae_initialize__INFO=Controller: {0} Initializing AE instance on Thread Id: {1}
+UIMAEE_process_thread_exiting__INFO=Controller: {0} --------------- Process Thread ID:{1} EXITING
+UIMAEE_ae_instance_destroy_called__INFO=Controller: {0} --------------- AE destroy() Method Call Returned ID:{1}
\ No newline at end of file

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1213723&r1=1213722&r2=1213723&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Tue Dec 13 14:51:11 2011
@@ -68,7 +68,7 @@ UIMAJMS_connection_closed_to_endpoint__F
 UIMAJMS_open_connection_to_endpoint__FINE = Opening Connection To Endpoint: {0}
 UIMAJMS_sending_msg_to_endpoint__FINE = Sending Message to Endpoint: {0}
 UIMAJMS_service_listening__INFO = {0} Service Starting - Listening for Messages 
-UIMAJMS_stopping_jms_transport__INFO = Stopping Service JMS Transport. Service: {0}
+UIMAJMS_stopping_jms_transport__INFO = Stopping Service JMS Transport. Service: {0} ShutdownNow {1}
 UIMAJMS_recvd_msg__FINE = Service:{0} Received New Message
 UIMAJMS_connector_list__FINE = ActiveMQ Broker Connector List: {0}
 UIMAJMS_broker_uri__FINE = Broker URI: {0}
@@ -211,7 +211,8 @@ UIMAJMS_service_not_responding_to_ping__
 UIMAJMS_starting_listener__INFO=Controller: {0} Starting Listener on Endpoint: {1} Selector: {2} Broker: {3}
 UIMAJMS_caught_signal__INFO= Uima AS Service {0} Caught Kill Signal - Initiating Quiesce and Stop
 UIMAJMS_listener_added_after_initialize__WARNING = UIMA AS Already Initialized - Attempt to Add Callback Listener Failed. Add Callback Listener Before calling initialize().
-UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread Interrupted While Waiting For a Reply. CAS: {0} CasHashCode: {1}
+UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread [ID:{0}]Interrupted While Waiting For a Reply. CAS: {1} CasHashCode: {2}
+UIMAJMS_client_canceled_timer_INFO=UIMA AS Client Thread [ID:{0}] Cancelled Timer and Throwing Exception from sendAndReceive for CAS: {1} CasHashCode: {2}
 UIMAJMS_failed_cache_lookup__WARNING= UIMA AS Client Failed Cache Look Up For CAS: {0} Command:{1} Message:{2} Destination:{3}
 UIMAJMS_calling_onBeforeMessageSend__FINE= UIMA AS Client Calling onBeforeMessageSend - CAS:{0} CasHashCode:{1}
 UIMAJMS_completed_onBeforeMessageSend__INFO= UIMA AS Client Completed onBeforeMessageSend - CAS:{0} CasHashCode:{1}
@@ -221,4 +222,7 @@ UIMAJMS_cas_added_to_pending_FINE = UIMA
 UIMAJMS_cas_submitted_FINE=UIMA AS sendAndReceive Received CAS:{0} HashCode:{1} For Processing - Forwarding to sendCAS() on Thread:{2}
 UIMAJMS_calling_onBeforeProcessCAS_FINE = UIMA AS Client Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1}
 UIMAJMS_completed_onBeforeProcessCAS_FINE = UIMA AS Client Completed onBeforeMessageProcess For CAS:{0} Hashcode:{1}
-UIMAJMS_skipping_onBeforeProcessCAS_INFO= UIMA AS Client Not Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1}. Invalid state: Node: {2} IP: {3}
\ No newline at end of file
+UIMAJMS_skipping_onBeforeProcessCAS_INFO= UIMA AS Client Not Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1}. Invalid state: Node IP: {2} PID: {3}
+UIMAJMS_listener_shutdown__INFO = +++++++++++++++ Controller: {0} UIMA AS Listener With Selector {1} shutdown() completed
+UIMAJMS_listener_jms_connection_closed__INFO = +++++++++++++++ Controller: {0} UIMA AS Listener With Selector {1} closed JMS connection to Broker {2}
+