You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/30 22:43:35 UTC
svn commit: r739415 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller:
AggregateAnalysisEngineController_impl.java LocalCache.java
Author: cwiklik
Date: Fri Jan 30 21:43:35 2009
New Revision: 739415
URL: http://svn.apache.org/viewvc?rev=739415&view=rev
Log:
UIMA-1285 Fixes Flow Object Not In Flow Cache
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=739415&r1=739414&r2=739415&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Jan 30 21:43:35 2009
@@ -1576,54 +1576,51 @@
handleError(map, new UnknownDestinationException());
}
+ // Don't send a reply to the client if the client is a CAS multiplier
else if ( !endpoint.isCasMultiplier() )
{
endpoint.setFinal(true);
if ( !isStopped() )
{
- // Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
- if ( casStateEntry.isSubordinate() && isCasMultiplier())
- {
- if ( endpoint.isRemote())
- {
- // Add the generated CAS to the outstanding CAS Map. Client notification will release
- // this CAS back to its pool
- synchronized(syncObject)
- {
- cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
- }
- }
- // If the client is in the same JVM use the non-jms queue for messaging
- if ( !endpoint.isRemote() )
- {
- int mType = AsynchAEMessage.Response;
- // Check if the CAS was produced in this aggregate by any of its delegates
- // If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
- // needs to return as reply.
- if ( getComponentName().equals(cacheEntry.getCasProducerAggregateName()) ) {
- mType = AsynchAEMessage.Request;
- }
- sendVMMessage(mType, endpoint, cacheEntry);
- }
- else
- {
- // Send response to a given endpoint
- getOutputChannel().sendReply(cacheEntry, endpoint);
- }
- }
- else
- {
- if ( !endpoint.isRemote())
- {
- sendVMMessage(AsynchAEMessage.Response, endpoint, cacheEntry);
+ if ( endpoint.isRemote())
+ {
+ // Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
+ if ( casStateEntry.isSubordinate() && isCasMultiplier()) {
+ // Add the generated CAS to the outstanding CAS Map. Client notification will release
+ // this CAS back to its pool
+ synchronized(syncObject)
+ {
+ cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
+ }
}
- else if ( casStateEntry != null )
- {
- // Send response to a given endpoint
- getOutputChannel().sendReply(casStateEntry.getCasReferenceId(), endpoint);
+ // Send response to a given endpoint
+ getOutputChannel().sendReply(cacheEntry, endpoint);
+ } else {
+ int mType = AsynchAEMessage.Response;
+ // Check if the CAS was produced in this aggregate by any of its delegates
+ // If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
+ // needs to return as reply.
+ if ( casStateEntry.isSubordinate() &&
+ isCasMultiplier() &&
+ getComponentName().equals(cacheEntry.getCasProducerAggregateName()) ) {
+ // this is a Cas Multiplier, send this CAS to the client in a request message.
+ mType = AsynchAEMessage.Request;
+ // Return the CAS to the colocated client. First make sure that this CAS
+ // is associated with the input CAS. This CAS may have been produced from
+ // an intermediate CAS (which was produced from the input CAS). From the
+ // client perspective, this Cas Multiplier Aggregate is a black box,
+ // all CASes produced here must be linked with the input CAS.
+ // Find the top ancestor of this CAS. It is the input CAS sent by the client
+ String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+ // Modify the parent of this CAS.
+ if ( inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
+ casStateEntry.setInputCasReferenceId(inputCasId);
+ cacheEntry.setInputCasReferenceId(inputCasId);
+ }
}
- }
+ sendVMMessage(mType, endpoint, cacheEntry);
+ }
}
}
// Drop the CAS only if the client is remote and the CAS is an input CAS.
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=739415&r1=739414&r2=739415&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Fri Jan 30 21:43:35 2009
@@ -29,6 +29,34 @@
return null;
}
+ public String lookupInputCasReferenceId(String aCasReferenceId) {
+ String parentCasReferenceId=null;
+ if ( this.containsKey(aCasReferenceId)) {
+ CasStateEntry entry = (CasStateEntry)get(aCasReferenceId);
+ if ( entry.isSubordinate()) {
+ // Subordinate entry: recurse on its parent id until a non-subordinate
+ // (top-level) entry is reached; yields null if an id is not cached
+ parentCasReferenceId = lookupInputCasReferenceId(entry.getInputCasReferenceId());
+ } else {
+ return aCasReferenceId;
+ }
+ }
+ return parentCasReferenceId;
+ }
+
+ public String lookupInputCasReferenceId(CasStateEntry entry) {
+ // Walk up the parent chain to the top of the Cas hierarchy and return
+ // that entry's id. Returns null when an ancestor is missing from this
+ // cache (get() returned null), matching the String overload, instead of
+ // throwing a NullPointerException on the missing parent.
+ CasStateEntry current = entry;
+ while ( current != null && current.isSubordinate()) {
+ // step to the parent; null if the parent id is not cached
+ current = (CasStateEntry)get(current.getInputCasReferenceId());
+ }
+ return ( current == null ) ? null : current.getCasReferenceId();
+ }
+
public synchronized void dumpContents() {
int count=0;
if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )