You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:16:24 UTC
svn commit: r810547 [5/5] -
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Wed Sep 2 15:16:23 2009
@@ -34,7 +34,6 @@
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.command.ActiveMQDestination;
@@ -59,198 +58,238 @@
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+public class UimaDefaultMessageListenerContainer extends DefaultMessageListenerContainer implements
+ ExceptionListener {
+ private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class;
+
+ private String destinationName = "";
+
+ private Endpoint endpoint;
+
+ private volatile boolean freeCasQueueListener;
+
+ private AnalysisEngineController controller;
+
+ private volatile boolean failed = false;
+
+ private Object mux = new Object();
+
+ private final UimaDefaultMessageListenerContainer __listenerRef;
+
+ private TaskExecutor taskExecutor = null;
+
+ private ConnectionFactory connectionFactory = null;
+
+ private Object mux2 = new Object();
-public class UimaDefaultMessageListenerContainer extends DefaultMessageListenerContainer
-implements ExceptionListener
-{
- private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class;
- private String destinationName="";
- private Endpoint endpoint;
- private volatile boolean freeCasQueueListener;
- private AnalysisEngineController controller;
- private volatile boolean failed = false;
- private Object mux = new Object();
- private final UimaDefaultMessageListenerContainer __listenerRef;
- private TaskExecutor taskExecutor = null;
- private ConnectionFactory connectionFactory = null;
- private Object mux2 = new Object();
private ThreadGroup threadGroup = null;
- private ThreadFactory tf = null;
- // stores number of consumer threads
- private int cc=0;
- // stores message listener plugged in by Spring
- private Object ml=null;
- // A new listener will be injected between
- // spring and JmsInputChannel Pojo Listener. This
- // listener purpose is to increment number of children for
- // an input CAS.
+
+ private ThreadFactory tf = null;
+
+ // stores number of consumer threads
+ private int cc = 0;
+
+ // stores message listener plugged in by Spring
+ private Object ml = null;
+
+ // A new listener will be injected between
+ // Spring and the JmsInputChannel Pojo Listener. This
+ // listener's purpose is to increment the number of children for
+ // an input CAS.
private ConcurrentMessageListener concurrentListener = null;
+
private volatile boolean awaitingShutdown = false;
-
- public UimaDefaultMessageListenerContainer()
- {
- super();
- UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
- __listenerRef = this;
+
+ public UimaDefaultMessageListenerContainer() {
+ super();
+ UIMAFramework.getLogger(CLASS_NAME).setLevel(Level.WARNING);
+ __listenerRef = this;
setRecoveryInterval(5);
- setAcceptMessagesWhileStopping(false);
- setExceptionListener(this);
- threadGroup = new ThreadGroup("ListenerThreadGroup_"+Thread.currentThread().getThreadGroup().getName());
- }
- public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener)
- {
- this();
- this.freeCasQueueListener = freeCasQueueListener;
- }
- public void setController( AnalysisEngineController aController)
- {
- controller = aController;
- }
- /**
- *
- * @param t
- * @return
- */
- private boolean disableListener( Throwable t)
- {
- System.out.println(t.toString());
- if ( t.toString().indexOf("SharedConnectionNotInitializedException") > 0 ||
- ( t instanceof JMSException && t.getCause() != null && t.getCause() instanceof ConnectException ) )
- return true;
- return false;
- }
- /**
- * Stops this Listener
- */
- private void handleListenerFailure() {
- // If shutdown already, nothing to do
- if ( awaitingShutdown ) {
+ setAcceptMessagesWhileStopping(false);
+ setExceptionListener(this);
+ threadGroup = new ThreadGroup("ListenerThreadGroup_"
+ + Thread.currentThread().getThreadGroup().getName());
+ }
+
+ public UimaDefaultMessageListenerContainer(boolean freeCasQueueListener) {
+ this();
+ this.freeCasQueueListener = freeCasQueueListener;
+ }
+
+ public void setController(AnalysisEngineController aController) {
+ controller = aController;
+ }
+
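+ /**
+ * Decides whether the given failure is one for which this listener should be disabled.
+ * @param t the failure to inspect
+ * @return true if t's text names SharedConnectionNotInitializedException, or t is a JMSException caused by a ConnectException
+ */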
+ private boolean disableListener(Throwable t) {
+ System.out.println(t.toString());
+ if (t.toString().indexOf("SharedConnectionNotInitializedException") > 0
+ || (t instanceof JMSException && t.getCause() != null && t.getCause() instanceof ConnectException))
+ return true;
+ return false;
+ }
+
+ /**
+ * Stops this Listener
+ */
+ private void handleListenerFailure() {
+ // If shutdown already, nothing to do
+ if (awaitingShutdown) {
return;
}
try {
- if ( controller instanceof AggregateAnalysisEngineController ) {
- String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
+ if (controller instanceof AggregateAnalysisEngineController) {
+ String delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(endpoint.getEndpoint());
InputChannel iC = null;
String queueName = null;
- if ( endpoint.getDestination() != null ) {
+ if (endpoint.getDestination() != null) {
queueName = endpoint.getDestination().toString();
} else {
queueName = endpoint.getEndpoint();
}
- iC = ((AggregateAnalysisEngineController)controller).getInputChannel(queueName);
- if ( iC != null ) {
+ iC = ((AggregateAnalysisEngineController) controller).getInputChannel(queueName);
+ if (iC != null) {
iC.destroyListener(queueName, delegateKey);
} else {
- System.out.println(">>> Listener Unable To LookUp InputChannel For Queue:"+queueName);
+ System.out.println(">>> Listener Unable To LookUp InputChannel For Queue:" + queueName);
}
}
- } catch( Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
- /**
- * Handles failure on a temp queue
- * @param t
- */
+
+ /**
+ * Handles failure on a temp queue
+ *
+ * @param t
+ */
private void handleTempQueueFailure(Throwable t) {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
- new Object[] { endpoint.getDestination(), getBrokerUrl(), t });
- }
- // Check if the failure is due to the failed connection. Spring (and ActiveMQ) dont seem to provide
- // the cause. Just the top level IllegalStateException with a text message. This is what we need to
- // check for.
- if ( t instanceof javax.jms.IllegalStateException && t.getMessage().equals("The Consumer is closed")) {
- if ( controller != null && controller instanceof AggregateAnalysisEngineController ) {
- String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
+ "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_jms_listener_failed_WARNING",
+ new Object[] { endpoint.getDestination(), getBrokerUrl(), t });
+ }
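+ // Check if the failure is due to a failed connection. Spring (and ActiveMQ)
+ // don't seem to provide the cause, just the top-level IllegalStateException
+ // with a text message, so the message text is the only thing we can check.
+ // Note: this relies on the exact wording "The Consumer is closed" and will
+ // stop matching if that wording ever changes.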
+ if (t instanceof javax.jms.IllegalStateException
+ && t.getMessage().equals("The Consumer is closed")) {
+ if (controller != null && controller instanceof AggregateAnalysisEngineController) {
+ String delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(endpoint.getEndpoint());
try {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
- "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_listener_INFO",
- new Object[] { controller.getComponentName(), endpoint.getDestination(),delegateKey });
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ this.getClass().getName(),
+ "handleTempQueueFailure",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_stopping_listener_INFO",
+ new Object[] { controller.getComponentName(), endpoint.getDestination(),
+ delegateKey });
}
- // Stop current listener
+ // Stop current listener
handleListenerFailure();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
- "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
- new Object[] { controller.getComponentName(), endpoint.getDestination() });
+ "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_stopped_listener_INFO",
+ new Object[] { controller.getComponentName(), endpoint.getDestination() });
}
- } catch ( Exception e ) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
- } else if ( disableListener(t)) {
+ } else if (disableListener(t)) {
handleQueueFailure(t);
}
}
-
+
private ErrorHandler fetchGetMetaErrorHandler() {
ErrorHandler handler = null;
Iterator it = controller.getErrorHandlerChain().iterator();
- // Find the error handler for GetMeta in the Error Handler List provided in the
- // deployment descriptor
- while ( it.hasNext() )
- {
- handler = (ErrorHandler)it.next();
- if ( handler instanceof GetMetaErrorHandler )
- {
+ // Find the error handler for GetMeta in the Error Handler List provided in the
+ // deployment descriptor
+ while (it.hasNext()) {
+ handler = (ErrorHandler) it.next();
+ if (handler instanceof GetMetaErrorHandler) {
return handler;
}
}
return null;
}
+
/**
* Handles failures on non-temp queues
+ *
* @param t
*/
private void handleQueueFailure(Throwable t) {
- final String endpointName =
- (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ final String endpointName = (getDestination() == null) ? ""
+ : ((ActiveMQDestination) getDestination()).getPhysicalName();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
- new Object[] { endpointName, getBrokerUrl(), t });
+ "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_jms_listener_failed_WARNING",
+ new Object[] { endpointName, getBrokerUrl(), t });
}
boolean terminate = true;
- // Check if the failure is severe enough to disable this listener. Whether or not this listener is actully
- // disabled depends on the action associated with GetMeta Error Handler. If GetMeta Error Handler is
- // configured to terminate the service on failure, this listener will be terminated and the entire service
- // will be stopped.
- if ( disableListener(t) ) {
+ // Check if the failure is severe enough to disable this listener.
+ // Whether or not this listener is actually disabled depends on the
+ // action associated with the GetMeta Error Handler. If the GetMeta
+ // Error Handler is configured to terminate the service on failure,
+ // this listener will be terminated and the entire service will be
+ // stopped; if it is configured to disable the delegate, only the
+ // delegate is disabled and the service is not terminated.
+ if (disableListener(t)) {
endpoint.setReplyDestinationFailed();
- // If this is a listener attached to the Aggregate Controller, use GetMeta Error
- // Thresholds defined to determine what to do next after failure. Either terminate
- // the service or disable the delegate with which this listener is associated with
- if ( controller != null && controller instanceof AggregateAnalysisEngineController )
- {
+ // If this is a listener attached to the Aggregate Controller, use GetMeta Error
+ // Thresholds defined to determine what to do next after failure. Either terminate
+ // the service or disable the delegate with which this listener is associated
+ if (controller != null && controller instanceof AggregateAnalysisEngineController) {
ErrorHandler handler = fetchGetMetaErrorHandler();
- // Fetch a Map containing thresholds for GetMeta for each delegate.
- Map thresholds = handler.getEndpointThresholdMap();
- // Lookup delegate's key using delegate's endpoint name
- String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
- // If the delegate has a threshold defined on GetMeta apply Action defined
- if ( delegateKey != null && thresholds.containsKey(delegateKey))
- {
- // Fetch the Threshold object containing error configuration
+ // Fetch a Map containing thresholds for GetMeta for each delegate.
+ Map thresholds = handler.getEndpointThresholdMap();
+ // Lookup delegate's key using delegate's endpoint name
+ String delegateKey = ((AggregateAnalysisEngineController) controller)
+ .lookUpDelegateKey(endpoint.getEndpoint());
+ // If the delegate has a threshold defined on GetMeta apply Action defined
+ if (delegateKey != null && thresholds.containsKey(delegateKey)) {
+ // Fetch the Threshold object containing error configuration
Threshold threshold = (Threshold) thresholds.get(delegateKey);
- // Check if the delegate needs to be disabled
+ // Check if the delegate needs to be disabled
if (threshold.getAction().equalsIgnoreCase(ErrorHandler.DISABLE)) {
- // The disable delegate method takes a list of delegates
+ // The disable delegate method takes a list of delegates
List list = new ArrayList();
- // Add the delegate to disable to the list
+ // Add the delegate to disable to the list
list.add(delegateKey);
try {
- System.out.println(">>>> Controller:"+controller.getComponentName()+" Disabling Listener On Queue:"+endpoint.getEndpoint()+". Component's "+delegateKey+" Broker:"+getBrokerUrl()+" is Invalid");
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
- "handleQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_bad_broker__INFO",
- new Object[] { controller.getComponentName(), delegateKey, getBrokerUrl() });
+ System.out.println(">>>> Controller:" + controller.getComponentName()
+ + " Disabling Listener On Queue:" + endpoint.getEndpoint() + ". Component's "
+ + delegateKey + " Broker:" + getBrokerUrl() + " is Invalid");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME)
+ .logrb(
+ Level.INFO,
+ this.getClass().getName(),
+ "handleQueueFailure",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_disabled_delegate_bad_broker__INFO",
+ new Object[] { controller.getComponentName(), delegateKey,
+ getBrokerUrl() });
}
- // Remove the delegate from the routing table.
+ // Remove the delegate from the routing table.
((AggregateAnalysisEngineController) controller).disableDelegates(list);
- terminate = false; //just disable the delegate and continue
+ terminate = false; // just disable the delegate and continue
} catch (Exception e) {
e.printStackTrace();
terminate = true;
@@ -259,202 +298,216 @@
}
}
}
- System.out.println("****** Unable To Connect Listener To Broker:"+getBrokerUrl());
- System.out.println("****** Closing Listener on Queue:"+endpoint.getEndpoint());
+ System.out.println("****** Unable To Connect Listener To Broker:" + getBrokerUrl());
+ System.out.println("****** Closing Listener on Queue:" + endpoint.getEndpoint());
setRecoveryInterval(0);
-
- // Spin a shutdown thread to terminate listener.
+
+ // Spin a shutdown thread to terminate listener.
new Thread() {
- public void run()
- {
- try
- {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ public void run() {
+ try {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_disable_listener__WARNING",
- new Object[] { endpointName, getBrokerUrl() });
+ "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_disable_listener__WARNING",
+ new Object[] { endpointName, getBrokerUrl() });
}
shutdown();
+ } catch (Exception e) {
+ e.printStackTrace();
}
- catch( Exception e) { e.printStackTrace();}
}
}.start();
- if ( terminate )
- {
+ if (terminate) {
terminate(t);
}
}
+
/**
- * This method is called by Spring when a listener fails
- */
- protected void handleListenerSetupFailure( Throwable t, boolean alreadyHandled )
- {
- // If shutdown already, nothing to do
- if ( awaitingShutdown ) {
+ * This method is called by Spring when a listener fails
+ */
+ protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) {
+ // If shutdown already, nothing to do
+ if (awaitingShutdown) {
return;
}
- // If controller is stopping not need to recover the connection
- if ( controller != null && controller.isStopped()) {
- return;
- }
- if ( endpoint == null ) {
- super.handleListenerSetupFailure(t, true);
- String controllerId = "";
- if (controller != null ) {
- controllerId = "Uima AS Service:"+controller.getComponentName();
+ // If the controller has already stopped, there is no need to recover the connection
+ if (controller != null && controller.isStopped()) {
+ return;
+ }
+ if (endpoint == null) {
+ super.handleListenerSetupFailure(t, true);
+ String controllerId = "";
+ if (controller != null) {
+ controllerId = "Uima AS Service:" + controller.getComponentName();
}
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_listener_connection_failure__WARNING",
- new Object[] { controllerId, getBrokerUrl() });
- }
- System.out.println(controllerId+" Listener Unable to Connect to Broker:" +getBrokerUrl()+" Retrying ....");
- // This code executes during initialization of the service. The Endpoint is not yet
+ "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_listener_connection_failure__WARNING",
+ new Object[] { controllerId, getBrokerUrl() });
+ }
+ System.out.println(controllerId + " Listener Unable to Connect to Broker:" + getBrokerUrl()
+ + " Retrying ....");
+ // This code executes during initialization of the service. The Endpoint is not yet
// available. The connection to a broker cannot be established. Keep trying until
// the broker becomes available.
refreshConnectionUntilSuccessful();
- System.out.println(controllerId+" Listener Established Connection to Broker:" +getBrokerUrl());
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ System.out.println(controllerId + " Listener Established Connection to Broker:"
+ + getBrokerUrl());
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_listener_connection_recovered__WARNING",
- new Object[] { controllerId, getBrokerUrl() });
+ "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_listener_connection_recovered__WARNING",
+ new Object[] { controllerId, getBrokerUrl() });
}
return;
- }
+ }
- // Connection failure that occurs AFTER the service initialized.
- t.printStackTrace();
-
- synchronized( mux ) {
- if ( !failed ) {
- // Check if this listener is attached to a temp queue. If so, this is a listener
- // on a reply queue. Handle temp queue listener failure differently than an
- // input queue listener.
- if ( endpoint.isTempReplyDestination()) {
- handleTempQueueFailure(t);
- } else {
- // Handle non-temp queue failure
- handleQueueFailure(t);
- }
- }
- failed = true;
- }
- }
+ // Connection failure that occurs AFTER the service initialized.
+ t.printStackTrace();
- private void terminate(Throwable t) {
+ synchronized (mux) {
+ if (!failed) {
+ // Check if this listener is attached to a temp queue. If so, this is a listener
+ // on a reply queue. Handle temp queue listener failure differently than an
+ // input queue listener.
+ if (endpoint.isTempReplyDestination()) {
+ handleTempQueueFailure(t);
+ } else {
+ // Handle non-temp queue failure
+ handleQueueFailure(t);
+ }
+ }
+ failed = true;
+ }
+ }
+
+ private void terminate(Throwable t) {
// ****************************************
- // terminate the service
+ // terminate the service
// ****************************************
- System.out.println(">>>> Terminating Controller:"+controller.getComponentName()+" Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl());
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ System.out.println(">>>> Terminating Controller:" + controller.getComponentName()
+ + " Unable To Initialize Listener Due to Invalid Broker URL:" + getBrokerUrl());
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "terminate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_terminate_service_dueto_bad_broker__WARNING",
- new Object[] { controller.getComponentName(), getBrokerUrl() });
+ "terminate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_terminate_service_dueto_bad_broker__WARNING",
+ new Object[] { controller.getComponentName(), getBrokerUrl() });
}
controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
- if ( !controller.isStopped() && !controller.isAwaitingCacheCallbackNotification()) {
- controller.stop();
+ if (!controller.isStopped() && !controller.isAwaitingCacheCallbackNotification()) {
+ controller.stop();
}
- }
- protected void handleListenerException( Throwable t )
- {
- // Already shutdown, nothing to do
- if ( awaitingShutdown ) {
+ }
+
+ protected void handleListenerException(Throwable t) {
+ // Already shutdown, nothing to do
+ if (awaitingShutdown) {
return;
}
t.printStackTrace();
- String endpointName =
- (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
-
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
- new Object[] { endpointName, getBrokerUrl(), t });
+ String endpointName = (getDestination() == null) ? ""
+ : ((ActiveMQDestination) getDestination()).getPhysicalName();
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+ "handleListenerException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_jms_listener_failed_WARNING",
+ new Object[] { endpointName, getBrokerUrl(), t });
}
super.handleListenerException(t);
- }
+ }
+
+ private void allPropertiesSet() {
+ super.afterPropertiesSet();
+ }
+
+ private void injectConnectionFactory() {
+ while (connectionFactory == null) {
+ try {
+ Thread.sleep(50);
+ } catch (Exception e) {
+ }
+ }
+ String brokerURL = ((ActiveMQConnectionFactory) connectionFactory).getBrokerURL();
+ super.setConnectionFactory(connectionFactory);
+ }
+
+ private void injectTaskExecutor() {
+ super.setTaskExecutor(taskExecutor);
+ }
+
+ private boolean isGetMetaListener() {
+ return getMessageSelector() != null
+ && __listenerRef.getMessageSelector().equals("Command=2001");
+ }
+
+ private boolean isActiveMQDestination() {
+ return getDestination() != null && getDestination() instanceof ActiveMQDestination;
+ }
- private void allPropertiesSet() {
- super.afterPropertiesSet();
- }
- private void injectConnectionFactory() {
- while( connectionFactory == null ) {
- try {
- Thread.sleep(50);
- } catch (Exception e){}
- }
- String brokerURL = ((ActiveMQConnectionFactory)connectionFactory).getBrokerURL();
- super.setConnectionFactory(connectionFactory);
- }
- private void injectTaskExecutor() {
- super.setTaskExecutor(taskExecutor);
- }
- private boolean isGetMetaListener() {
- return getMessageSelector() != null && __listenerRef.getMessageSelector().equals( "Command=2001");
- }
- private boolean isActiveMQDestination() {
- return getDestination() != null && getDestination() instanceof ActiveMQDestination;
- }
-
- public void initializeContainer() {
+ public void initializeContainer() {
try {
injectConnectionFactory();
initializeTaskExecutor();
injectTaskExecutor();
super.initialize();
- } catch( Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
- }
- /** Intercept Spring call to increment number of consumer
- * threads. If the value > 1, don't propagate to
- * Spring. A new listener will be injected and it will
- * use provided number of consumer threads.
+ }
+
+ /**
+ * Intercept Spring call to increment number of consumer threads. If the value > 1, don't
+ * propagate to Spring. A new listener will be injected and it will use provided number of
+ * consumer threads.
**/
public void setConcurrentConsumers(int concurrentConsumers) {
cc = concurrentConsumers;
- if ( this.freeCasQueueListener ) {
+ if (this.freeCasQueueListener) {
super.setConcurrentConsumers(concurrentConsumers);
}
}
- /** Intercept Spring call to inject application Pojo
- * listener. Don't propagate the listener up to Spring
- * just yet. If more than one consumer thread is used,
- * a different listener will be injected.
- **/
+
+ /**
+ * Intercept Spring call to inject application Pojo listener. Don't propagate the listener up to
+ * Spring just yet. If more than one consumer thread is used, a different listener will be
+ * injected.
+ **/
public void setMessageListener(Object messageListener) {
ml = messageListener;
- if ( this.freeCasQueueListener ) {
+ if (this.freeCasQueueListener) {
super.setMessageListener(messageListener);
}
}
- /**
- * Called by Spring and some Uima AS components when all properties have been set.
- * This method spins a thread in which the listener is initialized.
- */
- public void afterPropertiesSet()
- {
- if ( endpoint != null ) {
- // Endpoint has been plugged in from spring xml. This means this is a listener
- // for a reply queue. We need to rewire things a bit. First make Spring use
- // one thread to make sure we receive messages in order. To fix a race condition
- // where a parent CAS is processed first instead of its last child, we need to
- // assure that we get the child first. We need to update the counter of the
- // parent CAS to reflect that there is another child. In the race condition that
- // was observed, the parent was being processed first in one thread. The parent
- // reached the final step and subsequently was dropped. Subsequent to that, a
- // child CAS processed on another thread begun executing and failed since a look
- // on its parent resulted in CAS Not Found In Cache Exception.
- // Make sure Spring uses one thread
- super.setConcurrentConsumers(1);
+
+ /**
+ * Called by Spring and some Uima AS components when all properties have been set. This method
+ * spins a thread in which the listener is initialized.
+ */
+ public void afterPropertiesSet() {
+ if (endpoint != null) {
+ // Endpoint has been plugged in from spring xml. This means this is a listener
+ // for a reply queue. We need to rewire things a bit. First make Spring use
+ // one thread to make sure we receive messages in order. To fix a race condition
+ // where a parent CAS is processed first instead of its last child, we need to
+ // assure that we get the child first. We need to update the counter of the
+ // parent CAS to reflect that there is another child. In the race condition that
+ // was observed, the parent was being processed first in one thread. The parent
+ // reached the final step and subsequently was dropped. Subsequent to that, a
+ // child CAS processed on another thread began executing and failed since a lookup
+ // of its parent resulted in a CAS Not Found In Cache Exception.
+ // Make sure Spring uses one thread
+ super.setConcurrentConsumers(1);
if (cc > 1) {
try {
concurrentListener = new ConcurrentMessageListener(cc, ml);
super.setMessageListener(concurrentListener);
- } catch( Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
return;
}
@@ -473,338 +526,353 @@
while (connectionFactory == null) {
try {
Thread.sleep(50);
- } catch ( InterruptedException ex) {}
+ } catch (InterruptedException ex) {
+ }
}
System.setProperty("BrokerURI", ((ActiveMQConnectionFactory) connectionFactory)
.getBrokerURL());
boolean done = false;
- // Wait for controller to be injected by Uima AS
+ // Wait for controller to be injected by Uima AS
if (isActiveMQDestination() && !isGetMetaListener()
&& !((ActiveMQDestination) destination).isTemporary()) {
- // Add self to InputChannel
+ // Add self to InputChannel
connectWithInputChannel();
- // Wait for InputChannel to plug in a controller
+ // Wait for InputChannel to plug in a controller
done = true;
while (controller == null)
try {
Thread.sleep(50);
- } catch ( InterruptedException ex) {}
- ;
+ } catch (InterruptedException ex) {
+ }
+ ;
}
- // Plug in connection Factory to Spring's Listener
+ // Plug in connection Factory to Spring's Listener
__listenerRef.injectConnectionFactory();
- // Initialize the TaskExecutor. This call injects a custom Thread Pool into the
- // TaskExecutor provided in the spring xml. The custom thread pool initializes
- // an instance of AE in a dedicated thread
+ // Initialize the TaskExecutor. This call injects a custom Thread Pool into the
+ // TaskExecutor provided in the spring xml. The custom thread pool initializes
+ // an instance of AE in a dedicated thread
initializeTaskExecutor();
- // Plug in TaskExecutor to Spring's Listener
+ // Plug in TaskExecutor to Spring's Listener
__listenerRef.injectTaskExecutor();
- // Notify Spring Listener that all properties are ready
+ // Notify Spring Listener that all properties are ready
__listenerRef.allPropertiesSet();
- if (isActiveMQDestination() && destination != null ) {
+ if (isActiveMQDestination() && destination != null) {
destinationName = ((ActiveMQDestination) destination).getPhysicalName();
}
- if ( !done ) {
+ if (!done) {
connectWithInputChannel();
done = true;
}
- if ( concurrentListener != null ) {
+ if (concurrentListener != null) {
concurrentListener.setAnalysisEngineController(controller);
}
- // Save number of concurrent consumers on the temp reply queue in case we need to
- // recreate a new listener on a new temp queue created during recovery
- if ( endpoint != null && controller instanceof AggregateAnalysisEngineController ) {
- Delegate delegate =
- ((AggregateAnalysisEngineController)controller).lookupDelegate(endpoint.getDelegateKey());
- if ( delegate != null ) {
+ // Save number of concurrent consumers on the temp reply queue in case we need to
+ // recreate a new listener on a new temp queue created during recovery
+ if (endpoint != null && controller instanceof AggregateAnalysisEngineController) {
+ Delegate delegate = ((AggregateAnalysisEngineController) controller)
+ .lookupDelegate(endpoint.getDelegateKey());
+ if (delegate != null) {
delegate.getEndpoint().setConcurrentReplyConsumers(cc);
- }
+ }
}
String selector = "";
- if ( __listenerRef.getMessageSelector() != null ) {
- selector = " Selector:"+__listenerRef.getMessageSelector();
+ if (__listenerRef.getMessageSelector() != null) {
+ selector = " Selector:" + __listenerRef.getMessageSelector();
}
- if ( getDestination() != null) {
- System.out.println("Service:"+controller.getComponentName()+" Listener Ready. Broker:" + getBrokerUrl()+" Queue:"+getDestination()+selector);
+ if (getDestination() != null) {
+ System.out.println("Service:" + controller.getComponentName()
+ + " Listener Ready. Broker:" + getBrokerUrl() + " Queue:" + getDestination()
+ + selector);
}
- } catch ( Exception e ) {
+ } catch (Exception e) {
e.printStackTrace();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
+ "afterPropertiesSet", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_jms_listener_failed_WARNING",
new Object[] { destination, getBrokerUrl(), e });
}
- }
- }
- );
- t.start();
- }
- /**
- * Inject instance of this listener into the InputChannel
- *
- * @throws Exception
- */
- private void connectWithInputChannel() throws Exception {
- Object pojoListener = getPojoListener();
+ }
+ });
+ t.start();
+ }
+
+ /**
+ * Inject instance of this listener into the InputChannel
+ *
+ * @throws Exception
+ */
+ private void connectWithInputChannel() throws Exception {
+ Object pojoListener = getPojoListener();
-
if (pojoListener instanceof JmsInputChannel) {
- // Wait until InputChannel has a valid controller. The controller will be plug in
- // by Spring on a different thread
- while( (((JmsInputChannel) pojoListener).getController()) == null ) {
+ // Wait until InputChannel has a valid controller. The controller will be plugged in
+ // by Spring on a different thread
+ while ((((JmsInputChannel) pojoListener).getController()) == null) {
try {
Thread.currentThread().sleep(50);
- } catch ( Exception e) {}
+ } catch (Exception e) {
+ }
}
((JmsInputChannel) pojoListener).setListenerContainer(__listenerRef);
} else if (pojoListener instanceof ModifiableListener) {
((ModifiableListener) pojoListener).setListener(__listenerRef);
}
- }
- public String getDestinationName()
- {
-
- return destinationName;
- }
- public String getEndpointName()
- {
- if ( getDestination() != null )
- {
- return ((ActiveMQDestination)getDestination()).getPhysicalName();
- }
- return null;
- }
- public String getBrokerUrl()
- {
- return ((ActiveMQConnectionFactory)connectionFactory).getBrokerURL();
- }
- /* Overrides specified Connection Factory. Need to append maxInactivityDuration=0 to the
- * broker URL. The Connection Factory is immutable thus we need to intercept the one
- * provided in the deployment descriptor and create a new one with rewritten Broker URL.
- * We will inject the prefetch policy to the new CF based on what is found in the CF
- * in the deployment descriptor.
- */
-
- public void setConnectionFactory(ConnectionFactory aConnectionFactory) {
- connectionFactory = aConnectionFactory;
+ }
+
+ public String getDestinationName() {
+
+ return destinationName;
+ }
+
+ public String getEndpointName() {
+ if (getDestination() != null) {
+ return ((ActiveMQDestination) getDestination()).getPhysicalName();
+ }
+ return null;
+ }
+
+ public String getBrokerUrl() {
+ return ((ActiveMQConnectionFactory) connectionFactory).getBrokerURL();
+ }
+
+ /*
+ * Overrides specified Connection Factory. Need to append maxInactivityDuration=0 to the broker
+ * URL. The Connection Factory is immutable thus we need to intercept the one provided in the
+ * deployment descriptor and create a new one with rewritten Broker URL. We will inject the
+ * prefetch policy to the new CF based on what is found in the CF in the deployment descriptor.
+ */
+
+ public void setConnectionFactory(ConnectionFactory aConnectionFactory) {
+ connectionFactory = aConnectionFactory;
super.setConnectionFactory(connectionFactory);
- }
-
-
- public void setDestinationResolver( DestinationResolver resolver )
- {
- ((TempDestinationResolver)resolver).setListener(this);
- super.setDestinationResolver(resolver);
- }
- public void closeConnection() throws Exception
- {
- try
- {
- setRecoveryInterval(0);
- setAcceptMessagesWhileStopping(false);
- setAutoStartup(false);
- getSharedConnection().close();
- }
- catch( Exception e)
- {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "closeConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING",
- new Object[] {Thread.currentThread().getId(), e});
- }
- }
- }
- public void setDestination( Destination aDestination )
- {
- super.setDestination(aDestination);
- if ( endpoint != null)
- {
- endpoint.setDestination(aDestination);
- if ( aDestination instanceof TemporaryQueue ) {
- endpoint.setTempReplyDestination(true);
- String serviceName = "";
- if ( controller != null ) {
- serviceName = ">>>Controller:"+controller.getComponentName();
- }
- Object pojoListener = getPojoListener();
- if ( pojoListener != null && pojoListener instanceof InputChannel ) {
- ((JmsInputChannel)pojoListener).setListenerContainer(this);
- }
- }
- endpoint.setServerURI(getBrokerUrl());
- }
- }
- private Object getPojoListener() {
- Object pojoListener = null;
- if ( ml != null ) {
+ }
+
+ public void setDestinationResolver(DestinationResolver resolver) {
+ ((TempDestinationResolver) resolver).setListener(this);
+ super.setDestinationResolver(resolver);
+ }
+
+ public void closeConnection() throws Exception {
+ try {
+ setRecoveryInterval(0);
+ setAcceptMessagesWhileStopping(false);
+ setAutoStartup(false);
+ getSharedConnection().close();
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+ "closeConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", new Object[] { Thread.currentThread().getId(), e });
+ }
+ }
+ }
+
+ public void setDestination(Destination aDestination) {
+ super.setDestination(aDestination);
+ if (endpoint != null) {
+ endpoint.setDestination(aDestination);
+ if (aDestination instanceof TemporaryQueue) {
+ endpoint.setTempReplyDestination(true);
+ String serviceName = "";
+ if (controller != null) {
+ serviceName = ">>>Controller:" + controller.getComponentName();
+ }
+ Object pojoListener = getPojoListener();
+ if (pojoListener != null && pojoListener instanceof InputChannel) {
+ ((JmsInputChannel) pojoListener).setListenerContainer(this);
+ }
+ }
+ endpoint.setServerURI(getBrokerUrl());
+ }
+ }
+
+ private Object getPojoListener() {
+ Object pojoListener = null;
+ if (ml != null) {
pojoListener = ml;
- } else if ( getMessageListener() != null ){
- pojoListener= getMessageListener();
+ } else if (getMessageListener() != null) {
+ pojoListener = getMessageListener();
}
- return pojoListener;
- }
- public Destination getListenerEndpoint()
- {
- return getDestination();
- }
-
- public void onException(JMSException arg0)
- {
- if ( awaitingShutdown ) {
+ return pojoListener;
+ }
+
+ public Destination getListenerEndpoint() {
+ return getDestination();
+ }
+
+ public void onException(JMSException arg0) {
+ if (awaitingShutdown) {
return;
}
- arg0.printStackTrace();
- String endpointName =
- (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
-
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
+ arg0.printStackTrace();
+ String endpointName = (getDestination() == null) ? ""
+ : ((ActiveMQDestination) getDestination()).getPhysicalName();
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "onException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
- new Object[] { endpointName, getBrokerUrl(), arg0});
+ "onException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_jms_listener_failed_WARNING",
+ new Object[] { endpointName, getBrokerUrl(), arg0 });
}
- }
+ }
- public void setTargetEndpoint( Endpoint anEndpoint )
- {
- endpoint = anEndpoint;
- }
- public boolean isFreeCasQueueListener()
- {
- return freeCasQueueListener;
- }
- protected void setModifiedTaskExecutor( TaskExecutor taskExecutor) {
+ public void setTargetEndpoint(Endpoint anEndpoint) {
+ endpoint = anEndpoint;
+ }
+
+ public boolean isFreeCasQueueListener() {
+ return freeCasQueueListener;
+ }
+
+ protected void setModifiedTaskExecutor(TaskExecutor taskExecutor) {
super.setTaskExecutor(taskExecutor);
- System.out.println("Injected Updated Task Executor Into Listener For Destination:"+getDestination());
+ System.out.println("Injected Updated Task Executor Into Listener For Destination:"
+ + getDestination());
}
+
/**
* Delegate shutdown to the super class
*/
public void doDestroy() {
super.destroy();
}
+
/**
- * Spins a shutdown thread and stops Sprint and ActiveMQ threads.
+ * Spins a shutdown thread and stops Spring and ActiveMQ threads.
*
*/
public void destroy() {
- if ( awaitingShutdown ) {
+ if (awaitingShutdown) {
return;
}
awaitingShutdown = true;
- // Spin a thread that will wait until all threads complete. This is needed to avoid
- // memory leak caused by the fact that we did not wait to collect the threads
- Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),"threadGroupDestroyer") {
- public void run() {
+ // Spin a thread that will wait until all threads complete. This is needed to avoid
+ // memory leak caused by the fact that we did not wait to collect the threads
+ Thread threadGroupDestroyer = new Thread(threadGroup.getParent().getParent(),
+ "threadGroupDestroyer") {
+ public void run() {
+ try {
+ // stop Spring listener and ActiveMQ threads
+ __listenerRef.stop();
+ __listenerRef.closeConnection();
+ } catch (Exception e) {
+ }
+ // If using non-default TaskExecutor, stop its threads
+ if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
+ ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
+ // Since the calling thread may be one of those managed by the executor allow
+ // for one open thread when checking active thread count.
+ while (((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().getActiveCount() > 1
+ && !((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor()
+ .isTerminated()) {
+ try {
+ ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(200,
+ TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ }
+ }
+ } else if (concurrentListener != null) {
+ // Stop internal Executor
+ concurrentListener.stop();
+ }
+ // if ( taskExecutor != null ) {
+ // System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+
+ // " +++++++++ Listener:"+getDestination()+" Controller ThreadPoolExecutor Stopped ...");
+ // }
+ // Shutdown the listener
+ __listenerRef.shutdown();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ threadGroup.getParent().list();
+ }
+ // System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+
+ // " ThreadGroupDestroyer waiting for threads to stop. Active thread count:"+threadGroup.activeCount()+" Active thread group count:"+threadGroup.activeGroupCount());
+ // Wait until all threads are accounted for
+ while (threadGroup.activeCount() > 0) {
try {
- // stop Spring listener and ActiveMQ threads
- __listenerRef.stop();
- __listenerRef.closeConnection();
- } catch( Exception e) {}
- // If using non-default TaskExecutor, stop its threads
- if ( taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) {
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().shutdown();
- // Since the calling thread may be one of those managed by the executor allow
- // for one open thread when checking active thread count.
- while ( ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().getActiveCount() > 1 &&
- !((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().isTerminated() ) {
+ Thread[] threads = new Thread[threadGroup.activeCount()];
+ threadGroup.enumerate(threads);
+ boolean foundExpectedThreads = true;
+
+ for (Thread t : threads) {
try {
- ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().awaitTermination(200,TimeUnit.MILLISECONDS);
- } catch ( Exception e){}
- }
- } else if ( concurrentListener != null ) {
- // Stop internal Executor
- concurrentListener.stop();
- }
-// if ( taskExecutor != null ) {
-// System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+ " +++++++++ Listener:"+getDestination()+" Controller ThreadPoolExecutor Stopped ...");
-// }
- // Shutdown the listener
- __listenerRef.shutdown();
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST) ) {
- threadGroup.getParent().list();
- }
-// System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+ " ThreadGroupDestroyer waiting for threads to stop. Active thread count:"+threadGroup.activeCount()+" Active thread group count:"+threadGroup.activeGroupCount());
- // Wait until all threads are accounted for
- while (threadGroup.activeCount() > 0) {
- try {
- Thread[] threads = new Thread[threadGroup.activeCount()];
- threadGroup.enumerate(threads);
- boolean foundExpectedThreads = true;
-
- for (Thread t : threads) {
- try {
- String tName = t.getName();
- // The following is necessary to account for the AMQ threads
- // Any threads not named in the list below will cause a wait
- // and retry until all non-amq threads are stopped
- if (!tName.startsWith("main") && !tName.equalsIgnoreCase("timer-0")
- && !tName.equals("ReaderThread") && !tName.equals("BrokerThreadGroup")
- && !tName.startsWith("ActiveMQ")) {
- foundExpectedThreads = false;
- break; // from for
- }
- } catch (Exception e) {
+ String tName = t.getName();
+ // The following is necessary to account for the AMQ threads
+ // Any threads not named in the list below will cause a wait
+ // and retry until all non-amq threads are stopped
+ if (!tName.startsWith("main") && !tName.equalsIgnoreCase("timer-0")
+ && !tName.equals("ReaderThread") && !tName.equals("BrokerThreadGroup")
+ && !tName.startsWith("ActiveMQ")) {
+ foundExpectedThreads = false;
+ break; // from for
}
+ } catch (Exception e) {
}
- if (foundExpectedThreads) {
- break; // from while
- }
- Thread.sleep(100);
- } catch (InterruptedException e) {
}
+ if (foundExpectedThreads) {
+ break; // from while
+ }
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
}
-// System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+ " ThreadGroupDestroyer all threads stopped");
+ }
+ // System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+
+ // " ThreadGroupDestroyer all threads stopped");
- try {
- synchronized(threadGroup ) {
- if ( !threadGroup.isDestroyed() ) {
- threadGroup.destroy();
- }
+ try {
+ synchronized (threadGroup) {
+ if (!threadGroup.isDestroyed()) {
+ threadGroup.destroy();
}
-// System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+ " >>>>>>>>>>>> Listener:"+getDestinationName()+" Thread Group Destroyed");
- } catch( Exception e) {} // Ignore
- }
- };
- threadGroupDestroyer.start();
+ }
+ // System.out.println(">>>>> Thread:"+Thread.currentThread().getId()+
+ // " >>>>>>>>>>>> Listener:"+getDestinationName()+" Thread Group Destroyed");
+ } catch (Exception e) {
+ } // Ignore
+ }
+ };
+ threadGroupDestroyer.start();
}
+
/**
* Called by Spring to inject TaskExecutor
*/
- public void setTaskExecutor( TaskExecutor aTaskExecutor) {
+ public void setTaskExecutor(TaskExecutor aTaskExecutor) {
taskExecutor = aTaskExecutor;
}
+
/**
- * This method initializes ThreadPoolExecutor with a custom ThreadPool.
- * Each thread produced by the ThreadPool is used to first initialize
- * an instance of the AE before the thread is added to the pool. From
- * this point on, a thread used to initialize the AE will also be used
- * to call this AE's process() method.
- *
+ * This method initializes ThreadPoolExecutor with a custom ThreadPool. Each thread produced by
+ * the ThreadPool is used to first initialize an instance of the AE before the thread is added to
+ * the pool. From this point on, a thread used to initialize the AE will also be used to call this
+ * AE's process() method.
+ *
* @throws Exception
*/
private void initializeTaskExecutor() throws Exception {
- // TaskExecutor is only used with primitives
+ // TaskExecutor is only used with primitives
if (controller instanceof PrimitiveAnalysisEngineController) {
- // in case the taskExecutor is not plugged in yet, wait until one
- // becomes available. The TaskExecutor is plugged in by Spring
- synchronized( mux2 ) {
- while( taskExecutor == null ) {
+ // in case the taskExecutor is not plugged in yet, wait until one
+ // becomes available. The TaskExecutor is plugged in by Spring
+ synchronized (mux2) {
+ while (taskExecutor == null) {
mux2.wait(20);
}
}
- // Create a Custom Thread Factory. Provide it with an instance of
- // PrimitiveController so that every thread can call it to initialize
- // the next available instance of a AE.
+ // Create a Custom Thread Factory. Provide it with an instance of
+ // PrimitiveController so that every thread can call it to initialize
+ // the next available instance of a AE.
tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller);
- // This ThreadExecutor will use custom thread factory instead of defult one
+ // This ThreadExecutor will use custom thread factory instead of default one
((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);
- // Initialize the thread pool
+ // Initialize the thread pool
((ThreadPoolTaskExecutor) taskExecutor).initialize();
- // Make sure all threads are started. This forces each thread to call
- // PrimitiveController to initialize the next instance of AE
+ // Make sure all threads are started. This forces each thread to call
+ // PrimitiveController to initialize the next instance of AE
((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
}
}
+
public void stop() throws JmsException {
setAcceptMessagesWhileStopping(false);
destroy();
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaEEAdminSpringContext.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaEEAdminSpringContext.java?rev=810547&r1=810546&r2=810547&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaEEAdminSpringContext.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaEEAdminSpringContext.java Wed Sep 2 15:16:23 2009
@@ -31,184 +31,166 @@
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.FileSystemXmlApplicationContext;
-public class UimaEEAdminSpringContext
-implements UimaEEAdminContext, ApplicationListener
-{
- private static final Class CLASS_NAME = UimaEEAdminSpringContext.class;
-
- private FileSystemXmlApplicationContext springContainer = null;
- private BrokerService service;
- private boolean isShutdown;
- private ConcurrentHashMap listenerMap = new ConcurrentHashMap();
-
-
- public UimaEEAdminSpringContext( FileSystemXmlApplicationContext aSpringContainer )
- {
- springContainer = aSpringContainer;
- String beanNames[] = springContainer.getBeanNamesForType(org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer.class);
- for( int i=0; beanNames != null && i < beanNames.length; i++)
- {
- try
- {
- UimaDefaultMessageListenerContainer listenerContainer =
- ((UimaDefaultMessageListenerContainer)springContainer.getBean(beanNames[i]));
- ListenerEntry listenerEntry = new ListenerEntry();
- listenerEntry.setListenerContainer(listenerContainer);
- listenerMap.put(listenerContainer.getDestinationName(), listenerEntry);
- }
- catch( Exception e)
- {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "UimaEEAdminSpringContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { e });
- }
- }
- }
- }
-
- public void setBroker( BrokerService aBrokerService )
- {
- service = aBrokerService;
- }
-
- public ApplicationContext getSpringContainer()
- {
- return springContainer;
- }
-
- public int getConcurrentConsumerCount( String anEndpointName )
- {
- try
- {
- if ( anEndpointName != null && anEndpointName.trim().length() > 0 && springContainer.isActive() && listenerMap.containsKey(anEndpointName))
- {
+public class UimaEEAdminSpringContext implements UimaEEAdminContext, ApplicationListener {
+ private static final Class CLASS_NAME = UimaEEAdminSpringContext.class;
+
+ private FileSystemXmlApplicationContext springContainer = null;
+
+ private BrokerService service;
+
+ private boolean isShutdown;
+
+ private ConcurrentHashMap listenerMap = new ConcurrentHashMap();
+
+ public UimaEEAdminSpringContext(FileSystemXmlApplicationContext aSpringContainer) {
+ springContainer = aSpringContainer;
+ String beanNames[] = springContainer
+ .getBeanNamesForType(org.apache.uima.adapter.jms.activemq.UimaDefaultMessageListenerContainer.class);
+ for (int i = 0; beanNames != null && i < beanNames.length; i++) {
+ try {
+ UimaDefaultMessageListenerContainer listenerContainer = ((UimaDefaultMessageListenerContainer) springContainer
+ .getBean(beanNames[i]));
+ ListenerEntry listenerEntry = new ListenerEntry();
+ listenerEntry.setListenerContainer(listenerContainer);
+ listenerMap.put(listenerContainer.getDestinationName(), listenerEntry);
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "UimaEEAdminSpringContext", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", new Object[] { e });
+ }
+ }
+ }
+ }
+
+ public void setBroker(BrokerService aBrokerService) {
+ service = aBrokerService;
+ }
+
+ public ApplicationContext getSpringContainer() {
+ return springContainer;
+ }
+
+ public int getConcurrentConsumerCount(String anEndpointName) {
+ try {
+ if (anEndpointName != null && anEndpointName.trim().length() > 0
+ && springContainer.isActive() && listenerMap.containsKey(anEndpointName)) {
+ ListenerEntry listenerEntry = null;
+
+ listenerEntry = ((ListenerEntry) listenerMap.get(anEndpointName));
+ if (listenerEntry != null && listenerEntry.isStopped() == false) {
+ UimaDefaultMessageListenerContainer listenerContainer = listenerEntry
+ .getListenerContainer();
+ return listenerContainer.getConcurrentConsumers();
+ }
+ }
+ } catch (Exception e) {
+ }
+ return -1;
+ }
+
+ /**
+ * Stops a listener thread on a given endpoint
+ */
+ public synchronized void stopListener(String anEndpointName) {
+ try {
+ if (anEndpointName != null && anEndpointName.trim().length() > 0
+ && springContainer.isActive() && listenerMap.containsKey(anEndpointName)) {
ListenerEntry listenerEntry = null;
-
- listenerEntry =((ListenerEntry)listenerMap.get(anEndpointName));
- if ( listenerEntry != null && listenerEntry.isStopped() == false )
- {
- UimaDefaultMessageListenerContainer listenerContainer =
- listenerEntry.getListenerContainer();
- return listenerContainer.getConcurrentConsumers();
+
+ listenerEntry = ((ListenerEntry) listenerMap.get(anEndpointName));
+ if (listenerEntry != null && listenerEntry.isStopped() == false) {
+ listenerEntry.setStopped(true);
+ if (listenerMap.get(anEndpointName) != null) {
+ ListenerEntry entry = (ListenerEntry) listenerMap.get(anEndpointName);
+ if (entry != null) {
+ UimaDefaultMessageListenerContainer listenerContainer = entry.getListenerContainer();
+ spinThreadForListenerShutdown(listenerContainer);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ }
+ }
+
+ private void spinThreadForListenerShutdown(
+ final UimaDefaultMessageListenerContainer listenerContainer) {
+ // Spin a shutdown thread to terminate listener. The thread is needed due
+ // to Spring.
+ new Thread() {
+ public void run() {
+ try {
+ listenerContainer.setAutoStartup(false);
+ listenerContainer.setRecoveryInterval(0);
+ listenerContainer.shutdown();
+ listenerContainer.destroy();
+ String eName = listenerContainer.getEndpointName();
+ if (eName != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
+ "spinThreadForListenerShutdown.run()", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_stop_listener__INFO", new Object[] { eName });
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ }
+
+ public void onApplicationEvent(ApplicationEvent anEvent) {
+ shutdown();
+ }
+
+ /**
+ * Stops the Spring Container
+ */
+ public void shutdown() {
+ if (springContainer.isActive()) {
+ isShutdown = true;
+ // Spin a thread so that the Spring container can shut itself down
+ new Thread("Spring Container Shutdown Thread") {
+ public void run() {
+ try {
+ System.out.println("Destroying Spring Container");
+ springContainer.destroy();
+ springContainer = null;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
- }
+ }.start();
}
- catch( Exception e) {}
- return -1;
- }
- /**
- * Stops a listener thread on a given endpoint
- */
- public synchronized void stopListener(String anEndpointName )
- {
- try
- {
- if ( anEndpointName != null && anEndpointName.trim().length() > 0 && springContainer.isActive() && listenerMap.containsKey(anEndpointName))
- {
- ListenerEntry listenerEntry = null;
-
- listenerEntry =((ListenerEntry)listenerMap.get(anEndpointName));
- if ( listenerEntry != null && listenerEntry.isStopped() == false )
- {
- listenerEntry.setStopped(true);
- if ( listenerMap.get(anEndpointName) != null )
- {
- ListenerEntry entry = (ListenerEntry)listenerMap.get(anEndpointName);
- if ( entry != null )
- {
- UimaDefaultMessageListenerContainer listenerContainer =
- entry.getListenerContainer();
- spinThreadForListenerShutdown(listenerContainer);
- }
- }
- }
- }
- }
- catch( Exception e) {}
- }
-
- private void spinThreadForListenerShutdown(final UimaDefaultMessageListenerContainer listenerContainer)
- {
- // Spin a shutdown thread to terminate listener. The thread is needed due
- // to Spring.
- new Thread() {
- public void run()
- {
- try
- {
- listenerContainer.setAutoStartup(false);
- listenerContainer.setRecoveryInterval(0);
- listenerContainer.shutdown();
- listenerContainer.destroy();
- String eName = listenerContainer.getEndpointName();
- if ( eName != null )
- {
- if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
- "spinThreadForListenerShutdown.run()", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stop_listener__INFO",
- new Object[] { eName });
- }
- }
- }
- catch( Exception e) { e.printStackTrace();}
- }
- }.start();
- }
- public void onApplicationEvent(ApplicationEvent anEvent)
- {
- shutdown();
- }
- /**
- * Stops the Spring Container
- */
- public void shutdown()
- {
- if ( springContainer.isActive() )
- {
- isShutdown = true;
- // Spin a thread so that the Spring container can shut itself down
- new Thread("Spring Container Shutdown Thread") {
- public void run()
- {
- try
- {
- System.out.println("Destroying Spring Container");
- springContainer.destroy();
- springContainer = null;
- }
- catch( Exception e) { e.printStackTrace();}
- }
- }.start();
- }
- listenerMap.clear();
- }
- public boolean isShutdown()
- {
- return isShutdown;
- }
-
- protected static class ListenerEntry
- {
- private boolean stopped = false;
- private UimaDefaultMessageListenerContainer listenerContainer = null;
-
- protected UimaDefaultMessageListenerContainer getListenerContainer()
- {
- return listenerContainer;
- }
- protected void setListenerContainer(UimaDefaultMessageListenerContainer listenerContainer)
- {
- this.listenerContainer = listenerContainer;
- }
- protected boolean isStopped()
- {
- return stopped;
- }
- protected void setStopped(boolean stopped)
- {
- this.stopped = stopped;
- }
-
- }
-
+ listenerMap.clear();
+ }
+
+ public boolean isShutdown() {
+ return isShutdown;
+ }
+
+ protected static class ListenerEntry {
+ private boolean stopped = false;
+
+ private UimaDefaultMessageListenerContainer listenerContainer = null;
+
+ protected UimaDefaultMessageListenerContainer getListenerContainer() {
+ return listenerContainer;
+ }
+
+ protected void setListenerContainer(UimaDefaultMessageListenerContainer listenerContainer) {
+ this.listenerContainer = listenerContainer;
+ }
+
+ protected boolean isStopped() {
+ return stopped;
+ }
+
+ protected void setStopped(boolean stopped) {
+ this.stopped = stopped;
+ }
+
+ }
}