You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2012/01/06 22:08:31 UTC
svn commit: r1228406 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Author: cwiklik
Date: Fri Jan 6 21:08:31 2012
New Revision: 1228406
URL: http://svn.apache.org/viewvc?rev=1228406&view=rev
Log:
UIMA-2309 Use CountDownLatch to detect process thread exit
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1228406&r1=1228405&r2=1228406&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Jan 6 21:08:31 2012
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -114,6 +115,8 @@ public class UimaDefaultMessageListenerC
private boolean pluginThreadPool;
+ private CountDownLatch latchToCountNumberOfTerminatedThreads;
+
public UimaDefaultMessageListenerContainer() {
super();
// reset global static. This only affects unit testing as services are deployed
@@ -689,7 +692,7 @@ public class UimaDefaultMessageListenerC
// TaskExecutor provided in the spring xml. The custom thread pool initializes
// an instance of AE in a dedicated thread
if ( getMessageSelector() != null && !isGetMetaListener()) {
- initializeTaskExecutor();
+ initializeTaskExecutor(cc);
}
if ( threadPoolExecutor == null ) {
// Plug in TaskExecutor to Spring's Listener
@@ -987,7 +990,7 @@ public class UimaDefaultMessageListenerC
"destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_listener_shutdown__INFO", new Object[] {controllerName,__listenerRef.getMessageSelector(),__listenerRef.getBrokerUrl()});
}
- __listenerRef.shutdown();
+ // __listenerRef.shutdown();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"destroy.run()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -1013,7 +1016,18 @@ public class UimaDefaultMessageListenerC
}
};
threadGroupDestroyer.start();
-
+ // Wait for process threads to finish. Each thread
+ // will count down the latch on exit. When all threads
+ // finish we can continue; until then we block on the latch
+ try {
+ if ( latchToCountNumberOfTerminatedThreads != null && cc > 1) {
+ latchToCountNumberOfTerminatedThreads.await();
+ }
+ } catch( Exception ex) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+ "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", ex);
+ }
}
@@ -1055,6 +1069,7 @@ public class UimaDefaultMessageListenerC
} else {
throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
}
+
}
}
@@ -1078,7 +1093,7 @@ public class UimaDefaultMessageListenerC
*
* @throws Exception
*/
- private void initializeTaskExecutor() throws Exception {
+ private void initializeTaskExecutor(int consumers) throws Exception {
// TaskExecutor is only used with primitives
if (controller instanceof PrimitiveAnalysisEngineController) {
// in case the taskExecutor is not plugged in yet, wait until one
@@ -1088,10 +1103,11 @@ public class UimaDefaultMessageListenerC
mux2.wait(20);
}
}
+ latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumers);
// Create a Custom Thread Factory. Provide it with an instance of
// PrimitiveController so that every thread can call it to initialize
// the next available instance of a AE.
- tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller);
+ tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller, latchToCountNumberOfTerminatedThreads);
((UimaAsThreadFactory)tf).setDaemon(true);
// This ThreadExecutor will use the custom thread factory instead of the default one
((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);