You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/04/10 16:07:44 UTC
svn commit: r763938 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Author: cwiklik
Date: Fri Apr 10 14:07:44 2009
New Revision: 763938
URL: http://svn.apache.org/viewvc?rev=763938&view=rev
Log:
UIMA-1109 Modified to support orderly shutdown of Uima AS service
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=763938&r1=763937&r2=763938&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Fri Apr 10 14:07:44 2009
@@ -52,6 +52,7 @@
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Level;
import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.JmsException;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -67,7 +68,7 @@
private AnalysisEngineController controller;
private volatile boolean failed = false;
private Object mux = new Object();
- private UimaDefaultMessageListenerContainer __listenerRef;
+ private final UimaDefaultMessageListenerContainer __listenerRef;
private TaskExecutor taskExecutor = null;
private ConnectionFactory connectionFactory = null;
private Object mux2 = new Object();
@@ -82,7 +83,7 @@
// listener purpose is to increment number of children for
// an input CAS.
private ConcurrentMessageListener concurrentListener = null;
-
+ private volatile boolean awaitingShutdown = false;
public UimaDefaultMessageListenerContainer()
{
@@ -118,7 +119,11 @@
/**
* Stops this Listener
*/
- private synchronized void handleListenerFailure() {
+ private void handleListenerFailure() {
+ // If shutdown already, nothing to do
+ if ( awaitingShutdown ) {
+ return;
+ }
try {
if ( controller instanceof AggregateAnalysisEngineController ) {
String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
@@ -283,6 +288,10 @@
*/
protected void handleListenerSetupFailure( Throwable t, boolean alreadyHandled )
{
+ // If shutdown already, nothing to do
+ if ( awaitingShutdown ) {
+ return;
+ }
// If controller is stopped, no need to recover the connection
if ( controller != null && controller.isStopped()) {
return;
@@ -342,21 +351,26 @@
new Object[] { controller.getComponentName(), getBrokerUrl() });
}
controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
+ if ( !controller.isStopped() && !controller.isAwaitingCacheCallbackNotification()) {
controller.stop();
+ }
}
protected void handleListenerException( Throwable t )
{
- t.printStackTrace();
- String endpointName =
+ // Already shutdown, nothing to do
+ if ( awaitingShutdown ) {
+ return;
+ }
+ t.printStackTrace();
+ String endpointName =
(getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
"handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
new Object[] { endpointName, getBrokerUrl(), t });
- }
- super.handleListenerException(t);
-
+ }
+ super.handleListenerException(t);
}
private void allPropertiesSet() {
@@ -660,6 +674,9 @@
public void onException(JMSException arg0)
{
+ if ( awaitingShutdown ) {
+ return;
+ }
arg0.printStackTrace();
String endpointName =
(getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
@@ -684,35 +701,61 @@
System.out.println("Injected Updated Task Executor Into Listener For Destination:"+getDestination());
}
/**
- * Called when the object goes out of scope. Main task in this method is to list
- * all threads and wait until they terminate.
+ * Delegate shutdown to the super class
+ */
+ public void doDestroy() {
+ super.destroy();
+ }
+ /**
+ * Spins a shutdown thread and stops Spring and ActiveMQ threads.
*
*/
public void destroy() {
- super.destroy();
+ if ( awaitingShutdown ) {
+ return;
+ }
+ awaitingShutdown = true;
+ if ( getDestination() != null ) {
+ System.out.println("Listener:"+getDestination()+" Destroy Called. Active Consumer Count:"+super.getActiveConsumerCount());
+ } else {
+ System.out.println("Listener:"+getDestinationName()+" Destroy Called. Active Consumer Count:"+super.getActiveConsumerCount());
+ }
// Spin a thread that will wait until all threads complete. This is needed to avoid
// memory leak caused by the fact that we did not wait to collect the threads
- //
Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),"threadGroupDestroyer") {
public void run() {
+ try {
+ // stop Spring listener and ActiveMQ threads
+ __listenerRef.stop();
+ __listenerRef.closeConnection();
+ } catch( Exception e) {}
+ // If using non-default TaskExecutor, stop its threads
if ( taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
- try {
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- } catch ( Exception e){}
+ if (!((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().isTerminated() ) {
+ ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
+ try {
+ ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ } catch ( Exception e){}
+ }
}
+ // Shutdown the listener
+ __listenerRef.shutdown();
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST) ) {
threadGroup.getParent().list();
}
-
+ // Wait until all threads are accounted for
while (threadGroup.activeCount() > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
- threadGroup.destroy();
try {
+ synchronized(threadGroup ) {
+ if ( !threadGroup.isDestroyed() ) {
+ threadGroup.destroy();
+ }
+ }
System.out.println(">>>>>>>>>>>> Listener:"+getDestinationName()+" Thread Group Destroyed");
} catch( Exception e) {} // Ignore
}
@@ -757,4 +800,8 @@
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
}
}
+ public void stop() throws JmsException {
+ setAcceptMessagesWhileStopping(false);
+ destroy();
+ }
}