You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/09 21:54:48 UTC

svn commit: r1069089 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Author: cwiklik
Date: Wed Feb  9 20:54:47 2011
New Revision: 1069089

URL: http://svn.apache.org/viewvc?rev=1069089&view=rev
Log:
UIMA-2038 Modified to support clean shutdown

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1069089&r1=1069088&r2=1069089&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Wed Feb  9 20:54:47 2011
@@ -19,16 +19,17 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
-import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -37,7 +38,6 @@ import javax.jms.TemporaryQueue;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
@@ -45,9 +45,9 @@ import org.apache.uima.aae.UIMAEE_Consta
 import org.apache.uima.aae.UimaAsThreadFactory;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
-import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
 import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.Threshold;
@@ -104,10 +104,20 @@ public class UimaDefaultMessageListenerC
 
   private volatile boolean awaitingShutdown = false;
   
-  private boolean brokerWithDaemonThreads = false;
+  //  When set to true, this flag prevents Spring from running its
+  //  refreshConnectionUntilSuccessful() logic, which attempts to recover the connection.
+  //  This flag is set to true during service shutdown.
+  public static volatile boolean terminating;
+
+  private ThreadPoolExecutor threadPoolExecutor = null;
+  
+  private boolean pluginThreadPool;
   
   public UimaDefaultMessageListenerContainer() {
     super();
+    // Reset the global static flag. This only affects unit testing, as services are
+    // deployed in the same process.
+    terminating = false;
     UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
     __listenerRef = this;
     setRecoveryInterval(5);
@@ -121,7 +131,24 @@ public class UimaDefaultMessageListenerC
     this();
     this.freeCasQueueListener = freeCasQueueListener;
   }
+  /**
+   * Overrides Spring's method that tries to recover from a lost connection. We don't
+   * want to recover when the service is stopping.
+   */
+  protected void refreshConnectionUntilSuccessful() {
+	  if ( !terminating ) {
+		  super.refreshConnectionUntilSuccessful();
+	  }
+  }
+  protected void recoverAfterListenerSetupFailure() {
+	  if ( !terminating ) {
+		  super.recoverAfterListenerSetupFailure();
+	  }
+  }
 
+  public void setTerminating() {
+    terminating = true;
+  }
   public void setController(AnalysisEngineController aController) {
     controller = aController;
   }
@@ -389,11 +416,8 @@ public class UimaDefaultMessageListenerC
    */
   protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
     // If shutdown already, nothing to do
-    if (awaitingShutdown) {
-      return;
-    }
-    // If controller is stopping not need to recover the connection
-    if (controller != null && controller.isStopped()) {
+	    // If controller is stopping no need to recover the connection
+    if (awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
       return;
     }
     if ( controller != null ) {
@@ -520,8 +544,6 @@ public class UimaDefaultMessageListenerC
     try {
 
       injectConnectionFactory();
-      initializeTaskExecutor();
-      injectTaskExecutor();
       super.initialize();
     } catch (Exception e) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -561,14 +583,15 @@ public class UimaDefaultMessageListenerC
       super.setMessageListener(messageListener);
     }
   }
-
+  public void afterPropertiesSet() {
+    afterPropertiesSet(true);
+  }
   /**
    * Called by Spring and some Uima AS components when all properties have been set. This method
    * spins a thread in which the listener is initialized.
    */
-  public void afterPropertiesSet() {
+  public void afterPropertiesSet(final boolean propagate) {
     if (endpoint != null) {
-
 			// Override the prefetch size. The dd2spring always sets this to 1 which 
 			// may effect the throughput of a service. Change the prefetch size to
 			// number of consumer threads defined in DD.
@@ -590,7 +613,8 @@ public class UimaDefaultMessageListenerC
       super.setConcurrentConsumers(1);
       if (cc > 1) {
         try {
-          concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName());
+          String prefix = endpoint.getDelegateKey()+" Reply Thread";
+          concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName(), threadGroup,prefix);
           super.setMessageListener(concurrentListener);
         } catch (Exception e) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -607,11 +631,11 @@ public class UimaDefaultMessageListenerC
           return;
         }
       } else {
-        super.setMessageListener(ml);
+        pluginThreadPool = true;
       }
     } else {
-      super.setMessageListener(ml);
       super.setConcurrentConsumers(cc);
+      pluginThreadPool = true;
     }
     Thread t = new Thread(threadGroup, new Runnable() {
       public void run() {
@@ -643,14 +667,23 @@ public class UimaDefaultMessageListenerC
           }
           // Plug in connection Factory to Spring's Listener
           __listenerRef.injectConnectionFactory();
+          
+          if ( pluginThreadPool ) {
+            setUimaASThreadPoolExecutor(cc);
+          }
+          
           // Initialize the TaskExecutor. This call injects a custom Thread Pool into the
           // TaskExecutor provided in the spring xml. The custom thread pool initializes
           // an instance of AE in a dedicated thread
           initializeTaskExecutor();
-          // Plug in TaskExecutor to Spring's Listener
-          __listenerRef.injectTaskExecutor();
-          // Notify Spring Listener that all properties are ready
-          __listenerRef.allPropertiesSet();
+          if ( threadPoolExecutor == null ) {
+              // Plug in TaskExecutor to Spring's Listener
+              __listenerRef.injectTaskExecutor();
+          }
+          if ( propagate ) {
+            // Notify Spring Listener that all properties are ready
+            __listenerRef.allPropertiesSet();
+          }
           if (isActiveMQDestination() && destination != null) {
             destinationName = ((ActiveMQDestination) destination).getPhysicalName();
           }
@@ -686,6 +719,9 @@ public class UimaDefaultMessageListenerC
                   "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAJMS_jms_listener_failed_WARNING",
                   new Object[] { destination, getBrokerUrl(), e });
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+                  "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_exception__WARNING", e);
         }
       }
     });
