You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/10/22 18:22:05 UTC
svn commit: r707119 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq:
JmsEndpointConnection_impl.java JmsInputChannel.java JmsOutputChannel.java
UimaDefaultMessageListenerContainer.java
Author: eae
Date: Wed Oct 22 09:22:04 2008
New Revision: 707119
URL: http://svn.apache.org/viewvc?rev=707119&view=rev
Log: (empty)
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Oct 22 09:22:04 2008
@@ -45,6 +45,8 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InputChannel;
+import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
import org.apache.uima.aae.error.AsynchAEException;
@@ -53,6 +55,7 @@
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.util.Level;
+import org.springframework.jms.JmsException;
import org.springframework.util.Assert;
public class JmsEndpointConnection_impl implements ConsumerListener
@@ -93,6 +96,10 @@
private boolean isReplyEndpoint;
+ private volatile boolean failed = false;
+
+ private Object recoveryMux = new Object();
+
public JmsEndpointConnection_impl(Map aConnectionMap, Endpoint anEndpoint)
{
connectionMap = aConnectionMap;
@@ -218,6 +225,9 @@
}
catch ( Exception e)
{
+ if ( e instanceof JMSException ) {
+ handleJmsException( (JMSException)e );
+ }
throw new AsynchAEException(e);
}
@@ -471,83 +481,133 @@
{
String destinationName = "";
- Exception lastException = null;
- // Send a message to the destination. Retry 10 times if unable to send.
- // After 10 tries give up and throw exception
- for (int i = 0; i < 10; i++)
+ try
{
- try
- {
- stopTimer();
+ stopTimer();
- if (conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
- openChannel();
+ if ( failed || conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
+ openChannel();
+ // The connection has been successful. Now check if we need to create a new listener
+ // and a temp queue to receive replies. A new listener will be created only if the
+ // endpoint for the delegate is marked as FAILED. This will be the case if the listener
+ // on the reply queue for the endpoint has failed.
+ synchronized( recoveryMux ) {
+ if ( controller instanceof AggregateAnalysisEngineController ) {
+ // Using the queue name lookup the delegate key
+ String key = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(delegateEndpoint.getEndpoint());
+ if ( key != null && destination != null && !isReplyEndpoint ) {
+ // The aggregate has a master list of endpoints which are typically cloned during processing
+ // This object uses a copy of the master. When a listener fails, the status of the master
+ // endpoint is changed. To check the status, fetch the master endpoint, check its status
+ // and if marked as FAILED, create a new listener, a new temp queue and override this
+ // object endpoint copy destination property. It *IS* a new replyTo temp queue.
+ Endpoint masterEndpoint = ((AggregateAnalysisEngineController)controller).lookUpEndpoint(key, false);
+ if ( masterEndpoint.getStatus() == Endpoint.FAILED ) {
+ // Create a new Listener Object to receive replies
+ createListener(key);
+ destination = (Destination)masterEndpoint.getDestination();
+ delegateEndpoint.setDestination(destination);
+ // Override the reply destination. A new listener has been created along with a new temp queue for replies.
+ aMessage.setJMSReplyTo(destination);
+ }
+ }
+ }
}
- // Send a reply to a queue provided by the client
- if ( isReplyEndpoint && delegateEndpoint.getDestination() != null )
+ }
+ // Send a reply to a queue provided by the client
+ if ( isReplyEndpoint && delegateEndpoint.getDestination() != null )
+ {
+ destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
{
- destinationName = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
- if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
- }
- synchronized(producer)
- {
- producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
- }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
}
- else
+ synchronized(producer)
{
- destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
- if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
- }
- synchronized(producer)
- {
- producer.send(aMessage);
- }
+ producer.send((Destination)delegateEndpoint.getDestination(), aMessage);
}
+ }
+ else
+ {
+ destinationName = ((ActiveMQQueue) producer.getDestination()).getPhysicalName();
+ if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", new Object[] {destinationName });
+ }
+ synchronized(producer)
+ {
+ producer.send(aMessage);
+ }
+ }
if (startTimer)
{
startTimer(connectionCreationTimestamp);
}
- lastException = null;
return true;
- }
- catch ( Exception e)
- {
- lastException = e;
- // If the controller has been stopped no need to send messages
- if ( controller.isStopped())
- {
- return true;
- }
- else
- {
- e.printStackTrace();
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_not_ableto_send_msg_INFO", new Object[] { controller.getComponentName(), destinationName, i+1, 10 });
- }
- }
- try
+ }
+ catch ( Exception e)
+ {
+ // If the controller has been stopped no need to send messages
+ if ( controller.isStopped())
{
- wait(50);
+ return true;
}
- catch ( Exception ex)
+ else
{
+ if ( e instanceof JMSException ) {
+ handleJmsException( (JMSException)e );
+ }
+
+ e.printStackTrace();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), e});
}
}
- if ( lastException != null )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), lastException});
- }
stopTimer();
return false;
}
+ /**
+  * Re-creates the reply listener for a delegate during recovery of a failed connection.
+  * send() invokes this only after finding the delegate's master endpoint marked FAILED.
+  * The work is delegated to the reply InputChannel registered for the delegate, which
+  * builds the new listener together with a new temp queue. Nothing happens unless the
+  * controller is an aggregate.
+  *
+  * @param delegateKey key of the delegate whose reply listener must be re-created
+  * @throws Exception if the InputChannel fails to create the listener
+  */
+ private void createListener(String delegateKey) throws Exception {
+   if ( !(controller instanceof AggregateAnalysisEngineController) ) {
+     // Only an aggregate has delegates with reply listeners to re-create
+     return;
+   }
+   // Look up the InputChannel handling replies from this delegate and let it
+   // create the new listener and temp queue
+   controller.getReplyInputChannel(delegateKey).createListener(delegateKey);
+ }
+
+ /**
+  * Records that a JMS failure was seen on this connection. send() consults the
+  * flag and re-opens the channel when it is set. Repeated calls are harmless: once the
+  * flag is set further calls leave it untouched.
+  *
+  * @param ex the JMS failure being reported (currently not inspected)
+  */
+ private synchronized void handleJmsException( JMSException ex) {
+   if ( !failed ) {
+     failed = true;
+   }
+   // NOTE: this method does not tear down the reply listener (no InputChannel lookup,
+   // no destroyListener() call); it only flags the connection as failed.
+ }
public void onConsumerEvent(ConsumerEvent arg0)
{
if (controller != null)
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Wed Oct 22 09:22:04 2008
@@ -22,22 +22,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
-import mx4j.tools.adaptor.http.GetAttributeCommandProcessor;
-
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.error.InvalidMessageException;
import org.apache.uima.aae.handler.Handler;
@@ -94,6 +93,9 @@
private Object mux = new Object();
+ private ConcurrentHashMap<String, UimaDefaultMessageListenerContainer> failedListenerMap =
+ new ConcurrentHashMap<String, UimaDefaultMessageListenerContainer>();
+
public AnalysisEngineController getController()
{
return controller;
@@ -236,7 +238,8 @@
if ( command != AsynchAEMessage.Process &&
command != AsynchAEMessage.GetMeta &&
command != AsynchAEMessage.ReleaseCAS &&
- command != AsynchAEMessage.Stop &&
+ command != AsynchAEMessage.Stop &&
+ command != AsynchAEMessage.Ping &&
command != AsynchAEMessage.CollectionProcessComplete )
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -270,7 +273,8 @@
int command = aMessage.getIntProperty(AsynchAEMessage.Command);
if ( command == AsynchAEMessage.GetMeta ||
command == AsynchAEMessage.CollectionProcessComplete ||
- command == AsynchAEMessage.Stop ||
+ command == AsynchAEMessage.Stop ||
+ command == AsynchAEMessage.Ping ||
command == AsynchAEMessage.ReleaseCAS)
{
// Payload not included in GetMeta Request
@@ -316,7 +320,7 @@
// Shutting down
return true;
}
- if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response )
+ if ( command == AsynchAEMessage.Process && msgType == AsynchAEMessage.Response )
{
String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
if (!getController().getInProcessCache().entryExists(casReferenceId))
@@ -404,8 +408,10 @@
return "CollectionProcessComplete";
case AsynchAEMessage.ReleaseCAS:
return "ReleaseCAS";
- case AsynchAEMessage.Stop:
- return "Stop";
+ case AsynchAEMessage.Stop:
+ return "Stop";
+ case AsynchAEMessage.Ping:
+ return "Ping";
}
}
@@ -706,7 +712,11 @@
public String getInputQueueName()
{
if ( messageListener != null )
- return messageListener.getDestinationName();//getEndpointName();
+ if ( messageListener.getDestination() != null ) {
+ return messageListener.getDestination().toString();
+ } else {
+ return messageListener.getDestinationName();//getEndpointName();
+ }
else
{
return "";
@@ -777,4 +787,103 @@
{
return messageListener.getConcurrentConsumers();
}
+
+ /**
+  * Creates a replacement reply listener for a delegate whose previous listener failed.
+  * The new container copies the connection factory and consumer limits from the failed
+  * listener saved in failedListenerMap and resolves its destination through a
+  * TempDestinationResolver. Once the destination is available, the aggregate's endpoint
+  * for the delegate is marked OK and pointed at the new destination, and a clone of it
+  * becomes the new listener's target endpoint.
+  *
+  * @param aDelegateKey key of the delegate whose reply listener is being re-created
+  * @throws IllegalStateException if no failed listener has been saved for the delegate
+  * @throws Exception on any failure while setting up or starting the new listener
+  */
+ public void createListener( String aDelegateKey ) throws Exception {
+
+   UimaDefaultMessageListenerContainer failedListener =
+     failedListenerMap.get(aDelegateKey);
+   if ( failedListener == null ) {
+     // There is nothing to copy the connection factory and consumer settings from. Fail
+     // with a descriptive error instead of a NullPointerException further down.
+     throw new IllegalStateException("No failed listener saved for delegate: "+aDelegateKey);
+   }
+   UimaDefaultMessageListenerContainer newListener =
+     new UimaDefaultMessageListenerContainer();
+   newListener.setConnectionFactory(failedListener.getConnectionFactory());
+   newListener.setMessageListener(this);
+   newListener.setController(getController());
+
+   TempDestinationResolver resolver = new TempDestinationResolver();
+   resolver.setConnectionFactory((ActiveMQConnectionFactory)failedListener.getConnectionFactory());
+   newListener.setDestinationResolver(resolver);
+
+   // Size the executor from the failed listener's consumer limits
+   org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor executor = new org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor();
+   executor.setCorePoolSize(failedListener.getConcurrentConsumers());
+   executor.setMaxPoolSize(failedListener.getMaxConcurrentConsumers());
+   executor.setQueueCapacity(failedListener.getMaxConcurrentConsumers());
+   executor.initialize();
+
+   newListener.setTaskExecutor(executor);
+   newListener.initialize();
+   newListener.start();
+   // Wait until the resolver plugs in the destination
+   while (newListener.getDestination() == null) {
+     synchronized( newListener ) {
+
+       System.out.println(".... Waiting For Destination ...");
+       newListener.wait(100);
+     }
+   }
+   newListener.afterPropertiesSet();
+   // Get the endpoint object for a given delegate key from the Aggregate
+   Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
+   endpoint.setStatus(Endpoint.OK);
+   // Override the reply destination.
+   endpoint.setDestination(newListener.getDestination());
+   Object clone = ((Endpoint_impl) endpoint).clone();
+   newListener.setTargetEndpoint((Endpoint)clone);
+ }
+
+ /**
+  * Searches listenerContainerList for the container whose destination, rendered with
+  * toString(), equals the given name. Containers that have no destination yet are skipped.
+  *
+  * @param anEndpointName destination name to match
+  * @return the first matching container, or null when none matches
+  */
+ private UimaDefaultMessageListenerContainer getListenerForEndpoint( String anEndpointName ) {
+   int index = 0;
+   while ( index < listenerContainerList.size() ) {
+     UimaDefaultMessageListenerContainer candidate =
+       (UimaDefaultMessageListenerContainer) listenerContainerList.get(index);
+     index++;
+     if ( candidate.getDestination() == null ) {
+       continue;
+     }
+     if ( candidate.getDestination().toString().equals( anEndpointName) ) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+ /**
+  * Stops and destroys the listener attached to the given destination, marks the
+  * aggregate's endpoint for the delegate as FAILED, and saves the listener in
+  * failedListenerMap (removing it from listenerContainerList) so that createListener()
+  * can later copy its connection factory. Exceptions are printed, not propagated.
+  *
+  * @param anEndpointName destination name used to locate the listener
+  * @param aDelegateKey key of the delegate served by the listener
+  */
+ public void destroyListener( String anEndpointName, String aDelegateKey ) {
+   final UimaDefaultMessageListenerContainer mListener =
+     getListenerForEndpoint(anEndpointName);
+   if ( mListener == null ) {
+     System.out.println("--- Listener For Endpoint: "+aDelegateKey+" Not Found");
+     return;
+   }
+
+   try {
+     System.out.println("++++ Stopping Listener ...");
+     mListener.stop();
+     System.out.println("++++ Destroying Listener ...");
+     // destroy() runs on its own thread, presumably so that a blocking shutdown cannot
+     // stall the caller. The thread has to be started explicitly: constructing the
+     // Thread object alone never invokes run(), so destroy() would never execute.
+     new Thread() {
+       public void run() {
+         mListener.destroy();
+       }
+     }.start();
+     // Wait for the container to report that it is no longer running, yielding the
+     // CPU between checks rather than spinning on an empty loop body
+     while( mListener.isRunning()) {
+       Thread.yield();
+     }
+     System.out.println("++++ Listener on Queue:"+anEndpointName+" Has Been Stopped...");
+     Endpoint endpoint = ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(aDelegateKey, false);
+     endpoint.setStatus(Endpoint.FAILED);
+     if ( mListener.getConnectionFactory() != null) {
+       if ( getController() instanceof AggregateAnalysisEngineController ) {
+         // Keep only the first failed listener per delegate
+         if ( !failedListenerMap.containsKey(aDelegateKey )) {
+           failedListenerMap.put( aDelegateKey, mListener);
+           listenerContainerList.remove(mListener);
+           System.out.println("++++ Saving Connection Factory");
+         }
+       }
+     }
+   } catch( Exception e) {
+     e.printStackTrace();
+   }
+ }
+ /**
+  * Tells whether a failed listener is currently saved for the given delegate.
+  *
+  * @param aDelegateKey key of the delegate to check
+  * @return true if failedListenerMap holds an entry for the key
+  */
+ public boolean isFailed(String aDelegateKey) {
+   // ConcurrentHashMap never stores null values, so a non-null lookup means the key is present
+   return failedListenerMap.get(aDelegateKey) != null;
+ }
+ /**
+  * Tells whether this channel has a listener container attached to the given destination.
+  *
+  * @param anEndpointName destination name to look for
+  * @return true if a matching listener container exists
+  */
+ public boolean isListenerForDestination( String anEndpointName) {
+   return getListenerForEndpoint(anEndpointName) != null;
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Oct 22 09:22:04 2008
@@ -559,8 +559,6 @@
"sendRequest", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_serialized_cas__FINEST",
new Object[] { getAnalysisEngineController().getComponentName(), anEndpoint.getEndpoint(),aCasReferenceId,serializedCAS });
}
-
-
// Send process request to remote delegate and start timeout timer
sendCasToRemoteEndpoint(true, serializedCAS, null, aCasReferenceId, anEndpoint, true, 0);
} else {
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=707119&r1=707118&r2=707119&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Wed Oct 22 09:22:04 2008
@@ -23,19 +23,21 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.InputChannel;
import org.apache.uima.aae.UIMAEE_Constants;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
-import org.apache.uima.aae.controller.Endpoint_impl;
import org.apache.uima.aae.error.ErrorHandler;
import org.apache.uima.aae.error.Threshold;
import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
@@ -51,14 +53,16 @@
private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class;
private String destinationName="";
private Endpoint endpoint;
- private boolean freeCasQueueListener;
+ private volatile boolean freeCasQueueListener;
private AnalysisEngineController controller;
- private int retryCount = 2;
+ private volatile boolean failed = false;
+ private Object mux = new Object();
public UimaDefaultMessageListenerContainer()
{
super();
- setRecoveryInterval(60000);
+ setRecoveryInterval(5);
+// setRecoveryInterval(60000);
setAcceptMessagesWhileStopping(false);
setExceptionListener(this);
}
@@ -71,6 +75,11 @@
{
controller = aController;
}
+ /**
+ *
+ * @param t
+ * @return
+ */
private boolean disableListener( Throwable t)
{
System.out.println(t.toString());
@@ -79,119 +88,202 @@
return true;
return false;
}
+ /**
+  * Asks the InputChannel that owns this listener to destroy it. The channel is looked up
+  * by the endpoint's destination (as a string) when one is set, otherwise by the endpoint
+  * name. Does nothing unless the controller is an aggregate. Exceptions are printed and
+  * not propagated.
+  */
+ private synchronized void handleListenerFailure() {
+   if ( !(controller instanceof AggregateAnalysisEngineController) ) {
+     return;
+   }
+   try {
+     AggregateAnalysisEngineController aggregate = (AggregateAnalysisEngineController)controller;
+     String delegateKey = aggregate.lookUpDelegateKey(endpoint.getEndpoint());
+     // Prefer the resolved destination; fall back to the configured endpoint name
+     String listenerName = ( endpoint.getDestination() != null )
+       ? endpoint.getDestination().toString()
+       : endpoint.getEndpoint();
+     InputChannel iC = aggregate.getInputChannel(listenerName);
+     iC.destroyListener(listenerName, delegateKey);
+   } catch( Exception e) {
+     e.printStackTrace();
+   }
+ }
+ /**
+  * Handles failure of a listener attached to a temp (reply) queue. The failure is logged
+  * first. If it is a javax.jms.IllegalStateException whose message is exactly
+  * "The Consumer is closed" and the controller is an aggregate, the current listener is
+  * stopped through handleListenerFailure(). Any other failure that disableListener()
+  * reports as severe is passed on to handleQueueFailure().
+  *
+  * @param t the failure reported for this listener
+  */
+ private void handleTempQueueFailure(Throwable t) {
+   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+     "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
+     new Object[] { endpoint.getDestination(), getBrokerUrl(), t });
+   // Check if the failure is due to the failed connection. Spring (and ActiveMQ) don't seem to provide
+   // the cause, just the top level IllegalStateException with a text message. This is what we need to
+   // check for. The constant is the receiver of equals() so that an exception without a
+   // message cannot cause a NullPointerException here.
+   if ( t instanceof javax.jms.IllegalStateException && "The Consumer is closed".equals(t.getMessage())) {
+     if ( controller instanceof AggregateAnalysisEngineController ) {
+       String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
+       try {
+         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
+           "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopping_listener_INFO",
+           new Object[] { controller.getComponentName(), endpoint.getDestination(),delegateKey });
+         // Stop current listener
+         handleListenerFailure();
+         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
+           "handleTempQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_stopped_listener_INFO",
+           new Object[] { controller.getComponentName(), endpoint.getDestination() });
+       } catch ( Exception e ) {
+         e.printStackTrace();
+       }
+     }
+   } else if ( disableListener(t)) {
+     handleQueueFailure(t);
+   }
+ }
+
+ /**
+  * Scans the controller's error handler chain for the GetMeta error handler.
+  *
+  * @return the first GetMetaErrorHandler in the chain, or null if the chain has none
+  */
+ private ErrorHandler fetchGetMetaErrorHandler() {
+   // Walk the handlers provided in the deployment descriptor in chain order
+   for ( Iterator it = controller.getErrorHandlerChain().iterator(); it.hasNext(); ) {
+     ErrorHandler candidate = (ErrorHandler)it.next();
+     if ( candidate instanceof GetMetaErrorHandler ) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+ /**
+  * Handles failures on non-temp queues. The failure is logged. When disableListener()
+  * reports it as severe, the endpoint's reply destination is marked failed and, for an
+  * aggregate controller, the GetMeta error handler's thresholds are consulted: if the
+  * delegate's threshold action is DISABLE the delegate is disabled and the service is
+  * kept alive. In every case the listener is then shut down on a separate thread, and
+  * the service is terminated unless the delegate was successfully disabled.
+  *
+  * @param t the failure reported for this listener
+  */
+ private void handleQueueFailure(Throwable t) {
+   final String endpointName =
+     (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
+   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+     "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
+     new Object[] { endpointName, getBrokerUrl(), t });
+   boolean terminate = true;
+   // Check if the failure is severe enough to disable this listener. Whether or not this listener is actually
+   // disabled depends on the action associated with GetMeta Error Handler. If GetMeta Error Handler is
+   // configured to terminate the service on failure, this listener will be terminated and the entire service
+   // will be stopped.
+   if ( disableListener(t) ) {
+     endpoint.setReplyDestinationFailed();
+     // If this is a listener attached to the Aggregate Controller, use GetMeta Error
+     // Thresholds defined to determine what to do next after failure. Either terminate
+     // the service or disable the delegate with which this listener is associated with
+     if ( controller != null && controller instanceof AggregateAnalysisEngineController )
+     {
+       ErrorHandler handler = fetchGetMetaErrorHandler();
+       // Fetch a Map containing thresholds for GetMeta for each delegate. The chain may
+       // contain no GetMeta handler at all; in that case there are no thresholds to apply
+       // and the default (terminate) stands.
+       Map thresholds = (handler == null) ? null : handler.getEndpointThresholdMap();
+       // Lookup delegate's key using delegate's endpoint name
+       String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
+       // If the delegate has a threshold defined on GetMeta apply Action defined
+       if ( thresholds != null && delegateKey != null && thresholds.containsKey(delegateKey))
+       {
+         // Fetch the Threshold object containing error configuration
+         Threshold threshold = (Threshold) thresholds.get(delegateKey);
+         // Check if the delegate needs to be disabled
+         if (threshold.getAction().equalsIgnoreCase(ErrorHandler.DISABLE)) {
+           // The disable delegate method takes a list of delegates
+           List list = new ArrayList();
+           // Add the delegate to disable to the list
+           list.add(delegateKey);
+           try {
+             System.out.println(">>>> Controller:"+controller.getComponentName()+" Disabling Listener On Queue:"+endpoint.getEndpoint()+". Component's "+delegateKey+" Broker:"+getBrokerUrl()+" is Invalid");
+             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
+               "handleQueueFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_bad_broker__INFO",
+               new Object[] { controller.getComponentName(), delegateKey, getBrokerUrl() });
+             // Remove the delegate from the routing table.
+             ((AggregateAnalysisEngineController) controller).disableDelegates(list);
+             terminate = false; //just disable the delegate and continue
+           } catch (Exception e) {
+             e.printStackTrace();
+             // Disabling failed, fall back to terminating the service
+             terminate = true;
+           }
+         }
+       }
+     }
+   }
+   System.out.println("****** Unable To Connect Listener To Broker:"+getBrokerUrl());
+   System.out.println("****** Closing Listener on Queue:"+endpoint.getEndpoint());
+   setRecoveryInterval(0);
+
+   // Spin a shutdown thread to terminate listener.
+   new Thread() {
+     public void run()
+     {
+       try
+       {
+         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+           "handleQueueFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_disable_listener__WARNING",
+           new Object[] { endpointName, getBrokerUrl() });
+
+         shutdown();
+       }
+       catch( Exception e) { e.printStackTrace();}
+     }
+   }.start();
+
+   if ( terminate )
+   {
+     terminate(t);
+   }
+
+ }
+ /**
+ * This method is called by Spring when a listener fails
+ */
protected void handleListenerSetupFailure( Throwable t, boolean alreadyHandled )
{
- if ( !(t instanceof javax.jms.IllegalStateException ) )
- {
- t.printStackTrace();
- final String endpointName =
- (getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_jms_listener_failed_WARNING",
- new Object[] { endpointName, getBrokerUrl(), t });
-
- boolean terminate = true;
- if ( disableListener(t) )
- {
- if ( endpoint != null )
- {
- endpoint.setReplyDestinationFailed();
- // If this is a listener attached to the Aggregate Controller, use GetMeta Error
- // Thresholds defined to determine what to do next after failure. Either terminate
- // the service or disable the delegate with which this listener is associated with
- if ( controller != null && controller instanceof AggregateAnalysisEngineController )
- {
- ErrorHandler handler = null;
- Iterator it = controller.getErrorHandlerChain().iterator();
- // Find the error handler for GetMeta in the Error Handler List provided in the
- // deployment descriptor
- while ( it.hasNext() )
- {
- handler = (ErrorHandler)it.next();
- if ( handler instanceof GetMetaErrorHandler )
- {
- break;
- }
- }
- // Fetch a Map containing thresholds for GetMeta for each delegate.
- java.util.Map thresholds = handler.getEndpointThresholdMap();
- // Lookup delegate's key using delegate's endpoint name
- String delegateKey = ((AggregateAnalysisEngineController)controller).lookUpDelegateKey(endpoint.getEndpoint());
- // If the delegate has a threshold defined on GetMeta apply Action defined
- if ( delegateKey != null && thresholds.containsKey(delegateKey))
- {
- // Fetcg the Threshold object containing error configuration
- Threshold threshold = (Threshold) thresholds.get(delegateKey);
- // Check if the delegate needs to be disabled
- if (threshold.getAction().equalsIgnoreCase(ErrorHandler.DISABLE)) {
- // The disable delegate method takes a list of delegates
- List list = new ArrayList();
- // Add the delegate to disable to the list
- list.add(delegateKey);
- try {
- System.out.println(">>>> Controller:"+controller.getComponentName()+" Disabling Listener On Queue:"+endpoint.getEndpoint()+". Component's "+delegateKey+" Broker:"+getBrokerUrl()+" is Invalid");
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, this.getClass().getName(),
- "handleListenerSetupFailure", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_disabled_delegate_bad_broker__INFO",
- new Object[] { controller.getComponentName(), delegateKey, getBrokerUrl() });
- // Remove the delegate from the routing table.
- ((AggregateAnalysisEngineController) controller).disableDelegates(list);
- } catch (Exception e) {
- e.printStackTrace();
- }
- terminate = false; //just disable the delegate and continue
- }
- }
- }
- }
-
- System.out.println("****** Unable To Connect Listener To Broker:"+getBrokerUrl());
- if ( endpoint != null )
- {
- System.out.println("****** Closing Listener on Queue:"+endpoint.getEndpoint());
- }
- setRecoveryInterval(0);
-
- // Spin a shutdown thread to terminate listener.
- new Thread() {
- public void run()
- {
- try
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_disable_listener__WARNING",
- new Object[] { endpointName, getBrokerUrl() });
-
- shutdown();
- }
- catch( Exception e) { e.printStackTrace();}
- }
- }.start();
-
- if ( terminate )
- {
- // ****************************************
- // terminate the service
- // ****************************************
- System.out.println(">>>> Terminating Controller:"+controller.getComponentName()+" Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl());
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
- "handleListenerSetupFailure", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_terminate_service_dueto_bad_broker__WARNING",
- new Object[] { controller.getComponentName(), getBrokerUrl() });
- controller.stop();
- controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
- }
- }
- else
- {
- super.handleListenerSetupFailure(t, false);
- }
- }
-
-
+ // If the controller has been stopped there is no need to recover the connection
+ if ( controller != null && controller.isStopped()) {
+ return;
+ }
+ t.printStackTrace();
+
+
+ if ( endpoint == null ) {
+
+ super.handleListenerSetupFailure(t, false);
+ terminate(t);
+ return;
+ }
+
+ synchronized( mux ) {
+ if ( !failed ) {
+ // Check if this listener is attached to a temp queue. If so, this is a listener
+ // on a reply queue. Handle temp queue listener failure differently than an
+ // input queue listener.
+ if ( endpoint.isTempReplyDestination()) {
+ handleTempQueueFailure(t);
+ } else {
+ // Handle non-temp queue failure
+ handleQueueFailure(t);
+ }
+ }
+ failed = true;
+ }
+ }
+
+ /**
+  * Terminates the service: reports the failure, stops the controller and notifies its
+  * listeners with a ResourceInitializationException wrapping the cause. When no
+  * controller has been attached to this listener there is nothing to stop or notify, so
+  * the failure is only reported.
+  *
+  * @param t the failure that makes termination necessary
+  */
+ private void terminate(Throwable t) {
+   if ( controller == null ) {
+     // Callers treat the controller as optional; avoid a NullPointerException here
+     System.out.println(">>>> Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl()+". No Controller To Terminate");
+     return;
+   }
+   // ****************************************
+   // terminate the service
+   // ****************************************
+   System.out.println(">>>> Terminating Controller:"+controller.getComponentName()+" Unable To Initialize Listener Due to Invalid Broker URL:"+getBrokerUrl());
+   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),
+     "terminate", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_terminate_service_dueto_bad_broker__WARNING",
+     new Object[] { controller.getComponentName(), getBrokerUrl() });
+   controller.stop();
+   controller.notifyListenersWithInitializationStatus(new ResourceInitializationException(t));
}
-
protected void handleListenerException( Throwable t )
{
+ t.printStackTrace();
String endpointName =
(getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
@@ -277,6 +369,13 @@
if ( endpoint != null)
{
endpoint.setDestination(aDestination);
+ if ( aDestination instanceof TemporaryQueue ) {
+ endpoint.setTempReplyDestination(true);
+ System.out.println("Resolver Plugged In a Temp Queue:"+aDestination);
+ if ( getMessageListener() != null && getMessageListener() instanceof InputChannel ) {
+ ((JmsInputChannel)getMessageListener()).setListenerContainer(this);
+ }
+ }
endpoint.setServerURI(getBrokerUrl());
}
}
@@ -287,7 +386,8 @@
public void onException(JMSException arg0)
{
- String endpointName =
+ arg0.printStackTrace();
+ String endpointName =
(getDestination() == null) ? "" : ((ActiveMQDestination)getDestination()).getPhysicalName();
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(),