You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/08/04 15:55:04 UTC

svn commit: r800795 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Author: cwiklik
Date: Tue Aug  4 13:55:04 2009
New Revision: 800795

URL: http://svn.apache.org/viewvc?rev=800795&view=rev
Log:
UIMA-1454 Improved connection failure handling. Added the failed endpoint to the DoNotProcess list.

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=800795&r1=800794&r2=800795&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Tue Aug  4 13:55:04 2009
@@ -33,6 +33,7 @@
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.ConnectionFailedException;
 import org.apache.activemq.advisory.ConsumerEvent;
 import org.apache.activemq.advisory.ConsumerListener;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -172,7 +173,13 @@
         brokerDestinations.setConnection(conn);
         connectionCreationTimestamp = System.nanoTime();
       }
-			producerSession = brokerDestinations.getConnection().createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+      Connection conn = brokerDestinations.getConnection();
+      if ( failed ) {
+        //  Unable to create a connection
+        return;
+      }
+      producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+			
 			if ( (delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) && delegateEndpoint.getDestination() != null )
 			{
 				producer = producerSession.createProducer(null); 
@@ -217,10 +224,15 @@
 		}
 		catch ( Exception e)
 		{
+		  boolean rethrow = true;
+		  
       if ( e instanceof JMSException ) {
-        handleJmsException( (JMSException)e );
+        rethrow = handleJmsException( (JMSException)e );
+        
+      } 
+      if ( rethrow ) {
+        throw new AsynchAEException(e);
       }
-			throw new AsynchAEException(e);
 		}
 	}
 	public synchronized void open() throws AsynchAEException, ServiceShutdownException {
@@ -308,8 +320,7 @@
 		int retryCount = 4;
 		while (retryCount > 0)
 		{
-			try
-			{
+			try	{
 				retryCount--;
 				
 				if (aTextMessage == null)
@@ -321,23 +332,19 @@
 					return producerSession.createTextMessage(aTextMessage);
 				}
 
-			}
-			catch ( javax.jms.IllegalStateException e)
-			{
-				try
-				{
+			}	catch ( javax.jms.IllegalStateException e) {
+				try	{
 					open();
-				}
-				catch ( ServiceShutdownException ex)
-				{
+				}	catch ( ServiceShutdownException ex) {
 					ex.printStackTrace();
-				}
-
-			}
-			catch ( Exception e)
-			{
+				} catch( AsynchAEException ex) {
+				  
+	        throw ex;
+	      }				
+			}	
+			catch ( Exception e) {
 				throw new AsynchAEException(e);
-			}
+			} 
 		}
 		throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
 	}
@@ -550,7 +557,10 @@
     }
 	}
 
-	private synchronized void handleJmsException( JMSException ex) {
+	private synchronized boolean handleJmsException( JMSException ex) {
+    if ( !failed ) {
+      failed = true;
+    }
     if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO) ) {
       //  Check if the exception is due to deleted queue. ActiveMQ does not identify
       //  this condition in the cause, so we need to parse the exception message and
@@ -565,19 +575,24 @@
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                 "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_failed_deleted_queue_INFO",
                 new Object[] { componentName, destName});
-        return;
+        controller.addEndpointToDoNotProcessList(delegateEndpoint.getDestination().toString());
+        return false;
+
+      } if ( ex instanceof ConnectionFailedException && isReplyEndpoint ) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_connection_failure__INFO",
+                new Object[] { componentName, serverUri, delegateEndpoint.getDestination()});
+        controller.addEndpointToDoNotProcessList(delegateEndpoint.getDestination().toString());
+        return false;
+
       } else {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] {componentName, ex});
         }
         ex.printStackTrace();
       }
-        
-    }
-    if ( failed ) {
-      return;   // Already marked failed
     }
-    failed = true;
+    return true;
 	}
 	public void onConsumerEvent(ConsumerEvent arg0)
 	{