You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2012/11/29 22:41:17 UTC

svn commit: r1415397 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Author: cwiklik
Date: Thu Nov 29 21:41:17 2012
New Revision: 1415397

URL: http://svn.apache.org/viewvc?rev=1415397&view=rev
Log:
UIMA-2509 Use one listener thread to recover connection to a broker.

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1415397&r1=1415396&r2=1415397&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Thu Nov 29 21:41:17 2012
@@ -143,31 +143,45 @@ public class UimaDefaultMessageListenerC
    */
   protected void refreshConnectionUntilSuccessful() {
     boolean doLogFailureMsg = true;
-	    while (isRunning() && !terminating ) {
-	      try {
-	        if (sharedConnectionEnabled()) {
-	          refreshSharedConnection();
-	        }
-	        else {
-	          Connection con = createConnection();
-	          JmsUtils.closeConnection(con);
-	        }
-	        logger.info("Successfully refreshed JMS Connection");
-	        break;
-	      }
-	      catch (Exception ex) {
-	        if ( doLogFailureMsg ) {
-	          StringBuilder msg = new StringBuilder();
-	          msg.append("Could not refresh JMS Connection for destination '");
-	          msg.append(getDestinationDescription()).append("' - silently retrying in ");
-	          msg.append(5).append(" ms. Cause: ");
-	          msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
-	          logger.warn(msg);
-	          doLogFailureMsg = false;
-	        }
-	      }
-	      sleepInbetweenRecoveryAttempts();
-	    }	    
+    try {
+    	// Only one listener thread should enter to recover lost connection.
+    	// Seems like spring recovery api is not reentrant. If multiple listeners
+    	// are allowed to attempt recovery, some of them are closed. This is based
+    	// on observing jconsole attached to uima-as service with multiple listeners
+    	// on an endpoint.  
+    	synchronized(UimaDefaultMessageListenerContainer.class ) {
+            ActiveMQConnection con = (ActiveMQConnection)super.getSharedConnection();
+        	if ( con != null && con.isStarted() && !con.isTransportFailed() ) {
+        		return;
+        	}
+    	    while (isRunning() && !terminating ) {
+    		      try {
+    		        if (sharedConnectionEnabled()) {
+    		          refreshSharedConnection();
+    		        }
+    		        else {
+    		          Connection tcon = createConnection();
+    		          JmsUtils.closeConnection(tcon);
+    		        }
+    		        logger.info("Successfully refreshed JMS Connection");
+    		        break;
+    		      }
+    		      catch (Exception ex) {
+    		        if ( doLogFailureMsg ) {
+    		          StringBuilder msg = new StringBuilder();
+    		          msg.append("Could not refresh JMS Connection for destination '");
+    		          msg.append(getDestinationDescription()).append("' - silently retrying in ");
+    		          msg.append(5).append(" ms. Cause: ");
+    		          msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
+    		          logger.warn(msg);
+    		          doLogFailureMsg = false;
+    		        }
+    		      }
+    		      sleepInbetweenRecoveryAttempts();
+    		    }	    
+    	}
+    } catch( IllegalStateException e ) {
+    }
   }
   protected void recoverAfterListenerSetupFailure() {
 	  if ( !terminating ) {