You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/04/06 16:24:34 UTC

svn commit: r762357 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java

Author: cwiklik
Date: Mon Apr  6 14:24:33 2009
New Revision: 762357

URL: http://svn.apache.org/viewvc?rev=762357&view=rev
Log:
UIMA-1245 Fixed an exception related to parent CAS processing

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=762357&r1=762356&r2=762357&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Mon Apr  6 14:24:33 2009
@@ -332,6 +332,9 @@
 			String inputCasReferenceId = casReferenceId;
 			//	Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
 			Endpoint freeCasEndpoint = null;
+
+      CasStateEntry inputCasStateEntry = null;
+			
 			//	CASes generated by a Cas Multiplier will have a CasSequence property set. 
 			if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
 			{
@@ -339,7 +342,6 @@
 				inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
 				//	Fetch Cache entry for the parent CAS
         CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
-        CasStateEntry casStateEntry = null;
         // Fetch an endpoint where Free CAS Notification must be sent.
         // This endpoint is unique per CM instance. Meaning, each 
         //  instance of CM will have an endpoint where it expects Free CAS
@@ -349,11 +351,11 @@
         freeCasEndpoint = (Endpoint)((Endpoint_impl)freeCasEndpoint).clone();
 
         if ( getController() instanceof AggregateAnalysisEngineController ) {
-          casStateEntry = ((AggregateAnalysisEngineController)getController()).
+          inputCasStateEntry = ((AggregateAnalysisEngineController)getController()).
               getLocalCache().lookupEntry(inputCasReferenceId);
 
           //  Associate Free Cas Notification Endpoint with an input Cas
-          casStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
+          inputCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
         }
 
 				computeStats(aMessageContext, inputCasReferenceId);
@@ -417,11 +419,21 @@
 		      cse = getController().getLocalCache().lookupEntry(casReferenceId);
 		    }
 
-	      if (  getController() instanceof AggregateAnalysisEngineController ) {
-	        String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
-	        Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
-	        // Save the last delegate handling this CAS
-	        cse.setLastDelegate(delegate);
+		    if (  getController() instanceof AggregateAnalysisEngineController && aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+		      String delegateInputQueueName = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+		      String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(delegateInputQueueName); //aMessageContext.getEndpoint().getEndpoint());
+	        if ( delegateKey != null ) {
+	          Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
+	          // Save the last delegate handling this CAS
+	          cse.setLastDelegate(delegate);
+	          // If only one thread is receiving messages from the Cas Multiplier, increment the number of child CASes
+	          // of the parent CAS. If there are more threads (consumers), a special object, ConcurrentMessageListener,
+	          // has already incremented the count. This special object enforces the order of processing for CASes
+	          // coming in from the Cas Multiplier.
+	          if ( !delegate.hasConcurrentConsumersOnReplyQueue() ) {
+	            inputCasStateEntry.incrementSubordinateCasInPlayCount();
+	          }
+	        }
 	      }
 		    
 				entry = deserializeCASandRegisterWithCache( casReferenceId, freeCasEndpoint, newCASProducedBy, aMessageContext);