You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/10 17:02:42 UTC

svn commit: r1069443 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Author: cwiklik
Date: Thu Feb 10 16:02:42 2011
New Revision: 1069443

URL: http://svn.apache.org/viewvc?rev=1069443&view=rev
Log:
UIMA-2038 Modified destroy()

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1069443&r1=1069442&r2=1069443&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Thu Feb 10 16:02:42 2011
@@ -789,7 +789,6 @@ public class UimaDefaultMessageListenerC
   public void closeConnection() throws Exception {
     try {
       setRecoveryInterval(0);
-      setAcceptMessagesWhileStopping(false);
       setAutoStartup(false);
       if ( getSharedConnection() != null ) {
         ActiveMQConnection amqc = (ActiveMQConnection)getSharedConnection();
@@ -905,6 +904,11 @@ public class UimaDefaultMessageListenerC
     //  we will start listeners on input queue.
     this.setAutoStartup(false);
   }
+  public void shutdownTaskExecutor(ThreadPoolExecutor tpe) throws InterruptedException { // stops the given pool and blocks until it has terminated; assumes tpe is a non-null java.util.concurrent.ThreadPoolExecutor - TODO confirm
+    tpe.purge(); // remove already-cancelled tasks from the pool's work queue
+    tpe.shutdownNow(); // attempt to stop running tasks and discard the ones still waiting
+    tpe.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // effectively unbounded wait; an interrupt propagates to the caller as InterruptedException
+  }
   /**
    * Spins a shutdown thread and stops Spring and ActiveMQ threads.
    * 
@@ -914,28 +918,32 @@ public class UimaDefaultMessageListenerC
     if (awaitingShutdown) {
       return;
     }
-    // Spin a thread that will wait until all threads complete. This is needed to avoid
-    // memory leak caused by the fact that we did not wait to collect the threads
+    // Spin a thread that will shut down all taskExecutors and wait for their threads to stop.
+    // A separate thread is necessary since we can't stop a threadPoolExecutor if one of its
+    // own threads is busy stopping the executor; doing so leads to a hang.
     Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),
             "threadGroupDestroyer") {
       public void run() {
         try {
         	if ( !__listenerRef.awaitingShutdown && __listenerRef.isRunning() ) {
         	    awaitingShutdown = true;
-                __listenerRef.stop();
-                // If using non-default TaskExecutor, stop its threads
-                if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
-                    ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdownNow();
+                // delegate stop request to Spring 
+              __listenerRef.delegateStop();
+              if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
+                  ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdownNow();
+                  ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                 } else if (concurrentListener != null) {
-                  // Stop internal Executor
+                  shutdownTaskExecutor(concurrentListener.getTaskExecutor());
                   concurrentListener.stop();
                 } else if ( threadPoolExecutor != null ) {
-                	threadPoolExecutor.shutdownNow();
+                  shutdownTaskExecutor(threadPoolExecutor);
                 }
                 __listenerRef.shutdown();
         	}
         } catch (Exception e) {
-        	e.printStackTrace();
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+                  "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_exception__WARNING", e);
         }
 
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1077,9 +1085,10 @@ public class UimaDefaultMessageListenerC
     	threadPoolExecutor.prestartAllCoreThreads();
     }
   }
-
+  public void delegateStop() { // exposes the inherited stop() so it can be invoked without going through this class's stop() override
+    super.stop(); // the stop() override in this class calls destroy() instead of the superclass implementation
+  }
   public void stop() throws JmsException {
-    setAcceptMessagesWhileStopping(false);
     destroy();
   }
 }