You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/04/06 16:24:34 UTC
svn commit: r762357 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
Author: cwiklik
Date: Mon Apr 6 14:24:33 2009
New Revision: 762357
URL: http://svn.apache.org/viewvc?rev=762357&view=rev
Log:
UIMA-1245 Fixed an exception related to parent CAS processing
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=762357&r1=762356&r2=762357&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Mon Apr 6 14:24:33 2009
@@ -332,6 +332,9 @@
String inputCasReferenceId = casReferenceId;
// Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
Endpoint freeCasEndpoint = null;
+
+ CasStateEntry inputCasStateEntry = null;
+
// CASes generated by a Cas Multiplier will have a CasSequence property set.
if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
{
@@ -339,7 +342,6 @@
inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
// Fetch Cache entry for the parent CAS
CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
- CasStateEntry casStateEntry = null;
// Fetch an endpoint where Free CAS Notification must be sent.
// This endpoint is unique per CM instance. Meaning, each
// instance of CM will have an endpoint where it expects Free CAS
@@ -349,11 +351,11 @@
freeCasEndpoint = (Endpoint)((Endpoint_impl)freeCasEndpoint).clone();
if ( getController() instanceof AggregateAnalysisEngineController ) {
- casStateEntry = ((AggregateAnalysisEngineController)getController()).
+ inputCasStateEntry = ((AggregateAnalysisEngineController)getController()).
getLocalCache().lookupEntry(inputCasReferenceId);
// Associate Free Cas Notification Endpoint with an input Cas
- casStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
+ inputCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
}
computeStats(aMessageContext, inputCasReferenceId);
@@ -417,11 +419,21 @@
cse = getController().getLocalCache().lookupEntry(casReferenceId);
}
- if ( getController() instanceof AggregateAnalysisEngineController ) {
- String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
- Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
- // Save the last delegate handling this CAS
- cse.setLastDelegate(delegate);
+ if ( getController() instanceof AggregateAnalysisEngineController && aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ String delegateInputQueueName = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(delegateInputQueueName); //aMessageContext.getEndpoint().getEndpoint());
+ if ( delegateKey != null ) {
+ Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
+ // Save the last delegate handling this CAS
+ cse.setLastDelegate(delegate);
+ // If only one thread is receiving messages from the Cas Multiplier, increment the number of child CASes
+ // of the parent CAS. If there are more threads (consumers), a special object, ConcurrentMessageListener,
+ // has already incremented the count. This special object enforces the order of processing for CASes
+ // coming in from the Cas Multiplier.
+ if ( !delegate.hasConcurrentConsumersOnReplyQueue() ) {
+ inputCasStateEntry.incrementSubordinateCasInPlayCount();
+ }
+ }
}
entry = deserializeCASandRegisterWithCache( casReferenceId, freeCasEndpoint, newCASProducedBy, aMessageContext);