You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/09 21:54:48 UTC
svn commit: r1069089 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Author: cwiklik
Date: Wed Feb 9 20:54:47 2011
New Revision: 1069089
URL: http://svn.apache.org/viewvc?rev=1069089&view=rev
Log:
UIMA-2038 Modified to support clean shutdown
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1069089&r1=1069088&r2=1069089&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Wed Feb 9 20:54:47 2011
@@ -19,16 +19,17 @@
package org.apache.uima.adapter.jms.activemq;
-import java.lang.reflect.Method;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
@@ -37,7 +38,6 @@ import javax.jms.TemporaryQueue;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
@@ -45,9 +45,9 @@ import org.apache.uima.aae.UIMAEE_Consta
import org.apache.uima.aae.UimaAsThreadFactory;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
-import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.Threshold;
@@ -104,10 +104,20 @@ public class UimaDefaultMessageListenerC
private volatile boolean awaitingShutdown = false;
- private boolean brokerWithDaemonThreads = false;
+ // When set to true, this flag prevents Spring from running its refreshConnectionUntilSuccessful
+ // logic, which attempts to recover the connection. This flag is set to true during
+ // service shutdown.
+ public static volatile boolean terminating;
+
+ private ThreadPoolExecutor threadPoolExecutor = null;
+
+ private boolean pluginThreadPool;
public UimaDefaultMessageListenerContainer() {
super();
+ // Reset the global static flag. This only affects unit testing, as services are deployed
+ // in the same process.
+ terminating = false;
UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
__listenerRef = this;
setRecoveryInterval(5);
@@ -121,7 +131,24 @@ public class UimaDefaultMessageListenerC
this();
this.freeCasQueueListener = freeCasQueueListener;
}
+ /**
+ * Overrides Spring's method that tries to recover from a lost connection. We don't
+ * want to recover when the service is stopping.
+ */
+ protected void refreshConnectionUntilSuccessful() {
+ if ( !terminating ) {
+ super.refreshConnectionUntilSuccessful();
+ }
+ }
+ protected void recoverAfterListenerSetupFailure() {
+ if ( !terminating ) {
+ super.recoverAfterListenerSetupFailure();
+ }
+ }
+ public void setTerminating() {
+ terminating = true;
+ }
public void setController(AnalysisEngineController aController) {
controller = aController;
}
@@ -389,11 +416,8 @@ public class UimaDefaultMessageListenerC
*/
protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
// If shutdown already, nothing to do
- if (awaitingShutdown) {
- return;
- }
- // If controller is stopping not need to recover the connection
- if (controller != null && controller.isStopped()) {
+ // If controller is stopping no need to recover the connection
+ if (awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) {
return;
}
if ( controller != null ) {
@@ -520,8 +544,6 @@ public class UimaDefaultMessageListenerC
try {
injectConnectionFactory();
- initializeTaskExecutor();
- injectTaskExecutor();
super.initialize();
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -561,14 +583,15 @@ public class UimaDefaultMessageListenerC
super.setMessageListener(messageListener);
}
}
-
+ public void afterPropertiesSet() {
+ afterPropertiesSet(true);
+ }
/**
* Called by Spring and some Uima AS components when all properties have been set. This method
* spins a thread in which the listener is initialized.
*/
- public void afterPropertiesSet() {
+ public void afterPropertiesSet(final boolean propagate) {
if (endpoint != null) {
-
// Override the prefetch size. The dd2spring always sets this to 1 which
// may effect the throughput of a service. Change the prefetch size to
// number of consumer threads defined in DD.
@@ -590,7 +613,8 @@ public class UimaDefaultMessageListenerC
super.setConcurrentConsumers(1);
if (cc > 1) {
try {
- concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName());
+ String prefix = endpoint.getDelegateKey()+" Reply Thread";
+ concurrentListener = new ConcurrentMessageListener(cc, ml, getDestinationName(), threadGroup,prefix);
super.setMessageListener(concurrentListener);
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -607,11 +631,11 @@ public class UimaDefaultMessageListenerC
return;
}
} else {
- super.setMessageListener(ml);
+ pluginThreadPool = true;
}
} else {
- super.setMessageListener(ml);
super.setConcurrentConsumers(cc);
+ pluginThreadPool = true;
}
Thread t = new Thread(threadGroup, new Runnable() {
public void run() {
@@ -643,14 +667,23 @@ public class UimaDefaultMessageListenerC
}
// Plug in connection Factory to Spring's Listener
__listenerRef.injectConnectionFactory();
+
+ if ( pluginThreadPool ) {
+ setUimaASThreadPoolExecutor(cc);
+ }
+
// Initialize the TaskExecutor. This call injects a custom Thread Pool into the
// TaskExecutor provided in the spring xml. The custom thread pool initializes
// an instance of AE in a dedicated thread
initializeTaskExecutor();
- // Plug in TaskExecutor to Spring's Listener
- __listenerRef.injectTaskExecutor();
- // Notify Spring Listener that all properties are ready
- __listenerRef.allPropertiesSet();
+ if ( threadPoolExecutor == null ) {
+ // Plug in TaskExecutor to Spring's Listener
+ __listenerRef.injectTaskExecutor();
+ }
+ if ( propagate ) {
+ // Notify Spring Listener that all properties are ready
+ __listenerRef.allPropertiesSet();
+ }
if (isActiveMQDestination() && destination != null) {
destinationName = ((ActiveMQDestination) destination).getPhysicalName();
}
@@ -686,6 +719,9 @@ public class UimaDefaultMessageListenerC
"afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_jms_listener_failed_WARNING",
new Object[] { destination, getBrokerUrl(), e });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
}
}
});
@@ -747,7 +783,9 @@ public class UimaDefaultMessageListenerC
((TempDestinationResolver) resolver).setListener(this);
super.setDestinationResolver(resolver);
}
-
+ /**
+ * Closes the shared connection to a broker
+ **/
public void closeConnection() throws Exception {
try {
setRecoveryInterval(0);
@@ -872,11 +910,10 @@ public class UimaDefaultMessageListenerC
*
*/
public void destroy() {
+
if (awaitingShutdown) {
return;
}
- awaitingShutdown = true;
-
// Spin a thread that will wait until all threads complete. This is needed to avoid
// memory leak caused by the fact that we did not wait to collect the threads
Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),
@@ -884,32 +921,23 @@ public class UimaDefaultMessageListenerC
public void run() {
try {
if ( !__listenerRef.awaitingShutdown && __listenerRef.isRunning() ) {
- // stop Spring listener and ActiveMQ threads
+ awaitingShutdown = true;
__listenerRef.stop();
- __listenerRef.closeConnection();
+ // If using non-default TaskExecutor, stop its threads
+ if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
+ ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdownNow();
+ } else if (concurrentListener != null) {
+ // Stop internal Executor
+ concurrentListener.stop();
+ } else if ( threadPoolExecutor != null ) {
+ threadPoolExecutor.shutdownNow();
+ }
+ __listenerRef.shutdown();
}
} catch (Exception e) {
+ e.printStackTrace();
}
- // If using non-default TaskExecutor, stop its threads
- if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
- // Since the calling thread may be one of those managed by the executor allow
- // for one open thread when checking active thread count.
- while (((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().getActiveCount() > 1
- && !((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor()
- .isTerminated()) {
- try {
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(200,
- TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- }
- }
- } else if (concurrentListener != null) {
- // Stop internal Executor
- concurrentListener.stop();
- }
- // Shutdown the listener
- __listenerRef.shutdown();
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
threadGroup.getParent().list();
}
@@ -957,6 +985,7 @@ public class UimaDefaultMessageListenerC
}
+
private boolean isAmqThread(Thread t) {
String tName = t.getName();
// The following is necessary to account for the AMQ threads
@@ -969,6 +998,34 @@ public class UimaDefaultMessageListenerC
}
return true;
}
+ private void setUimaASThreadPoolExecutor(int consumentCount) throws Exception{
+ super.setMessageListener(ml);
+ // create task executor with custom thread pool for:
+ // 1) GetMeta request processing
+ // 2) ReleaseCAS request
+ if ( taskExecutor == null ) {
+ UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
+ tf.setDaemon(true);
+ if ( isFreeCasQueueListener()) {
+ tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread");
+ } else if ( isGetMetaListener() ) {
+ tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+ } else if ( getDestination() != null && getMessageSelector() != null ) {
+ tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread");
+ } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
+ tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+ } else {
+ throw new Exception("Unknown Context Detected in setUimaASThreadPoolExecutor()");
+ }
+ ExecutorService es = Executors.newFixedThreadPool(consumentCount,tf);
+ if ( es instanceof ThreadPoolExecutor ) {
+ threadPoolExecutor = (ThreadPoolExecutor)es;
+ super.setTaskExecutor(es);
+ }
+ }
+ }
+
+
/**
* Called by Spring to inject TaskExecutor
*/
@@ -1002,6 +1059,7 @@ public class UimaDefaultMessageListenerC
// PrimitiveController so that every thread can call it to initialize
// the next available instance of a AE.
tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller);
+ ((UimaAsThreadFactory)tf).setDaemon(true);
// This ThreadExecutor will use custom thread factory instead of defult one
((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);
// Initialize the thread pool
@@ -1014,6 +1072,10 @@ public class UimaDefaultMessageListenerC
controller.changeState(ServiceState.RUNNING);
}
}
+
+ if ( threadPoolExecutor != null ) {
+ threadPoolExecutor.prestartAllCoreThreads();
+ }
}
public void stop() throws JmsException {