You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/08/04 15:55:04 UTC
svn commit: r800795 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Author: cwiklik
Date: Tue Aug 4 13:55:04 2009
New Revision: 800795
URL: http://svn.apache.org/viewvc?rev=800795&view=rev
Log:
UIMA-1454 Improves connection failure handling. Adds the failed endpoint to the DoNotProcess list.
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=800795&r1=800794&r2=800795&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Tue Aug 4 13:55:04 2009
@@ -33,6 +33,7 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.ConnectionFailedException;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.command.ActiveMQDestination;
@@ -172,7 +173,13 @@
brokerDestinations.setConnection(conn);
connectionCreationTimestamp = System.nanoTime();
}
- producerSession = brokerDestinations.getConnection().createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ Connection conn = brokerDestinations.getConnection();
+ if ( failed ) {
+ // Unable to create a connection
+ return;
+ }
+ producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
if ( (delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) && delegateEndpoint.getDestination() != null )
{
producer = producerSession.createProducer(null);
@@ -217,10 +224,15 @@
}
catch ( Exception e)
{
+ boolean rethrow = true;
+
if ( e instanceof JMSException ) {
- handleJmsException( (JMSException)e );
+ rethrow = handleJmsException( (JMSException)e );
+
+ }
+ if ( rethrow ) {
+ throw new AsynchAEException(e);
}
- throw new AsynchAEException(e);
}
}
public synchronized void open() throws AsynchAEException, ServiceShutdownException {
@@ -308,8 +320,7 @@
int retryCount = 4;
while (retryCount > 0)
{
- try
- {
+ try {
retryCount--;
if (aTextMessage == null)
@@ -321,23 +332,19 @@
return producerSession.createTextMessage(aTextMessage);
}
- }
- catch ( javax.jms.IllegalStateException e)
- {
- try
- {
+ } catch ( javax.jms.IllegalStateException e) {
+ try {
open();
- }
- catch ( ServiceShutdownException ex)
- {
+ } catch ( ServiceShutdownException ex) {
ex.printStackTrace();
- }
-
- }
- catch ( Exception e)
- {
+ } catch( AsynchAEException ex) {
+
+ throw ex;
+ }
+ }
+ catch ( Exception e) {
throw new AsynchAEException(e);
- }
+ }
}
throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
}
@@ -550,7 +557,10 @@
}
}
- private synchronized void handleJmsException( JMSException ex) {
+ private synchronized boolean handleJmsException( JMSException ex) {
+ if ( !failed ) {
+ failed = true;
+ }
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
// Check if the exception is due to deleted queue. ActiveMQ does not identify
// this condition in the cause, so we need to parse the exception message and
@@ -565,19 +575,24 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_failed_deleted_queue_INFO",
new Object[] { componentName, destName});
- return;
+ controller.addEndpointToDoNotProcessList(delegateEndpoint.getDestination().toString());
+ return false;
+
+ } if ( ex instanceof ConnectionFailedException && isReplyEndpoint ) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_failure__INFO",
+ new Object[] { componentName, serverUri, delegateEndpoint.getDestination()});
+ controller.addEndpointToDoNotProcessList(delegateEndpoint.getDestination().toString());
+ return false;
+
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] {componentName, ex});
}
ex.printStackTrace();
}
-
- }
- if ( failed ) {
- return; // Already marked failed
}
- failed = true;
+ return true;
}
public void onConsumerEvent(ConsumerEvent arg0)
{