You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2012/01/06 22:08:31 UTC

svn commit: r1228406 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Author: cwiklik
Date: Fri Jan  6 21:08:31 2012
New Revision: 1228406

URL: http://svn.apache.org/viewvc?rev=1228406&view=rev
Log:
UIMA-2309 Use CountDownLatch to detect process thread exit

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1228406&r1=1228405&r2=1228406&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Jan  6 21:08:31 2012
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -114,6 +115,8 @@ public class UimaDefaultMessageListenerC
   
   private boolean pluginThreadPool;
   
+  private CountDownLatch latchToCountNumberOfTerminatedThreads;
+  
   public UimaDefaultMessageListenerContainer() {
     super();
     // reset global static. This only effects unit testing as services are deployed 
@@ -689,7 +692,7 @@ public class UimaDefaultMessageListenerC
           // TaskExecutor provided in the spring xml. The custom thread pool initializes
           // an instance of AE in a dedicated thread
           if ( getMessageSelector() != null && !isGetMetaListener()) {
-            initializeTaskExecutor();
+            initializeTaskExecutor(cc);
           }
           if ( threadPoolExecutor == null ) {
               // Plug in TaskExecutor to Spring's Listener
@@ -987,7 +990,7 @@ public class UimaDefaultMessageListenerC
                        "destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                        "UIMAJMS_listener_shutdown__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector(),__listenerRef.getBrokerUrl()});
          }
-          __listenerRef.shutdown();
+         // __listenerRef.shutdown();
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                        "destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -1013,7 +1016,18 @@ public class UimaDefaultMessageListenerC
       }
     };
     threadGroupDestroyer.start();
-    
+	  //	Wait for process threads to finish. Each thread
+	  // will count down the latch on exit. When all thread
+	  // finish we can continue. Otherwise we block on the latch
+    try {
+      if ( latchToCountNumberOfTerminatedThreads != null && cc > 1) {
+        latchToCountNumberOfTerminatedThreads.await();
+      }
+    } catch( Exception ex) {
+       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+                  "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_exception__WARNING", ex);
+    }
     
   }
   
@@ -1055,6 +1069,7 @@ public class UimaDefaultMessageListenerC
         } else { 
           throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
         }
+        
     }
   }
 
@@ -1078,7 +1093,7 @@ public class UimaDefaultMessageListenerC
    * 
    * @throws Exception
    */
-  private void initializeTaskExecutor() throws Exception {
+  private void initializeTaskExecutor(int consumers) throws Exception {
     // TaskExecutor is only used with primitives
     if (controller instanceof PrimitiveAnalysisEngineController) {
       // in case the taskExecutor is not plugged in yet, wait until one
@@ -1088,10 +1103,11 @@ public class UimaDefaultMessageListenerC
           mux2.wait(20);
         }
       }
+      latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumers);
       // Create a Custom Thread Factory. Provide it with an instance of
       // PrimitiveController so that every thread can call it to initialize
       // the next available instance of a AE.
-      tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller);
+      tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller, latchToCountNumberOfTerminatedThreads);
       ((UimaAsThreadFactory)tf).setDaemon(true);
       // This ThreadExecutor will use custom thread factory instead of defult one
       ((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);