You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/06/24 21:01:24 UTC
svn commit: r788127 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
Author: cwiklik
Date: Wed Jun 24 19:01:24 2009
New Revision: 788127
URL: http://svn.apache.org/viewvc?rev=788127&view=rev
Log:
UIMA-1358 Modified handling of failed CASes
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=788127&r1=788126&r2=788127&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Wed Jun 24 19:01:24 2009
@@ -139,7 +139,9 @@
{
message.addObjectProperty(AsynchAEMessage.Cargo, wrapper);
}
- vmTransport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch( message );
+ if ( !aController.isStopped()) {
+ vmTransport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch( message );
+ }
}
else
{
@@ -152,7 +154,10 @@
parentCasReferenceId = topParentEntry.getCasReferenceId();
}
} catch ( Exception e){}
- aController.getOutputChannel().sendReply(t, aCasReferenceId, parentCasReferenceId, anEndpoint, AsynchAEMessage.Process);
+
+ if ( !aController.isStopped()) {
+ aController.getOutputChannel().sendReply(t, aCasReferenceId, parentCasReferenceId, anEndpoint, AsynchAEMessage.Process);
+ }
}
}
catch( Exception e)
@@ -457,6 +462,12 @@
}
}
}
+ // By default, return the exception to the client. The exception will not be returned if the CAS is
+ // a subordinate and the flow controller is *not* configured to continue with a bad CAS. In such a
+ // case, the code below marks the parent CAS as failed. When all child CASes of the parent
+ // CAS are accounted for, the parent CAS is returned to the client with an exception.
+ boolean doSendReplyToClient = true;
+
// Check if the caller has already decremented number of subordinates. This property is only
// set in the Aggregate's finalStep() method before the CAS is sent back to the client. If
// there was a problem sending the CAS to the client, we dont want to update the counter
@@ -464,6 +475,7 @@
// to decrement the number of subordinates associated with the parent CAS.
if (!flowControllerContinueFlag && !anErrorContext.containsKey(AsynchAEMessage.SkipSubordinateCountUpdate))
{
+ doSendReplyToClient = false;
// Check if the CAS is a subordinate (has parent CAS).
if ( casStateEntry != null && casStateEntry.isSubordinate())
{
@@ -475,17 +487,44 @@
CacheEntry parentCasCacheEntry = aController.getInProcessCache().
getCacheEntryForCAS(parentCasReferenceId);
parentCasStateEntry = aController.getLocalCache().lookupEntry(parentCasReferenceId);
- synchronized( parentCasStateEntry )
+ String cmEndpointName = cacheEntry.getCasProducerKey();
+ String cmKey = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(cmEndpointName);
+ if ( cmKey == null) {
+ cmKey = cacheEntry.getCasProducerKey();
+ }
+ Delegate delegateCM =
+ ((AggregateAnalysisEngineController)aController).lookupDelegate(cmKey);
+ // The aggregate will return the input CAS when all child CASes are accounted for
+ synchronized( parentCasStateEntry )
{
- if ( parentCasStateEntry.getSubordinateCasInPlayCount() == 0 && parentCasStateEntry.isPendingReply())
- {
- // Complete processing of the Input CAS
- if ( flowControllerContinueFlag == false )
- {
- aController.process(parentCasCacheEntry.getCas(), parentCasCacheEntry.getCasReferenceId());
+ if ( !parentCasStateEntry.isFailed() ) {
+ CasStateEntry predecessorCas = parentCasStateEntry;
+ // Processing a failure of the child. Mark the parent CAS
+ // as failed. All child CASes will be dropped upon return
+ // from delegates. When all child CASes are dropped the
+ // aggregate will return an exception to the client containing
+ // the parent CAS id.
+ parentCasStateEntry.setFailed();
+ while( predecessorCas != null && predecessorCas.isSubordinate() ) {
+ predecessorCas =
+ aController.getLocalCache().lookupEntry(predecessorCas.getInputCasReferenceId());
+ predecessorCas.setFailed();
}
- } else {
- parentCasStateEntry.decrementSubordinateCasInPlayCount();
+ predecessorCas.addThrowable(t);
+ // Stop Cas Multiplier
+ ((AggregateAnalysisEngineController)aController).
+ stopCasMultiplier(delegateCM, parentCasCacheEntry.getCasReferenceId());
+ }
+ // Add the exception to the list of exceptions maintained by the parent CAS
+ parentCasStateEntry.addThrowable(t);
+ casStateEntry.setReplyReceived();
+ // Mark this CAS as failed
+ casStateEntry.setFailed();
+ if ( parentCasStateEntry.getSubordinateCasInPlayCount() == 0 && parentCasStateEntry.isPendingReply()) {
+ aController.process(parentCasCacheEntry.getCas(), parentCasCacheEntry.getCasReferenceId());
+ }
+ else {
+ aController.process(null, casStateEntry.getCasReferenceId());
}
}
}
@@ -494,7 +533,11 @@
// Input CAS doesnt exist. Nothing to update, move on
}
}
- }
+ } else if ( casStateEntry != null ){ // input CAS
+ casStateEntry.setFailed();
+ aController.process(null, casStateEntry.getCasReferenceId());
+ }
+ return true;
}
@@ -516,12 +559,15 @@
return true;
}
- else
+ else if ( doSendReplyToClient )
{
try
{
- // Dont send TimeoutExceptions to client
- if ( deliverExceptionToClient(t) ) {
+ // Send the exception to the client if it happened while processing an input CAS.
+ // Child CASes that cause exceptions are dropped; their parent CAS is marked
+ // as failed and is returned to the client in the final step, once all child
+ // CASes are accounted for and dropped.
+ if ( casStateEntry != null && !casStateEntry.isSubordinate() && deliverExceptionToClient(t) ) {
sendExceptionToClient( t, casReferenceId, endpoint, aController );
}
}
@@ -546,7 +592,7 @@
if ( cacheEntry != null && aController.getName().equalsIgnoreCase(cacheEntry.getCasProducerAggregateName() ) )
{
// Fetch the key of the Cas Multiplier
- String casProducerKey = cacheEntry.getCasMultiplierKey();
+ String casProducerKey = cacheEntry.getCasProducerKey();
if ( casProducerKey != null )
{
// Create an endpoint object from the key. This object will be cloned from the Endpoint object
@@ -558,8 +604,7 @@
// If the Cas Multiplier is remote send a request to free a CAS with a given cas id
if ( cmEndpoint != null && cmEndpoint.isCasMultiplier() && cmEndpoint.isRemote() )
{
- cmEndpoint.setEndpoint(cmEndpoint.getEndpoint()+"__CasSync");
- aController.getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, cmEndpoint);
+ aController.getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, cacheEntry.getCasReferenceId(), cmEndpoint);
}
}
}