You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/30 22:43:35 UTC

svn commit: r739415 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller: AggregateAnalysisEngineController_impl.java LocalCache.java

Author: cwiklik
Date: Fri Jan 30 21:43:35 2009
New Revision: 739415

URL: http://svn.apache.org/viewvc?rev=739415&view=rev
Log:
 UIMA-1285  Fixes Flow Object Not In Flow Cache

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=739415&r1=739414&r2=739415&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Jan 30 21:43:35 2009
@@ -1576,54 +1576,51 @@
 				handleError(map, new UnknownDestinationException());
 
 			}
+			// Don't send a reply to the client if the client is a CAS multiplier
 			else if ( !endpoint.isCasMultiplier() )
 			{
 				endpoint.setFinal(true);
 				
 				if ( !isStopped() )
 				{
-					//	Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
-					if ( casStateEntry.isSubordinate() && isCasMultiplier())
-					{
-					  if ( endpoint.isRemote())
-            {
-					    //	Add the generated CAS to the outstanding CAS Map. Client notification will release
-					    //	this CAS back to its pool
-					    synchronized(syncObject)
-					    {
-					      cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
-	            }
-						}
-					  // If the client is in the same JVM use the non-jms queue for messaging
-				    if ( !endpoint.isRemote() )
-				    {
-              int mType = AsynchAEMessage.Response;
-				      //  Check if the CAS was produced in this aggregate by any of its delegates
-              //  If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
-              //  needs to return as reply.
-				      if ( getComponentName().equals(cacheEntry.getCasProducerAggregateName()) ) {
-	              mType = AsynchAEMessage.Request;
-				      }
-              sendVMMessage(mType, endpoint, cacheEntry);
-				    }
-				    else
-				    {
-	            // Send response to a given endpoint
-	            getOutputChannel().sendReply(cacheEntry, endpoint);
-				    }
-					}
-					else
-					{
-            if ( !endpoint.isRemote())
-            {
-              sendVMMessage(AsynchAEMessage.Response, endpoint, cacheEntry);
+          if ( endpoint.isRemote())
+          {
+            //  Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
+            if ( casStateEntry.isSubordinate() && isCasMultiplier()) {
+              //  Add the generated CAS to the outstanding CAS Map. Client notification will release
+              //  this CAS back to its pool
+              synchronized(syncObject)
+              {
+                cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
+              }
             }
-            else if ( casStateEntry != null )
-            {
-              // Send response to a given endpoint
-              getOutputChannel().sendReply(casStateEntry.getCasReferenceId(), endpoint);
+            // Send response to a given endpoint
+            getOutputChannel().sendReply(cacheEntry, endpoint);
+          } else {
+            int mType = AsynchAEMessage.Response;
+            //  Check if the CAS was produced in this aggregate by any of its delegates
+            //  If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
+            //  needs to return as reply.
+            if ( casStateEntry.isSubordinate() && 
+                    isCasMultiplier() && 
+                      getComponentName().equals(cacheEntry.getCasProducerAggregateName()) ) {
+              //  this is a Cas Multiplier, send this CAS to the client in a request message.
+              mType = AsynchAEMessage.Request;
+              //  Return the CAS to the colocated client. First make sure that this CAS
+              //  is associated with the input CAS. This CAS may have been produced from
+              //  an intermediate CAS (which was produced from the input CAS). From the
+              //  client perspective, this Cas Multiplier Aggregate is a black box,
+              //  all CASes produced here must be linked with the input CAS.
+              //  Find the top ancestor of this CAS. It is the input CAS sent by the client
+              String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+              //  Modify the parent of this CAS. 
+              if ( inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
+                casStateEntry.setInputCasReferenceId(inputCasId);
+                cacheEntry.setInputCasReferenceId(inputCasId);
+              }
             }
-					}
+            sendVMMessage(mType, endpoint, cacheEntry);
+          }
 				}
 			}
 			//	Drop the CAS only if the client is remote and the CAS is an input CAS. 

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java?rev=739415&r1=739414&r2=739415&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/LocalCache.java Fri Jan 30 21:43:35 2009
@@ -29,6 +29,34 @@
     return null;
   }
   
+  /** Returns the id of the top-most (non-subordinate) ancestor of the given CAS,
+   *  or null when the id is null or a CAS on the way up is not in this cache. */
+  public String lookupInputCasReferenceId(String aCasReferenceId) {
+    // One get() replaces containsKey()+get(); a missing id or entry yields null
+    CasStateEntry entry = aCasReferenceId == null ? null : (CasStateEntry)get(aCasReferenceId);
+    if ( entry == null ) {
+      return null;
+    }
+    if ( !entry.isSubordinate()) {
+      return aCasReferenceId;
+    }
+    //  recursively call each parent until we get to the top of the Cas hierarchy
+    return lookupInputCasReferenceId(entry.getInputCasReferenceId());
+  }
+
+  /** Returns the id of the top-most (non-subordinate) ancestor of the given entry,
+   *  or null when entry is null or a parent entry is not found in this cache. */
+  public String lookupInputCasReferenceId(CasStateEntry entry) {
+    if ( entry == null ) {
+      return null;
+    }
+    if ( !entry.isSubordinate()) {
+      return entry.getCasReferenceId();
+    }
+    //  recursively call each parent until we get to the top of the Cas hierarchy
+    return lookupInputCasReferenceId((CasStateEntry)get(entry.getInputCasReferenceId()));
+  }
+  
   public synchronized void dumpContents() {
     int count=0;
     if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )