You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/22 03:55:35 UTC
svn commit: r1073208 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Author: cwiklik
Date: Tue Feb 22 02:55:34 2011
New Revision: 1073208
URL: http://svn.apache.org/viewvc?rev=1073208&view=rev
Log:
UIMA-2065 Added shutdown hook to enable clean shutdown
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1073208&r1=1073207&r2=1073208&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Tue Feb 22 02:55:34 2011
@@ -43,7 +43,9 @@ import javax.naming.InitialContext;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
@@ -225,16 +227,27 @@ public class BaseUIMAAsynchronousEngine_
}
public void stop() {
- if (!running) {
- return;
- }
- super.stop();
synchronized (connectionMux) {
- running = false;
+ super.doStop();
+ if (!running) {
+ return;
+ }
+ running = false;
if (super.serviceDelegate != null) {
// Cancel all timers and purge lists
super.serviceDelegate.cleanup();
}
+ if (sender != null) {
+ sender.doStop();
+ }
+ if (initialized) {
+ try {
+ consumerSession.close();
+ ((ActiveMQMessageConsumer)consumer).stop();
+ consumer.close();
+ } catch (Exception exx) {}
+ }
+
try {
// SharedConnection object manages a single JMS connection to
// the broker. If the client is scaled out in the same JVM, the
@@ -267,9 +280,6 @@ public class BaseUIMAAsynchronousEngine_
} finally {
sharedConnectionSemaphore.release();
}
- if (sender != null) {
- sender.doStop();
- }
// Undeploy all containers
undeploy();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
@@ -279,13 +289,6 @@ public class BaseUIMAAsynchronousEngine_
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_undeployed_containers__INFO");
}
- if (initialized) {
- try {
- consumerSession.close();
- consumer.close();
- } catch (JMSException exx) {
- }
- }
// unregister client
if (jmxManager != null) {
jmxManager.unregisterMBean(clientJmxObjectName);
@@ -543,6 +546,10 @@ public class BaseUIMAAsynchronousEngine_
*/
public synchronized void initialize(Map anApplicationContext)
throws ResourceInitializationException {
+ // Add ShutdownHook to make sure the connection to the
+ // broker is always closed on process exit.
+ Runtime.getRuntime().addShutdownHook(
+ new Thread(new UimaASShutdownHook(this)));
// Check the version of uimaj that UIMA AS was built with, against the UIMA Core version. If not the same throw Exception
if (!UimaAsVersion.getUimajFullVersionString().equals(UimaVersion.getFullVersionString())) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1073208&r1=1073207&r2=1073208&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Feb 22 02:55:34 2011
@@ -29,6 +29,8 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
@@ -195,6 +197,8 @@ public abstract class BaseUIMAAsynchrono
protected static SharedConnection sharedConnection = null;
+ private ExecutorService exec = Executors.newFixedThreadPool(1);
+
abstract public String getEndPointName() throws Exception;
abstract protected TextMessage createTextMessage() throws Exception;
@@ -409,7 +413,7 @@ public abstract class BaseUIMAAsynchrono
}
}
- public void stop() {
+ public void doStop() {
synchronized (stopMux) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop",
@@ -420,7 +424,8 @@ public abstract class BaseUIMAAsynchrono
return;
}
- running = false;
+ exec.shutdownNow();
+
casQueueProducerReady = false;
if (serviceDelegate != null) {
serviceDelegate.cancelDelegateTimer();
@@ -509,12 +514,12 @@ public abstract class BaseUIMAAsynchrono
// Every thread requesting a CAS adds an entry to this
// queue.
CasQueueEntry entry = threadQueue.take();
- if (!running) {
- return; // client API has been stopped
- }
CAS cas = null;
long startTime = System.nanoTime();
// Wait for a free CAS instance
+ if (!running || asynchManager == null) {
+ return; // client API has been stopped
+ }
if (remoteService) {
cas = asynchManager.getNewCas("ApplicationCasPoolContext");
} else {
@@ -1312,8 +1317,9 @@ public abstract class BaseUIMAAsynchrono
if (exception != null && cachedRequest != null) {
cachedRequest.setException(exception);
if (exception instanceof AnalysisEngineProcessException
- || (exception.getCause() != null && (exception.getCause() instanceof AnalysisEngineProcessException || exception
- .getCause() instanceof ServiceShutdownException))) {
+ || (exception.getCause() != null &&
+ (exception.getCause() instanceof AnalysisEngineProcessException ||
+ exception.getCause() instanceof ServiceShutdownException))) {
// Indicate that this is a process exception.
cachedRequest.setProcessException();
}
@@ -1649,58 +1655,78 @@ public abstract class BaseUIMAAsynchrono
* Listener method receiving JMS Messages from the response queue.
*
*/
- public void onMessage(Message message) {
- try {
-
+ public void onMessage(final Message message) {
+ // Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread.
+ // This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock
+ // is only released if this method returns. Running in a dispatch thread caused a hang when an application
+ // decided to call System.exit() in any of its callback listener methods. The UIMA AS client adds a
+ // ShutdownHoook to the JVM to enable orderly shutdown which includes stopping JMS Consumer, JMS Producer
+ // and finally stopping JMS Connection. The ShutdownHook support was added to the client in case the
+ // application doesnt call client's stop() method. Now, the hang was caused by the fact that the dispatch
+ // thread was used to call System.exit() which in turn executed client's ShutdownHook code. The ShutdownHook
+ // code runs in a separate thread, but the the JVM blocks the dispatch thread until the ShutdownHook
+ // finishes. It never will though, since the ShutdownHook is calling ActiveMQSession.close() which tries to enter
+ // the same critical region that the dispatch thread is still stuck into. DEADLOCK.
+ // The code below uses a simple FixedThreadPool Executor with a single thread. This thread is reused instead
+ // creating one on the fly.
+ exec.execute( new Runnable() {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
- }
- if (!message.propertyExists(AsynchAEMessage.Command)) {
- return;
- }
+ public void run() {
+ try {
+
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ if (!message.propertyExists(AsynchAEMessage.Command)) {
+ return;
+ }
+
+ int command = message.getIntProperty(AsynchAEMessage.Command);
+ if (AsynchAEMessage.CollectionProcessComplete == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleCollectionProcessCompleteReply(message);
+ } else if (AsynchAEMessage.GetMeta == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleMetadataReply(message);
+ } else if (AsynchAEMessage.Process == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleProcessReply(message, true, null);
+ } else if (AsynchAEMessage.ServiceInfo == command) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_received_service_info_FINEST",
+ new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
+ }
+ handleServiceInfo(message);
+ }
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
+ }
- int command = message.getIntProperty(AsynchAEMessage.Command);
- if (AsynchAEMessage.CollectionProcessComplete == command) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
- }
- handleCollectionProcessCompleteReply(message);
- } else if (AsynchAEMessage.GetMeta == command) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
- }
- handleMetadataReply(message);
- } else if (AsynchAEMessage.Process == command) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
- }
- handleProcessReply(message, true, null);
- } else if (AsynchAEMessage.ServiceInfo == command) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_received_service_info_FINEST",
- new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) });
}
- handleServiceInfo(message);
- }
- } catch (Exception e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
- "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_exception__WARNING", e);
+
}
+ });
- }
}
/**
@@ -2410,7 +2436,11 @@ public abstract class BaseUIMAAsynchrono
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING",
new Object[] { aDestination, aFailure });
}
- stop();
+ try {
+ stop();
+ } catch( Exception e) {
+ e.printStackTrace();
+ }
}
/**
@@ -2609,11 +2639,17 @@ public abstract class BaseUIMAAsynchrono
synchronized(destroyMux) {
// Check if all clients have terminated and only than stop the shared connection
if (getClientCount() == 0 && connection != null
- && !((ActiveMQConnection) connection).isClosed()) {
+ && !((ActiveMQConnection) connection).isClosed()
+ && !((ActiveMQConnection) connection).isClosing()) {
try {
stop = true;
connection.stop();
connection.close();
+ while( !((ActiveMQConnection) connection).isClosed() ) {
+ try {
+ destroyMux.wait(100);
+ } catch( InterruptedException exx) {}
+ }
} catch (Exception e) {
/* ignore */
}
@@ -2636,4 +2672,20 @@ public abstract class BaseUIMAAsynchrono
}
}
}
+ public class UimaASShutdownHook implements Runnable {
+ UimaAsynchronousEngine asEngine=null;
+ public UimaASShutdownHook( UimaAsynchronousEngine asEngine) {
+ this.asEngine = asEngine;
+ }
+ public void run() {
+ try {
+ if ( asEngine != null ) {
+ asEngine.stop();
+ }
+ } catch( Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+
+ }
}