You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/06/24 20:53:24 UTC

svn commit: r788121 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java

Author: cwiklik
Date: Wed Jun 24 18:53:23 2009
New Revision: 788121

URL: http://svn.apache.org/viewvc?rev=788121&view=rev
Log:
UIMA-1358 Returns parent CAS to the client on exception

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=788121&r1=788120&r2=788121&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Wed Jun 24 18:53:23 2009
@@ -931,7 +931,66 @@
 		}
 		
 	}
+	private boolean abortProcessingCas(CasStateEntry casStateEntry, CacheEntry entry ) { 
+    CasStateEntry parentCasStateEntry = null;
+    try {
+      //  Check if this CAS has a parent
+      if ( casStateEntry.isSubordinate() ) { 
+        //  Fetch parent's cache entry
+        parentCasStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId());
+        //  Check the state of the parent CAS. If it is marked as failed, it means that
+        //  one of its child CASes failed and error handling was configured to fail the
+        //  CAS. Such failure of a child CAS causes a failure of the parent CAS. All child
+        //  CASes will be dropped in finalStep() as they come back from delegates. When all are 
+        //  accounted for and dropped, the parent CAS will be returned back to the client
+        //  with an exception.
+        if ( parentCasStateEntry.isFailed()) {
+          //  Fetch Delegate object for the CM that produced the CAS. The producer key
+          //  is associated with a cache entry in the ProcessRequestHandler. Each new CAS
+          //  must have a key of a CM that produced it.
+          Delegate delegateCM = lookupDelegate(entry.getCasProducerKey());
+          if ( delegateCM != null && delegateCM.getEndpoint().isCasMultiplier() )
+          {
+            //  If the delegate CM is remote, send a Free CAS notification
+            if ( delegateCM.getEndpoint().isRemote()) {
+              getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(), delegateCM.getNotificationEndpoint());
+            }
+            //  Check if a request to stop generation of new CASes from the parent of
+            //  this CAS has been sent to the CM. The Delegate object keeps track of
+            //  requests to STOP that are sent to the CM. Only one STOP is needed.
+            if ( delegateCM.isGeneratingChildrenFrom(parentCasStateEntry.getCasReferenceId())){
+              //  Issue a request to the CM to stop producing new CASes from a given input
+              //  CAS
+              stopCasMultiplier(delegateCM, parentCasStateEntry.getCasReferenceId());
+            }
+          }
+			    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_forcing_cas_to_finalstep__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), casStateEntry.getSubordinateCasInPlayCount() });
+	    		}
+			    casStateEntry.setReplyReceived();
+          //  Force the CAS to go to the Final Step where it will be dropped
+          finalStep( new FinalStep(), casStateEntry.getCasReferenceId());
+          return true;  // Done here
+        }
+      } else if ( casStateEntry.isFailed() ) {
+			    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+      				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_forcing_cas_to_finalstep__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), casStateEntry.getSubordinateCasInPlayCount() });
+	    		}
+          casStateEntry.setReplyReceived();
+        //  move this CAS to the final step
+        finalStep( new FinalStep(), casStateEntry.getCasReferenceId());
+        return true;
+      }
+      
+    } catch ( Exception e) {
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+    }
+    e.printStackTrace();
+  }
 