@@ -747,7 +783,9 @@ public class UimaDefaultMessageListenerC
     ((TempDestinationResolver) resolver).setListener(this);
     super.setDestinationResolver(resolver);
   }
-
+  /**
+   * Closes the shared connection to a broker
+  **/
   public void closeConnection() throws Exception {
     try {
       setRecoveryInterval(0);
@@ -872,11 +910,10 @@ public class UimaDefaultMessageListenerC
    * 
    */
   public void destroy() {
+	  
     if (awaitingShutdown) {
       return;
     }
-    awaitingShutdown = true;
-    
     // Spin a thread that will wait until all threads complete. This is needed to avoid
     // memory leak caused by the fact that we did not wait to collect the threads
     Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),
@@ -884,32 +921,23 @@ public class UimaDefaultMessageListenerC
       public void run() {
         try {
         	if ( !__listenerRef.awaitingShutdown && __listenerRef.isRunning() ) {
-                // stop Spring listener and ActiveMQ threads
+        	    awaitingShutdown = true;
                 __listenerRef.stop();
-                __listenerRef.closeConnection();
+                // If using non-default TaskExecutor, stop its threads
+                if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
+                    ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdownNow();
+                } else if (concurrentListener != null) {
+                  // Stop internal Executor
+                  concurrentListener.stop();
+                } else if ( threadPoolExecutor != null ) {
+                	threadPoolExecutor.shutdownNow();
+                }
+                __listenerRef.shutdown();
         	}
         } catch (Exception e) {
+        	e.printStackTrace();
         }
-        // If using non-default TaskExecutor, stop its threads
-        if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
-          ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
-          // Since the calling thread may be one of those managed by the executor allow
-          // for one open thread when checking active thread count.
-          while (((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().getActiveCount() > 1
-                  && !((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor()
-                          .isTerminated()) {
-            try {
-              ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(200,
-                      TimeUnit.MILLISECONDS);
-            } catch (Exception e) {
-            }
-          }
-        } else if (concurrentListener != null) {
-          // Stop internal Executor
-          concurrentListener.stop();
-        }
-        // Shutdown the listener
-        __listenerRef.shutdown();
+
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
           threadGroup.getParent().list();
         }
@@ -957,6 +985,7 @@ public class UimaDefaultMessageListenerC
     
     
   }
+  
   private boolean isAmqThread(Thread t) {
 	  String tName = t.getName();
 	  // The following is necessary to account for the AMQ threads
@@ -969,6 +998,34 @@ public class UimaDefaultMessageListenerC
 	  }
 	  return true;
   }
+  private void setUimaASThreadPoolExecutor(int consumentCount) throws Exception{
+    super.setMessageListener(ml);
+    // create task executor with custom thread pool for:
+    // 1) GetMeta request processing
+    // 2) ReleaseCAS request
+    if ( taskExecutor == null ) {
+      UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
+      tf.setDaemon(true);
+      if ( isFreeCasQueueListener()) {
+        tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
+      } else if ( isGetMetaListener()  ) {
+        tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+      } else if ( getDestination() != null && getMessageSelector() != null ) {
+        tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
+      } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
+        tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+      } else { 
+        throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
+      }
+      ExecutorService es = Executors.newFixedThreadPool(consumentCount,tf);
+      if ( es instanceof ThreadPoolExecutor ) {
+          threadPoolExecutor = (ThreadPoolExecutor)es;
+          super.setTaskExecutor(es);
+      }
+    }
+  }
+
+  
   /**
    * Called by Spring to inject TaskExecutor
    */
@@ -1002,6 +1059,7 @@ public class UimaDefaultMessageListenerC
       // PrimitiveController so that every thread can call it to initialize
       // the next available instance of a AE.
       tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller);
+      ((UimaAsThreadFactory)tf).setDaemon(true);
       // This ThreadExecutor will use custom thread factory instead of defult one
       ((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);
       // Initialize the thread pool
@@ -1014,6 +1072,10 @@ public class UimaDefaultMessageListenerC
         controller.changeState(ServiceState.RUNNING);
       }
     }
+    
+    if ( threadPoolExecutor != null ) {
+    	threadPoolExecutor.prestartAllCoreThreads();
+    }
   }
 
   public void stop() throws JmsException {