You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/05/28 19:19:17 UTC

svn commit: r779677 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java

Author: cwiklik
Date: Thu May 28 17:19:17 2009
New Revision: 779677

URL: http://svn.apache.org/viewvc?rev=779677&view=rev
Log:
UIMA-1355 Fixed a bug in the handling of failures during a parallel step: the counter of delegates that have responded was being incremented twice. Also delayed the processing of CASes on a disabled delegate's pending list until the CAS that caused the error has been handled.

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=779677&r1=779676&r2=779677&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Thu May 28 17:19:17 2009
@@ -183,7 +183,7 @@
 	public boolean handleError(Throwable t, ErrorContext anErrorContext, AnalysisEngineController aController)
 	{
     org.apache.uima.aae.controller.LocalCache.CasStateEntry parentCasStateEntry = null;
-		
+    String delegateKey = null;
 		if ( !isHandlerForError(anErrorContext, AsynchAEMessage.Process))
 		{
 			return false;
@@ -257,6 +257,7 @@
 		String key = ""; 
 		Threshold threshold = null;
 		boolean delegateDisabled = false;
+		Delegate delegate = null;
 		//                       R E T R Y
 		//	Do retry first if this an Aggregate Controller
 		if ( !isEndpointTheClient && aController instanceof AggregateAnalysisEngineController )
@@ -266,11 +267,13 @@
 			if ( anErrorContext.get(AsynchAEMessage.Endpoint) != null )
 			{
 				endpoint = (Endpoint) anErrorContext.get(AsynchAEMessage.Endpoint);
+        key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
+        delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
 			}
 			threshold = super.getThreshold(endpoint, delegateMap, aController);
 			if ( endpoint != null )
 			{
-		    	key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
+		    //	key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
 		    	delegateDisabled = ((AggregateAnalysisEngineController)aController).isDelegateDisabled(key);
 		    	if ( threshold != null && threshold.getMaxRetries() > 0 && !delegateDisabled)
 		    	{
@@ -290,10 +293,10 @@
 		           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(), "handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_threshold_for_endpoint__CONFIG", new Object[] { aController.getComponentName(), "Process",  key });
 		         }
 		    	}
-		    	if ( key != null ) {
+		    	if ( delegate != null ) {
 		    		//	Received reply from the delegate. Remove the CAS from the 
 		    		//	delegate's list of CASes pending reply
-	          Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
+	        //  Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
 	          delegate.removeCasFromOutstandingList(casReferenceId);
 		    	}
 			}
@@ -321,7 +324,8 @@
 					UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_retries_exceeded__FINE", new Object[] { aController.getComponentName(), key, casReferenceId });
       }
 		}
-
+		boolean disabledDueToExceededThreshold = false;
+		
 		//	Dont increment errors for destinations that are clients of this service.
 		if ( key != null && !aController.isStopped() && (isRequest || !isEndpointTheClient ) )
 		{
@@ -359,6 +363,16 @@
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "handleError", 
 								UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_process_cas_exceeded_threshold__INFO", new Object[] { aController.getComponentName(), key, casReferenceId, threshold.getThreshold(), threshold.getAction() });
             }
+            //  Add new property to skip handling of CASes in pending lists. Those CASes
+            //  will be handled later in this method, once we complete processing of the CAS
+            //  that caused the exception currently being processed. During handling of the
+            //  CASes in pending state, this error handler is called for each CAS to force
+            //  its timeout. 
+            disabledDueToExceededThreshold = ErrorHandler.DISABLE.equalsIgnoreCase(threshold.getAction());
+            if ( disabledDueToExceededThreshold ) {
+              delegateKey = key;
+              anErrorContext.add( AsynchAEMessage.SkipPendingLists, "true");
+            }
 						aController.takeAction(threshold.getAction(), key, anErrorContext);
 					}
 				}
@@ -383,7 +397,6 @@
         }
 			}
 		}
-		
 		int totalNumberOfParallelDelegatesProcessingCas = 1; // default
 		CacheEntry cacheEntry = null;
 		CasStateEntry casStateEntry = null;
@@ -400,10 +413,15 @@
 		catch( Exception e) {}
 		//	Determine where to send the message
 		Endpoint endpoint = getDestination(aController, anErrorContext);
-		//	If the error occured during parallel step, treat the exception as response from the delegate
+		//	If the error happened during a parallel step, treat the exception as response from the delegate
 		//	When all responses from delegates are accounted for we allow the CAS to move on to the next
-		//	step in the flow
-		if ( casStateEntry != null && totalNumberOfParallelDelegatesProcessingCas > 1 && ( casStateEntry.howManyDelegatesResponded() < totalNumberOfParallelDelegatesProcessingCas))
+		//	step in the flow. Don't increment the parallel delegate response count if a delegate was just
+		//  disabled above. The count has already been incremented in the handleAction() method of the
+		//  AnalysisEngineController.
+    if ( !disabledDueToExceededThreshold  &&
+		     casStateEntry != null && 
+		     totalNumberOfParallelDelegatesProcessingCas > 1 && 
+		     ( casStateEntry.howManyDelegatesResponded() < totalNumberOfParallelDelegatesProcessingCas))
 		{
 			casStateEntry.incrementHowManyDelegatesResponded();
 		}
@@ -474,12 +492,19 @@
 			
 			if ( threshold != null && flowControllerContinueFlag )
 			{
+		    //  The Exception has been almost fully handled. Check if the delegate was disabled above.
+		    //  If it was, we need to force timeout on all CASes in pending state associated with that
+		    //  delegate.
+        if ( disabledDueToExceededThreshold && delegateKey != null) {
+          aController.forceTimeoutOnPendingCases(delegateKey);
+        }   
 				if (totalNumberOfParallelDelegatesProcessingCas == 1 || ( casStateEntry.howManyDelegatesResponded() == totalNumberOfParallelDelegatesProcessingCas) )
 				{
 					aController.process(aController.getInProcessCache().getCasByReference(casReferenceId), casReferenceId);
 				}
 				//	Dont send request to release the CAS to remote CM. This will happen in the final step. We are continuing
 				//	despite the error here.
+
 				return true;
 			}
 			else
@@ -556,7 +581,13 @@
         }
 			}
 		}
-
+		//  The Exception has been almost fully handled. Check if the delegate was disabled above.
+		//  If it was, we need to force timeout on all CASes in pending state associated with that
+		//  delegate.
+    if ( disabledDueToExceededThreshold && delegateKey != null) {
+      aController.forceTimeoutOnPendingCases(delegateKey);
+    }		
+		
 		try
 		{
 			//	Only top level component can Drop the CAS. 
@@ -594,7 +625,8 @@
 	}
 	private boolean deliverExceptionToClient( Throwable t) {
     //  Dont send TimeOutExceptions to client
-	  if ( t instanceof UimaEEServiceException && t.getCause() != null && t.getCause() instanceof MessageTimeoutException) {
+	  if ( t instanceof MessageTimeoutException ||
+	       (t instanceof UimaEEServiceException && t.getCause() != null && t.getCause() instanceof MessageTimeoutException) ) {
       return false;
     }
     return true;