You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/10/21 14:11:54 UTC

svn commit: r1766028 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-core/src/main/java/org/apache/uima/aae/controller/

Author: cwiklik
Date: Fri Oct 21 14:11:54 2016
New Revision: 1766028

URL: http://svn.apache.org/viewvc?rev=1766028&view=rev
Log:
UIMA-5123 Fixed a hang while recovering from a broker shutdown

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1766028&r1=1766027&r2=1766028&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Fri Oct 21 14:11:54 2016
@@ -19,6 +19,7 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
+import java.net.ConnectException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -169,13 +170,13 @@ public class JmsEndpointConnection_impl
     return false;
   }
 
-  private void openChannel() throws AsynchAEException, ServiceShutdownException {
+  private void openChannel() throws AsynchAEException, ServiceShutdownException, ConnectException {
     openChannel(getServerUri(), componentName, endpoint, controller);
   }
 
   private void openChannel(String brokerUri, String aComponentName,
           String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
-          ServiceShutdownException {
+          ServiceShutdownException, ConnectException {
 	  synchronized (lock) {
 		    try {
 
@@ -234,10 +235,16 @@ public class JmsEndpointConnection_impl
 		              }
 		              // log connectivity problem once and retry
 		              boolean logConnectionProblem=true;
-		              
-		              // recover lost connection indefinitely while the service is running
-		              while( !controller.isStopped() ) {
 
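+		              int retryCount = 4;  // maximum number of attempts to re-establish the broker connection
+		              // try to recover the lost connection; give up after retryCount attempts or when the service is stopped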
+//		              while( !controller.isStopped() ) {
+		              while( retryCount > 0 ) {
+		            	  retryCount--;
+		            	  if ( controller.isStopped() ) {
+		            		  break;
+		            	  }
+		            	  
 		            	  try {
 				              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
 				              // White list packages for deserialization 
@@ -297,7 +304,7 @@ public class JmsEndpointConnection_impl
 		            		          
 		            		        }
 		            		  } 
-		            		 lock.wait(1000);  // wait between retries 
+		            		 lock.wait(5000);  // wait between retries 
 		            	  } catch ( Exception ee) {
 		            		  ee.printStackTrace();
 		            		  if ( conn != null  ) {
@@ -307,8 +314,11 @@ public class JmsEndpointConnection_impl
 		            		  }
 		            	  }
 		              } //while
-	            	  System.out.println("Service ...................... controller.isStopped() >>>> "+controller.isStopped());
 
+	            	  if ( retryCount == 0) {   // failed recovering a connection
+	            		  Thread.currentThread().dumpStack();
+	            		  throw new ConnectException("Unable to Create Connection to Broker:"+brokerUri);
+	            	  }
 		              if ( logConnectionProblem == false )  { // we had conectivity problem. Log the fact that it was recovered
 		            	  if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
             		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -403,25 +413,27 @@ public class JmsEndpointConnection_impl
 		                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
 		                  "UIMAJMS_exception__WARNING", e);
 		        }
+		        if ( e instanceof ConnectException ) {
+		            throw (ConnectException)e;
+		        }
 
 		        if (e instanceof JMSException) {
 		          rethrow = handleJmsException((JMSException) e);
-
-		        }
+		        } 
+		        
 		        if (rethrow) {
 		          throw new AsynchAEException(e);
 		        }
 		      }
-		  
-	  }
+	  }  // synchronized
   }
 
-  public synchronized void open() throws AsynchAEException, ServiceShutdownException {
+  public synchronized void open() throws AsynchAEException, ServiceShutdownException, ConnectException {
     open(delegateEndpoint.getEndpoint(), serverUri);
   }
 
   public synchronized void open( String anEndpointName, String brokerUri) throws AsynchAEException,
-          ServiceShutdownException {
+          ServiceShutdownException, ConnectException {
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "open",
               JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open__FINE",
@@ -484,7 +496,7 @@ public class JmsEndpointConnection_impl
     this.serverUri = serverUri;
   }
 
-  public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
+  public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException, ConnectException {
 	synchronized( lock ) {
 		  if ( producerSession == null ) {
 		      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
@@ -495,7 +507,7 @@ public class JmsEndpointConnection_impl
 		       } else {
 		          return producerSession.createTextMessage(aTextMessage);
 		       }
-		     } catch (javax.jms.IllegalStateException e) {
+		    } catch (javax.jms.IllegalStateException e) {
 		        try {
 		          open();
 		        } catch (ServiceShutdownException ex) {
@@ -511,14 +523,15 @@ public class JmsEndpointConnection_impl
 		        } catch (AsynchAEException ex) {
 		          throw ex;
 		        }
-		      } catch (Exception e) {
+		     
+		    } catch (Exception e) {
 		        throw new AsynchAEException(e);
 		    }
 		    throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
 	}
   }
 
-  public BytesMessage produceByteMessage(byte[] aSerializedCAS) throws AsynchAEException {
+  public BytesMessage produceByteMessage(byte[] aSerializedCAS) throws AsynchAEException, ConnectException {
     synchronized( lock ) {
         if ( producerSession == null ) {
             throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1766028&r1=1766027&r2=1766028&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Fri Oct 21 14:11:54 2016
@@ -149,6 +149,9 @@ public class JmsOutputChannel implements
     }
 
   }
+  public boolean isStopping() {
+	  return aborting;
+  }
   /**
    * Sets the ActiveMQ Broker URI
    */
@@ -632,7 +635,8 @@ public class JmsOutputChannel implements
 
     
     
-    } catch (InterruptedException e) {
+    }
+    catch (InterruptedException e) {
       } finally {
 	    connectionSemaphore.release();	  
       }
@@ -1711,10 +1715,12 @@ public class JmsOutputChannel implements
     } catch (ServiceShutdownException e) {
       throw e;
     } catch (AsynchAEException e) {
-      throw e;
+    	throw e;
+    } catch (ConnectException e) {
+		  casStateEntry.setDeliveryToClientFailed();
     } catch (Exception e) {
-      throw new AsynchAEException(e);
-    }
+        throw new AsynchAEException(e);
+      }
 
   }
 

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1766028&r1=1766027&r2=1766028&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Oct 21 14:11:54 2016
@@ -2174,6 +2174,36 @@ public class AggregateAnalysisEngineCont
     	if ( cmOutstandingCASes.containsKey(casStateEntry.getCasReferenceId())) {
         	  cmOutstandingCASes.remove(casStateEntry.getCasReferenceId());
     	}
+		if ( casStateEntry.isSubordinate()) {
+			try {
+				
+				String inputCasId = casStateEntry.getInputCasReferenceId();
+				if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+							"sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+							"UIMAEE_force_cas_abort__INFO",
+							new Object[] { getComponentName(), "parent", inputCasId });
+			    }
+				
+				CasStateEntry parentCasStateEntry = getLocalCache().lookupEntry(inputCasId);
+				parentCasStateEntry.setFailed();
+				addAbortedCasReferenceId(inputCasId);
+				List<AnalysisEngineController> controllers = 
+							getChildControllerList();
+				for( AnalysisEngineController ctrl : controllers) {
+					ctrl.addAbortedCasReferenceId(inputCasId);
+				}
+			} catch( Exception e) {
+		        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+		                "sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+		                "UIMAEE_service_exception_WARNING", getComponentName());
+		        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+		                "sendReplyToRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+		                "UIMAEE_exception__WARNING", e);
+
+			}
+		}
+
     	if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(
                 Level.FINE,
@@ -2894,6 +2924,8 @@ public class AggregateAnalysisEngineCont
             flowControllerDescriptor, analysisEngineMetaDataMap, getUimaContextAdmin(),
             ((AnalysisEngineDescription) getResourceSpecifier()).getSofaMappings(), super
                     .getManagementInterface());
+//    super.addUimaObject(flowControllerContainer.getMBean().getUniqueMBeanName());
+    
     if (isTopLevelComponent()) {
       //  Add FC's meta
       getCasManagerWrapper().addMetadata((ProcessingResourceMetaData)flowControllerContainer.getMetaData());