You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/04/10 16:07:44 UTC

svn commit: r763938 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Author: cwiklik
Date: Fri Apr 10 14:07:44 2009
New Revision: 763938

URL: http://svn.apache.org/viewvc?rev=763938&view=rev
Log:
UIMA-1109 Modified to support orderly shutdown of Uima AS service

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=763938&r1=763937&r2=763938&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Apr 10 14:07:44 2009
@@ -52,6 +52,7 @@
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Level;
 import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.JmsException;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -67,7 +68,7 @@
 	private AnalysisEngineController controller;
 	private volatile boolean failed = false;
 	private Object mux = new Object();
-	private UimaDefaultMessageListenerContainer __listenerRef;
+	private final UimaDefaultMessageListenerContainer __listenerRef;
 	private TaskExecutor taskExecutor = null;	
 	private ConnectionFactory connectionFactory = null;
 	private Object mux2 = new Object();
@@ -82,7 +83,7 @@
   //	listener purpose is to increment number of children for
   //	an input CAS. 
   private ConcurrentMessageListener concurrentListener = null;
-
+  private volatile boolean awaitingShutdown = false;
   
   public UimaDefaultMessageListenerContainer()
 	{
@@ -118,7 +119,11 @@
 	/**
 	 * Stops this Listener
 	 */
-	private synchronized void handleListenerFailure() {
+	private  void handleListenerFailure() {
+	  // If shutdown already, nothing to do
+	  if ( awaitingShutdown ) {
+      return;
+    }
     try {
       if ( controller instanceof AggregateAnalysisEngineController ) {
         String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
@@ -283,6 +288,10 @@
 	 */
 	protected void handleListenerSetupFailure( Throwable t, boolean alreadyHandled )
 	{
+	  // If shutdown already, nothing to do
+	  if ( awaitingShutdown ) {
+      return;
+    }
 	  // If controller is stopping not need to recover the connection
 	  if ( controller != null && controller.isStopped()) {
 	    return;
@@ -342,21 +351,26 @@
                 new Object[] {  controller.getComponentName(), getBrokerUrl() });
     }
     controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
+    if ( !controller.isStopped() && !controller.isAwaitingCacheCallbackNotification()) {
     controller.stop();
+    }
 	}
 	protected void handleListenerException( Throwable t )
 	{
-      t.printStackTrace();
-      String endpointName = 
+	  // Already shutdown, nothing to do
+    if ( awaitingShutdown ) {
+      return;
+    }
+    t.printStackTrace();
+    String endpointName = 
         (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); 
       
-      if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+    if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
                 "handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
                 new Object[] {  endpointName, getBrokerUrl(), t });
-      }
-			super.handleListenerException(t);
-		
+    }
+    super.handleListenerException(t);
 	}
 
 	private void allPropertiesSet() {
@@ -660,6 +674,9 @@
 
 	public void onException(JMSException arg0)
 	{
+    if ( awaitingShutdown ) {
+      return;
+    }
 	  arg0.printStackTrace();
     String endpointName = 
       (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName(); 
@@ -684,35 +701,61 @@
     System.out.println("Injected Updated Task Executor Into Listener For Destination:"+getDestination());
   }
   /**
-   * Called when the object goes out of scope. Main task in this method is to list 
-   * all threads and wait until they terminate. 
+   * Delegate shutdown to the super class
+   */
+  public void doDestroy() {
+    super.destroy();
+  }
+  /**
+   * Spins a shutdown thread and stops Sprint and ActiveMQ threads. 
    * 
    */
   public void destroy() {
-    super.destroy();
+    if ( awaitingShutdown ) {
+      return;
+    }
+    awaitingShutdown = true;
+    if ( getDestination() != null ) {
+      System.out.println("Listener:"+getDestination()+" Destroy Called. Active Consumer Count:"+super.getActiveConsumerCount());
+    } else {
+      System.out.println("Listener:"+getDestinationName()+" Destroy Called. Active Consumer Count:"+super.getActiveConsumerCount());
+    }
     //  Spin a thread that will wait until all threads complete. This is needed to avoid
     //  memory leak caused by the fact that we did not wait to collect the threads
-    //  
     Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),"threadGroupDestroyer") {
         public void run() {
+          try {
+            //  stop Spring listener and ActiveMQ threads
+            __listenerRef.stop();
+            __listenerRef.closeConnection();
+          } catch( Exception e) {}
+          //  If using non-default TaskExecutor, stop its threads
           if ( taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
-            ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
-            try {
-              ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-            } catch ( Exception e){}
+            if (!((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().isTerminated() ) {
+              ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
+              try {
+                ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+              } catch ( Exception e){}
+            }
           }
+          //  Shutdown the listener
+          __listenerRef.shutdown();
           if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST) ) {
             threadGroup.getParent().list();
           }
-
+          //  Wait until all threads are accounted for
           while (threadGroup.activeCount() > 0) {
             try {
               Thread.sleep(100);
             } catch (InterruptedException e) {
             }
           }
-          threadGroup.destroy();
           try {
+            synchronized(threadGroup ) {
+              if ( !threadGroup.isDestroyed() ) {
+                threadGroup.destroy();
+              }
+            }
             System.out.println(">>>>>>>>>>>> Listener:"+getDestinationName()+" Thread Group Destroyed");
           } catch( Exception e) {}   // Ignore 
         }
@@ -757,4 +800,8 @@
       ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
     }
   }
+  public void stop() throws JmsException {
+    setAcceptMessagesWhileStopping(false);
+    destroy();
+  }
 }