+	  return false;
+	}
 	/**
 	 * This is a process method that is executed for CASes not created by a Multiplier in this aggregate.
 	 * 
@@ -950,6 +1009,14 @@
       try {
         CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
         CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
+        //  Check if this CAS should be aborted due to previous error on this CAS or its
+        //  parent. If this is the case the method will move the CAS to the final state
+        //  where it will be dropped. If the CAS is an input CAS, it will be returned to
+        //  the client with an exception
+        if ( abortProcessingCas( casStateEntry, entry )) {
+          //  This CAS was aborted, we are done here
+          return;
+        }
         //  Check if this is an input CAS from the client. If not, check if last
         //	delegate handling this CAS was a Cas Multiplier configured to process
         //	parent CAS last
@@ -1076,7 +1143,6 @@
 		                 "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_executing_step_input_cas__FINEST",
 		                 new Object[] {getComponentName(), aCasReferenceId });
 		      }
-
 					// Execute a step in the flow. false means that this CAS has not
 					// been produced by CAS Multiplier
 					executeFlowStep(flow, aCasReferenceId, false);
@@ -1110,13 +1176,17 @@
 			if ( endpoint != null )
 			{
 				endpoint.setController(this);
-				CacheEntry entry = 
-					getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
         CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
 
-				if ( endpoint.isCasMultiplier() && endpoint.isRemote() )
+				if ( endpoint.isCasMultiplier() )
 				{
-					entry.setCasMultiplierKey(analysisEngineKey);
+					Delegate delegateCM = lookupDelegate(analysisEngineKey);
+					delegateCM.setGeneratingChildrenFrom(aCasReferenceId, true);
+					// Record the outgoing CAS. CASes destined for remote CM are recorded
+					// in JmsOutputchannel. 
+					if ( !endpoint.isRemote() ) {
+	          delegateCM.addNewCasToOutstandingList(aCasReferenceId, true);
+					}
 				}
 				
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1397,6 +1467,7 @@
     boolean casDropped = false;
     boolean doDecrementChildCount = false;
     localCache.dumpContents();
+
     //  First locate entries in the global and local cache for a given CAS
     //  If not found, log a message and return
 		try
@@ -1410,8 +1481,11 @@
 	      //  may still have children and will not be returned to the client until
 	      //  all of them are fully processed. This state info will aid in the
 	      //  internal bookkeeping when the final child is processed.
-	      casStateEntry.setState(CacheEntry.FINAL_STATE);
         casStateEntry.setFinalStep(aStep);
+	      casStateEntry.setState(CacheEntry.FINAL_STATE);
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_in_finalstep__FINE", new Object[] { getComponentName(),casStateEntry.getCasReferenceId(),casStateEntry.getSubordinateCasInPlayCount() });
+        }
 			}
 		}
 		catch(Exception e)
@@ -1436,10 +1510,11 @@
 			  }
 			  // Check if this CAS has children that are still being processed in this aggregate
 			  if ( casHasChildrenInPlay(casStateEntry)) {
-			    // save this Step object in the local cache. It will be needed when this CAS is
-			    // resumed when all its children are fully processed
-//          casStateEntry.setFinalStep(aStep);
-
+			     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_has_children__FINE", new Object[] { getComponentName(),casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId(),casStateEntry.getSubordinateCasInPlayCount() });
+           }
+			  
+			    replySentToClient = false;
           return;
         }
         if (UIMAFramework.getLogger().isLoggable(Level.FINEST) )
@@ -1460,7 +1535,7 @@
         }
 	      //  If the CAS was generated by this component but the Flow Controller wants to drop it OR this component
 	      //  is not a Cas Multiplier
-        if (forceToDropTheCas(cacheEntry, aStep)) {
+        if ( forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) {
           if (casStateEntry.isReplyReceived()) {
             if (isSubordinate) {
               //  drop the flow since we no longer need it
@@ -1480,7 +1555,7 @@
           } else {
             doDecrementChildCount = false;
           }
-        } else {
+        } else if ( !casStateEntry.isDropped() ) {
           casStateEntry.setWaitingForRelease(true);
           // Send a reply to the Client. If the CAS is an input CAS it will be dropped
           cEndpoint = replyToClient(cacheEntry, casStateEntry);
@@ -1496,24 +1571,37 @@
           } else {
             // Remove entry from the local cache for this CAS. If the client
             // is remote the entry was removed in replyToClient()
+            try {
+              localCache.lookupEntry(aCasReferenceId).setDropped(true);
+            } catch( Exception e) {}
             localCache.remove(aCasReferenceId);
           }
           // If debug level=FINEST dump the entire cache
           localCache.dumpContents();
         }
 
-	      if ( doDecrementChildCount ) {
-          if ( parentCasStateEntry == null ) {
-             parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
-          }
+        if ( parentCasStateEntry == null && isSubordinate ) {
+          parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
+       }
+       if ( doDecrementChildCount ) {
           // Child CAS has been fully processed, decrement its parent count of active child CASes
           if ( parentCasStateEntry != null ) {
              parentCasStateEntry.decrementSubordinateCasInPlayCount();
              // If debug level=FINEST dump the entire cache
              localCache.dumpContents();
+	  		     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_decremented_child_count__FINE", new Object[] { getComponentName(),casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId(),casStateEntry.getSubordinateCasInPlayCount() });
+             }
           }
 	      }
+       
 	      boolean clientIsCollocated = ( cEndpoint == null || !cEndpoint.isRemote());
+	      
+	      if ( parentCasStateEntry != null && 
+	           parentCasStateEntry.getSubordinateCasInPlayCount() == 0 && 
+	           parentCasStateEntry.isFailed() ) {
+	        parentCasStateEntry.setReplyReceived();
+	      }
 	      // For subordinate CAS, check if its parent needs to be put in play. This should happen if 
 	      // this CAS was the last of the children in play
 	      if ( isSubordinate && releaseParentCas(casDropped, clientIsCollocated, parentCasStateEntry) )
@@ -1541,6 +1629,7 @@
 		}
 		catch( Exception e)
 		{
+		  e.printStackTrace();
 		  HashMap map = new HashMap();
 			map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
 			map.put(AsynchAEMessage.CasReference, aCasReferenceId);
@@ -1608,12 +1697,15 @@
     }
     return retValue;
   }
-	private boolean forceToDropTheCas( CacheEntry cacheEntry, FinalStep aStep)
+	private boolean forceToDropTheCas( CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep)
 	{
 		//	Get the key of the Cas Producer
 		String casProducer = cacheEntry.getCasProducerAggregateName();
 		//	CAS is considered new from the point of view of this service IF it was produced by it
 		boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer));
+		if ( entry != null && entry.isFailed() && isNewCas ) {
+		  return true; // no point to continue if the CAS was produced in this aggregate and its parent failed here
+		}
 		//	If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR this component
 		//	is not a Cas Multiplier
 		if (  isNewCas && ( aStep.getForceCasToBeDropped() || !isCasMultiplier()) )
@@ -1622,117 +1714,197 @@
 		}
 		return false;
 	}
+  private void sendReplyWithException( CacheEntry acacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint) throws Exception {
+    //boolean casProducedInThisAggregate = getComponentName().equals(cacheEntry.getCasProducerAggregateName());
+    if ( casStateEntry.isSubordinate()) {
+      //  We must reply with the input CAS
+      //casStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId());
+      casStateEntry =  getLocalCache().getTopCasAncestor(casStateEntry.getCasReferenceId());
+    }
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
+					"sendReplyWithException", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_returning_exception_to_client__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),replyEndpoint.getEndpoint()});
+    }
+    if ( replyEndpoint.isRemote()) {
+      //  this is an input CAS that has been marked as failed. Return the input CAS
+      //  and an exception to the client.
+      getOutputChannel().sendReply(casStateEntry.getErrors().get(0), 
+                                   casStateEntry.getCasReferenceId(),
+                                   null,
+                                   replyEndpoint,
+                                   AsynchAEMessage.Process);
+    } else {
+      replyEndpoint.setReplyEndpoint(true);
+      UimaTransport vmTransport = getTransport(replyEndpoint.getEndpoint()) ;
+      UimaMessage message = 
+        vmTransport.produceMessage(AsynchAEMessage.Process,AsynchAEMessage.Response, this.getName());
+      message.addIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception); 
+      message.addStringProperty(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
+      
+      Throwable wrapper = null;
+      Throwable cause = casStateEntry.getErrors().get(0);
+      if ( !(cause instanceof UimaEEServiceException) )
+      {
+        //  Strip off AsyncAEException and replace with UimaEEServiceException
+        if ( cause instanceof AsynchAEException && cause.getCause() != null )
+        {
+          wrapper = new UimaEEServiceException(cause.getCause());
+        }
+        else
+        {
+          wrapper = new UimaEEServiceException(cause);
+        }
+      }
+      if ( wrapper == null )
+      {
+        message.addObjectProperty(AsynchAEMessage.Cargo, cause);
+      }
+      else
+      {
+        message.addObjectProperty(AsynchAEMessage.Cargo, wrapper);
+      }
+      vmTransport.getUimaMessageDispatcher(replyEndpoint.getEndpoint()).dispatch( message );
+    }
+ }
+
 	
-	private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry ) throws Exception
-	{
-		Endpoint endpoint = null;
-		
-		// Get the endpoint that represents a client that send the request
-		// to this service. If the first arg to getEndpoint() is null, the method
-		// should return the origin.
-		if (isTopLevelComponent())
-		{
-			if ( casStateEntry.isSubordinate()) 
-			{
-				endpoint = getInProcessCache().getTopAncestorEndpoint(cacheEntry);	
-			}
-			else
-			{
-				endpoint = getInProcessCache().getEndpoint(null, casStateEntry.getCasReferenceId());
-			}
-		}
-		else
-		{
-		  endpoint = getReplyEndpoint( cacheEntry );
-			dropFlow(casStateEntry.getCasReferenceId(), false);
-		}
-		if ( endpoint != null )
-		{
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
-					"replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { casStateEntry.getCasReferenceId(), (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
+  private boolean sendExceptionToClient( CacheEntry cacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint ) throws Exception {
+    //  Don't send CASes to the client if the input CAS is in failed state. One
+    //  of the descendant CASes may have failed in one of the delegates. Any
+    //  exception on descendant CAS causes the input CAS to be returned to the
+    //  client with an exception but only when all its descendant CASes are 
+    //  accounted for and released.
+    if ( casStateEntry.isSubordinate() ) {
+      
+      //  Fetch the top ancestor CAS of this CAS.
+      CasStateEntry topAncestorCasStateEntry = getLocalCache().
+        getTopCasAncestor(casStateEntry.getInputCasReferenceId());
+      //  check the state
+      if ( topAncestorCasStateEntry.isFailed() && topAncestorCasStateEntry.getSubordinateCasInPlayCount() == 0) {
+        
+        return true;
+      } else {
+        //  Add the id of the generated CAS to the map holding outstanding CASes. This
+        //  map will be referenced when a client sends Free CAS Notification. The map
+        //  stores the id of the CAS both as a key and a value. Map is used to facilitate
+        //  quick lookup
+        cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
       }
-			if (endpoint.getEndpoint() == null)
-			{
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { casStateEntry.getCasReferenceId() });
+    } else if ( casStateEntry.isFailed()) {
+      return true;
+    }      
+    return false;
+  }
+  private void sendReplyToRemoteClient( CacheEntry cacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint) throws Exception {
+    if ( sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint))  {
+      sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
+    } else {
+      // Send response to a given endpoint
+      getOutputChannel().sendReply(cacheEntry, replyEndpoint);
+      //  Drop the CAS only if the client is remote and the CAS is an input CAS. 
+      //  If this CAS has a parent the client will send Release CAS notification to release the CAS.
+      if ( !casStateEntry.isSubordinate() )
+      {
+        dropCAS(casStateEntry.getCasReferenceId(), true);
+        //  If the cache is empty change the state of the Aggregate to idle
+        if ( getInProcessCache().isEmpty() )
+        {
+          endProcess(AsynchAEMessage.Process);
         }
-				HashMap map = new HashMap();
-				map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
-				map.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
-				handleError(map, new UnknownDestinationException());
+      }
+    }
+  }
 
-			}
-			// Dont send a reply to the client if the client is a CAS multiplier
-			else if ( !endpoint.isCasMultiplier() )
-			{
-				endpoint.setFinal(true);
-				
-				if ( !isStopped() )
-				{
-          if ( endpoint.isRemote())
-          {
-            //  Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
-            if ( casStateEntry.isSubordinate() && isCasMultiplier()) {
-              //  Add the generated CAS to the outstanding CAS Map. Client notification will release
-              //  this CAS back to its pool
-              synchronized(syncObject)
-              {
-                cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
-              }
-            }
-            if ( casStateEntry.isSubordinate()) {
-              CasStateEntry topAncestorCasStateEntry = getLocalCache().
-                getTopCasAncestor(casStateEntry.getInputCasReferenceId());
-            }
-            // Send response to a given endpoint
-            getOutputChannel().sendReply(cacheEntry, endpoint);
-          } else {
-            int mType = AsynchAEMessage.Response;
-            //  Check if the CAS was produced in this aggregate by any of its delegates
-            //  If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
-            //  needs to return as reply.
-            if ( casStateEntry.isSubordinate() && 
-                    isCasMultiplier() && 
-                      getComponentName().equals(cacheEntry.getCasProducerAggregateName()) ) {
-              //  this is a Cas Multiplier, send this CAS to the client in a request message.
-              mType = AsynchAEMessage.Request;
-              //  Return the CAS to the colocated client. First make sure that this CAS
-              //  is associated with the input CAS. This CAS may have been produced from
-              //  an intermediate CAS (which was produced from the input CAS). From the
-              //  client perspective, this Cas Multiplier Aggregate is a black box,
-              //  all CASes produced here must be linked with the input CAS.
-              //  Find the top ancestor of this CAS. It is the input CAS sent by the client
-              String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
-              //  Modify the parent of this CAS. 
-              if ( inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
-                casStateEntry.setInputCasReferenceId(inputCasId);
-                cacheEntry.setInputCasReferenceId(inputCasId);
-              }
-            }
-            sendVMMessage(mType, endpoint, cacheEntry);
-          }
-				}
-			}
-			//	Drop the CAS only if the client is remote and the CAS is an input CAS. 
-			//  If this CAS has a parent the client will send Release CAS notification to release the CAS.
-			if ( endpoint.isRemote() && !casStateEntry.isSubordinate())
-			{
-				dropCAS(casStateEntry.getCasReferenceId(), true);
-				//	If the cache is empty change the state of the Aggregate to idle
-				if ( getInProcessCache().isEmpty() )
-				{
-					endProcess(AsynchAEMessage.Process);
-				}
-			}
-		}
-		else
-		{
+  private void sendReplyToCollocatedClient( CacheEntry cacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint) throws Exception {
+    boolean casProducedInThisAggregate = getComponentName().equals(cacheEntry.getCasProducerAggregateName());
+    String componentName = getComponentName();
+    
+    boolean isSubordinate = casStateEntry.isSubordinate();
+    boolean serviceIsCM = isCasMultiplier();
+    if ( sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint)  ) {
+      try {
+        sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
+      } catch (Exception e) {
+      } finally {
+        if ( casProducedInThisAggregate ) {
+          //  Drop the CAS generated in this Aggregate
+          dropCAS(casStateEntry.getCasReferenceId(), true);
+        }
+      }
+    } else {
+    	if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
+					"sendReplyToCollocatedClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_sending_reply_to_client__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),replyEndpoint.getEndpoint()});
+    	}
+      int mType = AsynchAEMessage.Response;
+      //  Check if the CAS was produced in this aggregate by any of its delegates
+      //  If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
+      //  needs to return as reply.
+      if ( isSubordinate && serviceIsCM && casProducedInThisAggregate ) {
+        //  this is a Cas Multiplier, send this CAS to the client in a request message.
+        mType = AsynchAEMessage.Request;
+        //  Return the CAS to the colocated client. First make sure that this CAS
+        //  is associated with the input CAS. This CAS may have been produced from
+        //  an intermediate CAS (which was produced from the input CAS). From the
+        //  client perspective, this Cas Multiplier Aggregate is a black box,
+        //  all CASes produced here must be linked with the input CAS.
+        //  Find the top ancestor of this CAS. It is the input CAS sent by the client
+        String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+        //  Modify the parent of this CAS. 
+        if ( inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
+          casStateEntry.setInputCasReferenceId(inputCasId);
+          cacheEntry.setInputCasReferenceId(inputCasId);
+        }
+      }
+      // Send CAS to a given reply endpoint
+      sendVMMessage(mType, replyEndpoint, cacheEntry);
+    }
+ }
+	
+	
+  private boolean validEndpoint( Endpoint endpoint, CasStateEntry casStateEntry) {
+    if ( endpoint == null ) {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
-          "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_client_endpoint_not_found__INFO", new Object[] { getComponentName(), casStateEntry.getCasReferenceId() });
+         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
+           "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_client_endpoint_not_found__INFO", new Object[] { getComponentName(), casStateEntry.getCasReferenceId() });
+      }
+      return false;
+   }
+   if (endpoint.getEndpoint() == null)
+   {
+     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { casStateEntry.getCasReferenceId() });
+     }
+     HashMap map = new HashMap();
+     map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
+     map.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
+     handleError(map, new UnknownDestinationException());
+     return false;
+   }
+   // Don't send a reply to the client if the client is a CAS multiplier
+   if ( endpoint.isCasMultiplier() ) {
+     return false;
+   }
+
+   return true;
+  }
+	private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry ) throws Exception
+	{
+    Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry);
+    if ( !validEndpoint(endpoint, casStateEntry)) {
+      return null; // the reason has already been logged
+    }
+    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
+					"replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { casStateEntry.getCasReferenceId(), (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
+    }
+		endpoint.setFinal(true);
+		if ( !isStopped() ) {
+		  if ( endpoint.isRemote()) {
+		    sendReplyToRemoteClient(cacheEntry, casStateEntry, endpoint);
+      } else {  
+        sendReplyToCollocatedClient(cacheEntry, casStateEntry, endpoint);
       }
-		  
 		}
 		return endpoint;
 	}
@@ -1760,6 +1932,32 @@
     //  Send reply back to the client. Use internal (non-jms) transport
     transport.getUimaMessageDispatcher(endpoint.getEndpoint()).dispatch(message);
   }
+  
+  
+  private Endpoint getReplyEndpoint(CacheEntry cacheEntry, CasStateEntry casStateEntry ) throws Exception {
+    Endpoint endpoint = null;
+    // Get the endpoint that represents the client that sent the request
+    // to this service. If the first arg to getEndpoint() is null, the method
+    // should return the origin.
+    if (isTopLevelComponent())
+    {
+      if ( casStateEntry.isSubordinate()) 
+      {
+        endpoint = getInProcessCache().getTopAncestorEndpoint(cacheEntry);  
+      }
+      else
+      {
+        endpoint = getInProcessCache().getEndpoint(null, casStateEntry.getCasReferenceId());
+      }
+    }
+    else
+    {
+      endpoint = getReplyEndpoint( cacheEntry );
+      dropFlow(casStateEntry.getCasReferenceId(), false);
+    }
+    return endpoint;
+  }
+  
 	private Endpoint getReplyEndpoint(CacheEntry cacheEntry) throws Exception
 	{
 	  if ( cacheEntry == null )