You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/06/24 21:01:24 UTC

svn commit: r788127 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java

Author: cwiklik
Date: Wed Jun 24 19:01:24 2009
New Revision: 788127

URL: http://svn.apache.org/viewvc?rev=788127&view=rev
Log:
UIMA-1358 Modified handling of failed CASes

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=788127&r1=788126&r2=788127&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Wed Jun 24 19:01:24 2009
@@ -139,7 +139,9 @@
 		        {
               message.addObjectProperty(AsynchAEMessage.Cargo, wrapper);
 		        }
-		        vmTransport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch( message );
+		        if ( !aController.isStopped()) {
+	            vmTransport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch( message );
+		        }
 			   }
 			   else
 			   {
@@ -152,7 +154,10 @@
                 parentCasReferenceId = topParentEntry.getCasReferenceId();
               }
             } catch ( Exception e){}
-		        aController.getOutputChannel().sendReply(t, aCasReferenceId, parentCasReferenceId, anEndpoint, AsynchAEMessage.Process);
+            
+            if ( !aController.isStopped()) {
+              aController.getOutputChannel().sendReply(t, aCasReferenceId, parentCasReferenceId, anEndpoint, AsynchAEMessage.Process);
+            }
 			   }
 			}
 			catch( Exception e)
@@ -457,6 +462,12 @@
           }
 			  }
 			}
+			// By default, return the exception to the client. The exception will not be returned if the CAS is
+			// a subordinate and the flow controller is *not* configured to continue with a bad CAS. In such
+			// a case, the code below will mark the parent CAS as failed. When all child CASes of the parent
+			// CAS are accounted for, the parent CAS will be returned to the client with an exception.
+			boolean doSendReplyToClient = true;
+			
 			//	Check if the caller has already decremented number of subordinates. This property is only
 			//	set in the Aggregate's finalStep() method before the CAS is sent back to the client. If
 			//	there was a problem sending the CAS to the client, we dont want to update the counter 
@@ -464,6 +475,7 @@
 			//	to decrement the number of subordinates associated with the parent CAS.
 			if (!flowControllerContinueFlag && !anErrorContext.containsKey(AsynchAEMessage.SkipSubordinateCountUpdate)) 
 			{
+        doSendReplyToClient = false;
 				//	Check if the CAS is a subordinate (has parent CAS).
 				if ( casStateEntry != null && casStateEntry.isSubordinate())
 				{
@@ -475,17 +487,44 @@
               CacheEntry parentCasCacheEntry = aController.getInProcessCache().
                 getCacheEntryForCAS(parentCasReferenceId);
 						  parentCasStateEntry = aController.getLocalCache().lookupEntry(parentCasReferenceId);
-              synchronized( parentCasStateEntry )
+						  String cmEndpointName = cacheEntry.getCasProducerKey();
+              String cmKey = ((AggregateAnalysisEngineController)aController).lookUpDelegateKey(cmEndpointName);
+						  if ( cmKey == null) {
+                cmKey = cacheEntry.getCasProducerKey();
+						  }
+						  Delegate delegateCM = 
+						    ((AggregateAnalysisEngineController)aController).lookupDelegate(cmKey);
+						  //  The aggregate will return the input CAS when all child CASes are accounted for
+						  synchronized( parentCasStateEntry )
               {
-                if ( parentCasStateEntry.getSubordinateCasInPlayCount() == 0 && parentCasStateEntry.isPendingReply())
-                {
-                  //  Complete processing of the Input CAS
-                  if ( flowControllerContinueFlag ==  false )
-                  {
-                    aController.process(parentCasCacheEntry.getCas(), parentCasCacheEntry.getCasReferenceId());
+                if ( !parentCasStateEntry.isFailed() ) {
+                  CasStateEntry predecessorCas = parentCasStateEntry;
+                  //  Processing a failure of the child. Mark the parent CAS
+                  //  as failed. All child CASes will be dropped upon return
+                  //  from delegates. When all child CASes are dropped the 
+                  //  aggregate will return an exception to the client containing
+                  //  the parent CAS id.
+                  parentCasStateEntry.setFailed();
+                  while( predecessorCas != null && predecessorCas.isSubordinate() ) {
+                    predecessorCas = 
+                      aController.getLocalCache().lookupEntry(predecessorCas.getInputCasReferenceId());
+                    predecessorCas.setFailed();
                   }
-                } else {
-                  parentCasStateEntry.decrementSubordinateCasInPlayCount();
+                  predecessorCas.addThrowable(t);
+                  //  Stop Cas Multiplier
+                  ((AggregateAnalysisEngineController)aController).
+                    stopCasMultiplier(delegateCM, parentCasCacheEntry.getCasReferenceId());
+                }
+                //  Add the exception to the list of exceptions maintained by the parent CAS
+                parentCasStateEntry.addThrowable(t);
+                casStateEntry.setReplyReceived();
+                //  Mark this CAS as failed
+                casStateEntry.setFailed();
+                if ( parentCasStateEntry.getSubordinateCasInPlayCount() == 0 && parentCasStateEntry.isPendingReply()) {
+                  aController.process(parentCasCacheEntry.getCas(), parentCasCacheEntry.getCasReferenceId());
+                } 
+                else {
+                  aController.process(null, casStateEntry.getCasReferenceId());
                 }
               }
 						}
@@ -494,7 +533,11 @@
 							//	Input CAS doesnt exist. Nothing to update, move on
 						}
 					}
-				}
+				} else if ( casStateEntry != null ){  // input CAS
+	        casStateEntry.setFailed();
+	        aController.process(null, casStateEntry.getCasReferenceId());
+	      }
+				return true;
 			}
 			
 			
@@ -516,12 +559,15 @@
 
 				return true;
 			}
-			else
+			else if ( doSendReplyToClient )
 			{
 				try
 				{
-				  //  Dont send TimeoutExceptions to client
-				  if ( deliverExceptionToClient(t) ) {
+				  //  Send the exception to the client if it happened while processing an input CAS.
+				  //  Child CASes that cause exceptions will be dropped; their parent CAS will be marked
+				  //  as failed and returned to the client in the final step once all child
+				  //  CASes are accounted for and dropped.
+				  if ( casStateEntry != null && !casStateEntry.isSubordinate() && deliverExceptionToClient(t) ) {
 				    sendExceptionToClient( t, casReferenceId, endpoint, aController );
 				  }
 				}
@@ -546,7 +592,7 @@
 				if ( cacheEntry != null && aController.getName().equalsIgnoreCase(cacheEntry.getCasProducerAggregateName() ) )
 				{
 					//	Fetch the key of the Cas Multiplier
-					String casProducerKey = cacheEntry.getCasMultiplierKey();
+          String casProducerKey = cacheEntry.getCasProducerKey();
 					if ( casProducerKey != null )
 					{
 						//	Create an endpoint object from the key. This object will be cloned from the Endpoint object
@@ -558,8 +604,7 @@
 						//	If the Cas Multiplier is remote send a request to free a CAS with a given cas id
 						if ( cmEndpoint != null && cmEndpoint.isCasMultiplier() && cmEndpoint.isRemote() )
 						{
-							cmEndpoint.setEndpoint(cmEndpoint.getEndpoint()+"__CasSync");
-							aController.getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, cmEndpoint);
+							aController.getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, cacheEntry.getCasReferenceId(), cmEndpoint);
 						}
 					}
 				}