You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/05/28 19:19:17 UTC
svn commit: r779677 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
Author: cwiklik
Date: Thu May 28 17:19:17 2009
New Revision: 779677
URL: http://svn.apache.org/viewvc?rev=779677&view=rev
Log:
UIMA-1355 Modified to fix a bug while handling failures in parallel step. Fixes doubly incremented counter of delegates responded. Delayed processing of CASes on disabled delegate's pending lists.
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=779677&r1=779676&r2=779677&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Thu May 28 17:19:17 2009
@@ -183,7 +183,7 @@
public boolean handleError(Throwable t, ErrorContext anErrorContext, AnalysisEngineController aController)
{
org.apache.uima.aae.controller.LocalCache.CasStateEntry parentCasStateEntry = null;
-
+ String delegateKey = null;
if ( !isHandlerForError(anErrorContext, AsynchAEMessage.Process))
{
return false;
@@ -257,6 +257,7 @@
String key = "";
Threshold threshold = null;
boolean delegateDisabled = false;
+ Delegate delegate = null;
// R E T R Y
// Do retry first if this an Aggregate Controller
if ( !isEndpointTheClient && aController instanceof AggregateAnalysisEngineController )
@@ -266,11 +267,13 @@
if ( anErrorContext.get(AsynchAEMessage.Endpoint) != null )
{
endpoint = (Endpoint) anErrorContext.get(AsynchAEMessage.Endpoint);
+ key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
+ delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
}
threshold = super.getThreshold(endpoint, delegateMap, aController);
if ( endpoint != null )
{
- key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
+ // key = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(endpoint.getEndpoint());
delegateDisabled = ((AggregateAnalysisEngineController)aController).isDelegateDisabled(key);
if ( threshold != null && threshold.getMaxRetries() > 0 && !delegateDisabled)
{
@@ -290,10 +293,10 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(), "handleError", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_threshold_for_endpoint__CONFIG", new Object[] { aController.getComponentName(), "Process", key });
}
}
- if ( key != null ) {
+ if ( delegate != null ) {
// Received reply from the delegate. Remove the CAS from the
// delegate's list of CASes pending reply
- Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
+ // Delegate delegate = ((AggregateAnalysisEngineController)aController).lookupDelegate(key);
delegate.removeCasFromOutstandingList(casReferenceId);
}
}
@@ -321,7 +324,8 @@
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_retries_exceeded__FINE", new Object[] { aController.getComponentName(), key, casReferenceId });
}
}
-
+ boolean disabledDueToExceededThreshold = false;
+
// Dont increment errors for destinations that are clients of this service.
if ( key != null && !aController.isStopped() && (isRequest || !isEndpointTheClient ) )
{
@@ -359,6 +363,16 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "handleError",
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_process_cas_exceeded_threshold__INFO", new Object[] { aController.getComponentName(), key, casReferenceId, threshold.getThreshold(), threshold.getAction() });
}
+ // Add new property to skip handling of CASes in pending lists. Those CASes
+ // will be handled later in this method, once we complete processing of the CAS
+ // that caused the exception currently being processed. During handling of the
+ // CASes in pending state, this error handler is called for each CAS to force
+ // its timeout.
+ disabledDueToExceededThreshold = ErrorHandler.DISABLE.equalsIgnoreCase(threshold.getAction());
+ if ( disabledDueToExceededThreshold ) {
+ delegateKey = key;
+ anErrorContext.add( AsynchAEMessage.SkipPendingLists, "true");
+ }
aController.takeAction(threshold.getAction(), key, anErrorContext);
}
}
@@ -383,7 +397,6 @@
}
}
}
-
int totalNumberOfParallelDelegatesProcessingCas = 1; // default
CacheEntry cacheEntry = null;
CasStateEntry casStateEntry = null;
@@ -400,10 +413,15 @@
catch( Exception e) {}
// Determine where to send the message
Endpoint endpoint = getDestination(aController, anErrorContext);
- // If the error occured during parallel step, treat the exception as response from the delegate
+ // If the error happened during a parallel step, treat the exception as response from the delegate
// When all responses from delegates are accounted for we allow the CAS to move on to the next
- // step in the flow
- if ( casStateEntry != null && totalNumberOfParallelDelegatesProcessingCas > 1 && ( casStateEntry.howManyDelegatesResponded() < totalNumberOfParallelDelegatesProcessingCas))
+ // step in the flow. Dont increment parallel delegate response count if a delegate was just
+ // disabled above. The count has been already incremented in handleAction() method of the
+ // AnalysisEngineController.
+ if ( !disabledDueToExceededThreshold &&
+ casStateEntry != null &&
+ totalNumberOfParallelDelegatesProcessingCas > 1 &&
+ ( casStateEntry.howManyDelegatesResponded() < totalNumberOfParallelDelegatesProcessingCas))
{
casStateEntry.incrementHowManyDelegatesResponded();
}
@@ -474,12 +492,19 @@
if ( threshold != null && flowControllerContinueFlag )
{
+ // The Exception has been almost fully handled. Check if the delegate was disabled above.
+ // If it was, we need to force timeout on all CASes in pending state associated with that
+ // delegate.
+ if ( disabledDueToExceededThreshold && delegateKey != null) {
+ aController.forceTimeoutOnPendingCases(delegateKey);
+ }
if (totalNumberOfParallelDelegatesProcessingCas == 1 || ( casStateEntry.howManyDelegatesResponded() == totalNumberOfParallelDelegatesProcessingCas) )
{
aController.process(aController.getInProcessCache().getCasByReference(casReferenceId), casReferenceId);
}
// Dont send request to release the CAS to remote CM. This will happen in the final step. We are continuing
// despite the error here.
+
return true;
}
else
@@ -556,7 +581,13 @@
}
}
}
-
+ // The Exception has been almost fully handled. Check if the delegate was disabled above.
+ // If it was, we need to force timeout on all CASes in pending state associated with that
+ // delegate.
+ if ( disabledDueToExceededThreshold && delegateKey != null) {
+ aController.forceTimeoutOnPendingCases(delegateKey);
+ }
+
try
{
// Only top level component can Drop the CAS.
@@ -594,7 +625,8 @@
}
private boolean deliverExceptionToClient( Throwable t) {
// Dont send TimeOutExceptions to client
- if ( t instanceof UimaEEServiceException && t.getCause() != null && t.getCause() instanceof MessageTimeoutException) {
+ if ( t instanceof MessageTimeoutException ||
+ (t instanceof UimaEEServiceException && t.getCause() != null && t.getCause() instanceof MessageTimeoutException) ) {
return false;
}
return true;