You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:53:06 UTC

svn commit: r688174 [1/3] - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main: java/org/apache/uima/aae/ java/org/apache/uima/aae/client/ java/org/apache/uima/aae/controller/ java/org/apache/uima/aae/error/handler/ java/org/apache/uima/aa...

Author: eae
Date: Fri Aug 22 11:53:05 2008
New Revision: 688174

URL: http://svn.apache.org/viewvc?rev=688174&view=rev
Log:
UIMA-1147 commit Jerry's work (patches) merging the post1st branch into the trunk

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/AsynchAECasManager_impl.java Fri Aug 22 11:53:05 2008
@@ -96,7 +96,7 @@
     contextName = aContextName;
     setInitialized(true);
     if (aPerformanceTuningSettings != null) {
-      System.out.println("CasManager Iniatialized Cas Pool:" + aContextName + ". Cas Pool Size:"
+      System.out.println("CasManager Initialized Cas Pool:" + aContextName + ". Cas Pool Size:"
               + aCasPoolSize + " Initial Cas Heap Size:"
               + aPerformanceTuningSettings.get(UIMAFramework.CAS_INITIAL_HEAP_SIZE) + " cells");
     }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Fri Aug 22 11:53:05 2008
@@ -33,11 +33,13 @@
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.EventSubscriber;
 import org.apache.uima.aae.error.AsynchAEException;
+import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
 import org.apache.uima.aae.monitor.statistics.DelegateStats;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.OutOfTypeSystemData;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
+import org.apache.uima.flow.FinalStep;
 import org.apache.uima.flow.Step;
 import org.apache.uima.util.Level;
 
@@ -222,24 +224,36 @@
 	{
 		size = i;
 	}
-	public synchronized void dumpContents()
+	public synchronized void dumpContents(String aControllerName)
 	{
+		int count=0;
 		if ( UIMAFramework.getLogger().isLoggable(Level.FINEST) )
 		{
-			int count=0;
 			Iterator it = cache.keySet().iterator();
-
 			StringBuffer sb = new StringBuffer("\n");
+
 			while( it.hasNext() )
 			{
 				String key = (String) it.next();
 				CacheEntry entry = (CacheEntry)cache.get(key);
 				count++;
-				sb.append(key+"\n");
+				if ( entry.isSubordinate())
+				{
+					sb.append(key+ " Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount()+" Parent CAS id:"+entry.getInputCasReferenceId());
+				}
+				else
+				{
+					sb.append(key+ " *** Input CAS. Number Of Child CASes In Play:"+entry.getSubordinateCasInPlayCount());
+				}
+				if ( entry.isWaitingForRelease() )
+				{
+					sb.append(" <<< Reached Final State in Controller:"+aControllerName);
+				}
+				sb.append("\n");
 			}
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
 	                "dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_cache_entry_key__FINEST",
-	                new Object[] { count, sb.toString() });
+	                new Object[] { aControllerName, count, sb.toString() });
 			sb.setLength(0);
 /*			
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
@@ -247,7 +261,30 @@
 	                new Object[] { count });
 */
 		}
+		else if ( UIMAFramework.getLogger().isLoggable(Level.FINE) )
+		{
+			Iterator it = cache.keySet().iterator();
+			StringBuffer sb = new StringBuffer("\n");
+			int inFinalState=0;
+			
+			while( it.hasNext() )
+			{
+				String key = (String) it.next();
+				CacheEntry entry = (CacheEntry)cache.get(key);
+				count++;
+				if ( entry.isWaitingForRelease() )
+				{
+					inFinalState++;
+				}
+			}
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                "dumpContents", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_abbrev_cache_stats___FINE",
+	                new Object[] { aControllerName, count, inFinalState });
+		
+			
+		}
 	}
+	
 	public synchronized void remove(String aCasReferenceId)
 	{
 		if (aCasReferenceId != null && cache.containsKey(aCasReferenceId))
@@ -284,7 +321,7 @@
 		casRefEntry.deleteCAS();
 	}
 
-	public CacheEntry[] getCacheEntriesForEndpoint(String anEndpointName )
+	public synchronized CacheEntry[] getCacheEntriesForEndpoint(String anEndpointName )
 	{
 		CacheEntry[] entries;
 		ArrayList list = new ArrayList();
@@ -375,8 +412,12 @@
 		CacheEntry casRefEntry = getEntry(aCasReferenceId);
 		return casRefEntry.getOtsd();
 	}
-	private CacheEntry getEntry(String aCasReferenceId)
+	private synchronized CacheEntry getEntry(String aCasReferenceId)
 	{
+		if ( !cache.containsKey(aCasReferenceId))
+		{
+			return null;
+		}
 		return (CacheEntry) cache.get(aCasReferenceId);
 	}
 	public void addEndpoint( Endpoint anEndpoint, String aCasReferenceId)
@@ -414,31 +455,6 @@
 		CacheEntry casRefEntry = getEntry(aCasReferenceId);
 		return casRefEntry.getStartTime();
 	}
-	public synchronized String register(String anInputCasRefId, long aCurrentSequence, CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd)
-	throws AsynchAEException
-	{
-		String casReferenceId = anInputCasRefId+"."+String.valueOf(aCurrentSequence); 
-
-		register(aCAS, aMessageContext, otsd, casReferenceId);
-		return casReferenceId;
-	}
-	public synchronized String register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd)
-	throws AsynchAEException
-	{
-		//System.out.println("Register");
-		String casReferenceId = idGenerator.nextId(); 
-		register(aCAS, aMessageContext, otsd, casReferenceId);
-		return casReferenceId;
-	}
-	
-	public synchronized String register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData)
-	throws AsynchAEException
-	{
-		//System.out.println("Register");
-		String casReferenceId = idGenerator.nextId(); 
-		register(aCAS, aMessageContext, sharedData, casReferenceId);
-		return casReferenceId;
-	}	
 	public boolean entryExists(String aCasReferenceId) 
 	{
 		try
@@ -455,39 +471,37 @@
 		}
 		return true;
 	}
-	public synchronized void register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd, String aCasReferenceId)
+
+	public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd)
 	throws AsynchAEException
 	{
-		cache.put(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, otsd));
+//		String casReferenceId = idGenerator.nextId(); 
+		return register(aCAS, aMessageContext, otsd, idGenerator.nextId());
+//		return casReferenceId;
 	}
 	
-	
-	public synchronized void register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData, String aCasReferenceId)
+	public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData)
 	throws AsynchAEException
 	{
-		cache.put(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, sharedData));
+//		String casReferenceId = idGenerator.nextId(); 
+		return register(aCAS, aMessageContext, sharedData, idGenerator.nextId());
+//		return casReferenceId;
 	}	
-	public void register(CAS aCAS, OutOfTypeSystemData otsd, String aCasReferenceId ) 
+	public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, OutOfTypeSystemData otsd, String aCasReferenceId)
 	throws AsynchAEException
 	{
-		CacheEntry casRefEntry = getEntry(aCasReferenceId);
-		if ( casRefEntry == null )
-		{
-			throw new AsynchAEException("Cas Not Found In CasManager Cache. CasReferenceId::"+aCasReferenceId+" is Invalid");
-		}
-		casRefEntry.setCas(aCAS, otsd);
+		return registerCacheEntry(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, otsd));
 	}
-	public void register(CAS aCAS, String aCasReferenceId ) 
+	public synchronized CacheEntry register(CAS aCAS, MessageContext aMessageContext, XmiSerializationSharedData sharedData, String aCasReferenceId)
 	throws AsynchAEException
 	{
-		CacheEntry casRefEntry = getEntry(aCasReferenceId);
-		if ( casRefEntry == null )
-		{
-			throw new AsynchAEException("Cas Not Found In CasManager Cache. CasReferenceId::"+aCasReferenceId+" is Invalid");
-		}
-		casRefEntry.setCas(aCAS);
+		return registerCacheEntry(aCasReferenceId, new CacheEntry(aCAS, aCasReferenceId, aMessageContext, sharedData));
+	}	
+	private CacheEntry registerCacheEntry( String aCasReferenceId, CacheEntry entry )
+	{
+		cache.put(aCasReferenceId, entry);
+		return entry;
 	}
-	
 	public int getNumberOfParallelDelegates(String aCasReferenceId)
 	throws AsynchAEException
 	{
@@ -499,6 +513,36 @@
 		return casRefEntry.getNumberOfParallelDelegates();
 	}
  
+	public synchronized boolean hasNoSubordinates(String aCasReferenceId)
+	{
+		Iterator it = cache.keySet().iterator();
+		while( it.hasNext() )
+		{
+			String key = (String) it.next();
+			CacheEntry entry = (CacheEntry)cache.get(key);
+			if ( entry.getInputCasReferenceId() != null && entry.getInputCasReferenceId().equals(aCasReferenceId))
+			{
+				return false;
+			}
+		}
+		return true;
+	}
+
+	public Endpoint getTopAncestorEndpoint(CacheEntry anEntry) throws Exception
+	{
+		if ( anEntry == null )
+		{
+			return null;
+		}
+		
+		if ( anEntry.getInputCasReferenceId() == null )
+		{
+			return anEntry.getMessageOrigin();
+		}
+		CacheEntry parentEntry = getCacheEntryForCAS(anEntry.getInputCasReferenceId());
+		return getTopAncestorEndpoint(parentEntry);
+	}
+	
 	public void setNumberOfParallelDelegates(int aParallelDelegateCount, String aCasReferenceId)
 	throws AsynchAEException
 	{
@@ -510,7 +554,7 @@
 		casRefEntry.setNumberOfParallelDelegates(aParallelDelegateCount);
 	}
 	
-	public CacheEntry getCacheEntryForCAS( String aCasReferenceId )
+	public synchronized CacheEntry getCacheEntryForCAS( String aCasReferenceId )
 	throws AsynchAEException
 	{
 		CacheEntry casRefEntry = getEntry(aCasReferenceId);
@@ -593,34 +637,23 @@
 		
 		private int state = 0;
 		
+		private long sequence = 0;
+		
+		private Endpoint freeCasEndpoint;
+		
+		private FinalStep step;
+		
+		private boolean waitingForRealease;
 		
 		protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor, OutOfTypeSystemData aotsd)
 		{
 			this(aCas, aCasReferenceId, aMessageAccessor);
 			messageAccessor = aMessageAccessor;
-/*
-			cas = aCas;
-			otsd = aotsd;
-			if ( aMessageAccessor != null )
-			{
-				messageOrigin = aMessageAccessor.getEndpoint();
-			}
-			casReferenceId = aCasReferenceId;
-*/			
 		}
 		protected CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor, XmiSerializationSharedData sdata)
 		{
 			this(aCas, aCasReferenceId, aMessageAccessor);
 			deserSharedData = sdata;
-/*
-			cas = aCas;
-			messageAccessor = aMessageAccessor;
-			if ( aMessageAccessor != null )
-			{
-				messageOrigin = aMessageAccessor.getEndpoint();
-			}
-			casReferenceId = aCasReferenceId;
-*/			
 		}
 		private CacheEntry(CAS aCas, String aCasReferenceId, MessageContext aMessageAccessor )
 		{
@@ -631,6 +664,17 @@
 				messageOrigin = aMessageAccessor.getEndpoint();
 			}
 			casReferenceId = aCasReferenceId;
+			try
+			{
+				if ( aMessageAccessor.propertyExists(AsynchAEMessage.CasSequence) )
+				{
+					sequence = aMessageAccessor.getMessageLongProperty(AsynchAEMessage.CasSequence);
+				}
+			}
+			catch( Exception e)
+			{
+				e.printStackTrace();
+			}
 		}
 		public String getCasReferenceId()
 		{
@@ -914,6 +958,41 @@
 		{
 			state = aState;
 		}
+		public long getCasSequence()
+		{
+			return sequence;
+		}
+		public void setCasSequence(long sequence)
+		{
+			this.sequence = sequence;
+		}
+		
+		public void setFreeCasEndpoint( Endpoint aFreeCasEndpoint )
+		{
+			freeCasEndpoint = aFreeCasEndpoint;
+		}
+		public Endpoint getFreeCasEndpoint()
+		{
+			return freeCasEndpoint;
+		}
+		
+		public void setFinalStep( FinalStep step )
+		{
+			this.step = step;
+		}
+		public FinalStep getFinalStep()
+		{
+			return step;
+		}
+		public void setWaitingForRelease(boolean flag)
+		{
+			waitingForRealease = flag;
+		}
+		
+		public boolean isWaitingForRelease()
+		{
+			return waitingForRealease;
+		}
 	}	
 
 

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/InputChannel.java Fri Aug 22 11:53:05 2008
@@ -30,7 +30,6 @@
 	public void setServerUri(String aServerUri);
 	public String getInputQueueName();
 	public ServiceInfo getServiceInfo();
-//	public void stop() throws Exception;
 	public boolean isStopped();
     
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/OutputChannel.java Fri Aug 22 11:53:05 2008
@@ -19,6 +19,7 @@
 
 package org.apache.uima.aae;
 
+import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.error.AsynchAEException;
@@ -46,7 +47,9 @@
 	
 	public void sendReply( String aCasReferenceId, Endpoint anEndpoint ) throws AsynchAEException;
 
-//	public void sendReply( AnalysisEngineMetaData anAnalysisEngineMetadata, Endpoint anEndpoint, boolean serialize ) throws AsynchAEException;
+	public void sendReply( CacheEntry entry, Endpoint anEndpoint ) throws AsynchAEException;
+
+	//	public void sendReply( AnalysisEngineMetaData anAnalysisEngineMetadata, Endpoint anEndpoint, boolean serialize ) throws AsynchAEException;
 	public void sendReply( ProcessingResourceMetaData aProcessingResourceMetadata, Endpoint anEndpoint, boolean serialize ) throws AsynchAEException;
 
 	public void sendReply(Throwable t, String aCasReferenceId, Endpoint anEndpoint, int aCommand) throws AsynchAEException;

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatus.java Fri Aug 22 11:53:05 2008
@@ -23,5 +23,5 @@
 public interface UimaASProcessStatus extends EntityProcessStatus {
 	
 	public String getCasReferenceId();
-
+	public String getParentCasReferenceId();
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaASProcessStatusImpl.java Fri Aug 22 11:53:05 2008
@@ -47,15 +47,21 @@
 	  public boolean isProcessed = true;
 
 	  private String casReferenceId;
+
+	  private String parentCasId;
 	  
 	  public UimaASProcessStatusImpl(ProcessTrace p){
 	      this(p,null);
 	  }
 	  public UimaASProcessStatusImpl(ProcessTrace p, String aCasReferenceId) {
+	      this(p,aCasReferenceId,null);
+	  }
+
+	  public UimaASProcessStatusImpl(ProcessTrace p, String aCasReferenceId, String aParentCasReferenceId) {
 		    prT = p;
 		    casReferenceId = aCasReferenceId;
-		  }
-
+		    parentCasId = aParentCasReferenceId;
+	  }
 	  public UimaASProcessStatusImpl(ProcessTrace p, boolean aSkip) {
 	    prT = p;
 	    isSkipped = aSkip;
@@ -143,4 +149,7 @@
 	public String getCasReferenceId() {
 		return casReferenceId;
 	}
+	public String getParentCasReferenceId() {
+		return parentCasId;
+	}
 	}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/client/UimaAsynchronousEngine.java Fri Aug 22 11:53:05 2008
@@ -74,6 +74,7 @@
 	public final String ServerUri = "ServerURI";
 	public final String Endpoint = "Endpoint";
 	public final String CasPoolSize = "CasPoolSize";
+	public final String ShadowCasPoolSize ="ShadowCasPoolSize";
 	public static final String ReplyWindow = "ReplyWindow";
 	public static final String Timeout = "Timeout";
 	public static final String CpcTimeout = "CpcTimeout";
@@ -203,10 +204,10 @@
 	   * It doesn't use call-backs through a registered application listener.  
 	   *  
 	   * @param aCAS - a CAS to analyze.
-	   * 
+	   * @return - a unique id assigned to the CAS
 	   * @throws ResourceProcessException
 	   */
-	  public void sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException;
+	  public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException;
 	
 	  /**
 	   * Deploys a UIMA AS container and all services defined in provided deployment 

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -31,11 +31,14 @@
 import org.apache.uima.aae.jmx.ServiceErrors;
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.jmx.ServicePerformance;
+import org.apache.uima.flow.FinalStep;
 
 public interface AggregateAnalysisEngineController extends AnalysisEngineController
 {
 	public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException;
 
+	public void mergeTypeSystem(String aTypeSystem, String fromDestination, String fromServer) throws AsynchAEException;
+
 	public void sendRequestForMetadataToRemoteDelegates() throws AsynchAEException;
 	
 	public void addMessageOrigin( String aCasReferenceId, Endpoint anEndpoint );
@@ -60,6 +63,8 @@
 	
 	public String lookUpDelegateKey( String aDelegateEndpointName );
 	
+	public String lookUpDelegateKey( String aDelegateEndpointName, String server );
+
 	public UimaContext getChildUimaContext( String aDelegateEndpointName ) throws Exception;
 	
 //	public void retryCAS( String aCasReferenceId, Endpoint anEndpoint )throws AsynchAEException;
@@ -79,7 +84,7 @@
 	
 	public String getLastDelegateKeyFromFlow(String anInputCasReferenceId);
 
-	public boolean sendRequestToReleaseCas();
+//	public boolean sendRequestToReleaseCas();
 	
 	public void registerChildController( AnalysisEngineController aChildController, String aDelegateKey) throws Exception;
 
@@ -105,5 +110,7 @@
 
 	public ServicePerformance getServicePerformance(String aDelegateKey );
 
-	public boolean decrementCasSubordinateCount( CacheEntry aParentCasCacheEntry );
+//	public boolean decrementCasSubordinateCount( CacheEntry aParentCasCacheEntry );
+	
+	public void finalStep( FinalStep aStep, String aCasReferenceId);
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Fri Aug 22 11:53:05 2008
@@ -84,7 +84,8 @@
 
 	private Map flowMap = new HashMap();
 
-	protected ConcurrentHashMap destinationMap;
+//	protected ConcurrentHashMap destinationMap;
+	private volatile ConcurrentHashMap destinationMap;
 
 	private Map destinationToKeyMap;
 
@@ -114,15 +115,14 @@
 
 	private boolean initialized = false;
  
-	private int cmCasPoolSizeDelta = 0;
-	
 	private int counter = 0;
 	
 	private Object counterMonitor = new Object();
 	
 	protected List childControllerList = new ArrayList();
+//	private List childControllerList = new ArrayList();
 	
-	protected Map delegateStats = new HashMap();
+	private Map delegateStats = new HashMap();
 	
 	private AggregateServiceInfo serviceInfo = null;
 
@@ -130,6 +130,13 @@
 
 	private boolean requestForMetaSentToRemotes = false;
 
+	private Object mux = new Object();
+	
+	private boolean isIdle = true;
+	
+	private long lastUpdate = System.nanoTime();
+	
+	private long totalIdleTime = System.nanoTime();
 
 	private ConcurrentHashMap<String, Object[]> delegateStatMap = 
 		new ConcurrentHashMap();
@@ -189,6 +196,23 @@
 	public void addMessageOrigin(String aCasReferenceId, Endpoint anEndpoint)
 	{
 		originMap.put(aCasReferenceId, anEndpoint);
+		if ( UIMAFramework.getLogger().isLoggable(Level.FINE))
+		{
+			Iterator it = originMap.keySet().iterator();
+			StringBuffer sb = new StringBuffer();
+			while( it.hasNext())
+			{
+				String key = (String) it.next();
+				Endpoint e = (Endpoint) originMap.get(key);
+				if ( e != null )
+				{
+					sb.append("\t\nCAS:"+key+" Origin:"+e.getEndpoint());
+				}
+			}
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                "addMessageOrigin", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_dump_msg_origin__FINE",
+	                new Object[] {getComponentName(), sb.toString()});
+		}
 	}
 
 
@@ -361,7 +385,6 @@
 			String key = lookUpDelegateKey( aDelegateKey);
 			if ( endpoint == null )
 			{
-				
 				endpoint = (Endpoint) destinationMap.get(key);
 				if ( endpoint == null )
 				{
@@ -377,7 +400,6 @@
 			
 			if (sendReply && allDelegatesCompletedCollection() && getClientEndpoint() != null)
 			{
-				
 				sendCpcReply();
 			}
 		}
@@ -403,9 +425,33 @@
 		}
 		//	Log this controller's stats
 		logStats(getComponentName(),servicePerformance);
+		
+		endProcess(AsynchAEMessage.Process);
+
 		getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, getClientEndpoint());
 		clientEndpoint = null;
 		clearStats();
+		
+		Map delegates = ((AggregateAnalysisEngineController)this).getDestinations();
+		Set set = delegates.entrySet();
+		for( Iterator it = set.iterator(); it.hasNext();)
+		{
+			Map.Entry entry = (Map.Entry)it.next();
+			Endpoint endpoint = (Endpoint)entry.getValue();
+			if ( endpoint != null )
+			{
+				//	Fetch stats for the delegate
+				ServicePerformance delegatePerformanceStats =
+					((AggregateAnalysisEngineController)this).
+						getDelegateServicePerformance((String)entry.getKey());
+				if ( delegatePerformanceStats != null )
+				{
+					//delegatePerformanceStats.reset();
+				}
+			}
+		}
+		//getServicePerformance().reset();
+		
 	}
 	/**
 	 * 
@@ -432,7 +478,7 @@
 				if (!shownOnce)
 				{
 					shownOnce = true;
-					cache.dumpContents();
+					cache.dumpContents(getComponentName());
 				}
 
 				if (cache.isEmpty())
@@ -473,7 +519,7 @@
 		
 		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
 				"collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cpc__FINEST", new Object[] { getName() });
-		getInProcessCache().dumpContents();
+		getInProcessCache().dumpContents(getComponentName());
 
 		cacheClientEndpoint(anEndpoint);
 
@@ -685,7 +731,7 @@
 
 					// Save the subordinate Flow Object in a cache. Flow exists in the
 					// cache until the CAS is fully processed or it is
-					// explicitely deleted when processing of this CAS cannot continue
+					// explicitly deleted when processing of this CAS cannot continue
 					synchronized( flowMap )
 					{
 						flowMap.put(aNewCasReferenceId, flow);
@@ -896,44 +942,218 @@
 				remoteEndpoints[i].initialize();
 				remoteEndpoints[i].setController(this);
 				String key = lookUpDelegateKey(remoteEndpoints[i].getEndpoint());
-				Endpoint endpoint = ((Endpoint) destinationMap.get(key));
-				if ( key != null && endpoint != null)
+				if ( key != null && destinationMap.containsKey(key))
 				{
-					ServiceInfo serviceInfo = endpoint.getServiceInfo();
-					PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo();
-					pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
-					pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
-					pServiceInfo.setState(serviceInfo.getState());
-					pServiceInfo.setAnalysisEngineInstanceCount(1);
-					
-					registerWithAgent(pServiceInfo, super.getManagementInterface().getJmxDomain()
-							+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceInfo.getLabel());
+					Endpoint endpoint = ((Endpoint) destinationMap.get(key));
+					if ( key != null && endpoint != null)
+					{
+						ServiceInfo serviceInfo = endpoint.getServiceInfo();
+						PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo();
+						pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
+						pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
+						pServiceInfo.setState(serviceInfo.getState());
+						pServiceInfo.setAnalysisEngineInstanceCount(1);
+						
+						registerWithAgent(pServiceInfo, super.getManagementInterface().getJmxDomain()
+								+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceInfo.getLabel());
+
+						ServicePerformance servicePerformance = new ServicePerformance();
+						//servicePerformance.setIdleTime(System.nanoTime());
+						servicePerformance.setRemoteDelegate();
+
+						registerWithAgent(servicePerformance, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+servicePerformance.getLabel());
+
+						ServiceErrors serviceErrors = new ServiceErrors();
+						
+						registerWithAgent(serviceErrors, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceErrors.getLabel());
+						remoteIndex++;
+
+						serviceErrorMap.put(key, serviceErrors);
+						Object[] delegateStatsArray = 
+							new Object[] { pServiceInfo, servicePerformance, serviceErrors }; 
 
-					ServicePerformance servicePerformance = new ServicePerformance();
-					
-					registerWithAgent(servicePerformance, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+servicePerformance.getLabel());
+						delegateStatMap.put( key, delegateStatsArray);					
+					}
+					dispatchMetadataRequest(remoteEndpoints[i]);
+				}
+			}
+		}
+	}
 
-					ServiceErrors serviceErrors = new ServiceErrors();
-					
-					registerWithAgent(serviceErrors, super.getManagementInterface().getJmxDomain()+super.jmxContext+",r"+remoteIndex+"="+key+" [Remote Uima EE Service],name="+key+"_"+serviceErrors.getLabel());
-					remoteIndex++;
+	public void finalStep(FinalStep aStep,  String aCasReferenceId)
+	{
+		Endpoint endpoint=null;
+		boolean casDropped = false;
+
+		boolean subordinateCasInPlayCountDecremented=false;
+		CacheEntry cacheEntry = null;
+		Endpoint freeCasEndpoint = null;
+		
+		try
+		{
+			//	Get entry from the cache for a given CAS Id. This throws an exception if
+			//	an entry doesn't exist in the cache
+			cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+			//	Mark the entry to indicate that the CAS reached a final step. This CAS
+			//	may still have children and will not be returned to the client until
+			//	all of them are fully processed. This state info will aid in the
+			//	internal bookkeeping when the final child is processed.
+			cacheEntry.setState(CacheEntry.FINAL_STATE);
+
+		}
+		catch(Exception e)
+		{
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+			return;
+		}
+		//	Found the entry in the cache for a given CAS id
+		try
+		{
+			endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
+			//	Check if this CAS has children (subordinates)
+			if ( getInProcessCache().hasNoSubordinates(aCasReferenceId) == false)
+			{
+				//	This CAS has child CASes still in play. This CAS will remain in the cache
+				//	until all its children are fully processed. 
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
+						"finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_parent_cas_child_count__FINEST", new Object[] { getComponentName(),aCasReferenceId,cacheEntry.getSubordinateCasInPlayCount()});
+				// Leave input CAS in pending state. It will be returned to the client
+	    		// *only* if the last subordinate CAS is fully processed.
+	    		cacheEntry.setPendingReply(true);
+	    		cacheEntry.setFinalStep(aStep);
+	    		//	Done here. There are subordinate CASes still being processed.
+				return;
+			}
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
+					"finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_parent_cas_no_children__FINEST", new Object[] { getComponentName(),aCasReferenceId});
 
-					serviceErrorMap.put(key, serviceErrors);
-					Object[] delegateStatsArray = 
-						new Object[] { pServiceInfo, servicePerformance, serviceErrors }; 
+			//	If this CAS has a parent, save the destination of a CM that produced it and where we may need to send Free Cas Notification
+			if ( cacheEntry.isSubordinate())
+			{
+				freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+			}
 
-					delegateStatMap.put( key, delegateStatsArray);					
+			CacheEntry parentCASCacheEntry = null;
+			//	If this service is not a Cas Multiplier and a given CAS has a parent
+			//	decrement a number of children the parent CAS has in play. The child
+			//	CAS will be dropped. If this aggregate *is* a cas multiplier, the client
+			//	will send it a Release CAS request and only then the child count of the
+			//	parent CAS can be decremented.
+			if ( cacheEntry.isSubordinate() && isTopLevelComponent())
+			{
+				//	 This is a subordinate CAS. First get cache entry for the input (parent) CAS
+				parentCASCacheEntry = getInProcessCache().getCacheEntryForCAS(cacheEntry.getInputCasReferenceId());
+				parentCASCacheEntry.decrementSubordinateCasInPlayCount();
+				//	Save this state in case an exception happens below, the error handler will not decrement children again
+				subordinateCasInPlayCountDecremented = true;
+			}
+			Endpoint clientEndpoint = null;
+			//	If the CAS was generated by this component but the Flow Controller wants to drop it OR this component
+			//	is not a Cas Multiplier
+			if ( forceToDropTheCas( cacheEntry, aStep ) )
+			{
+				if ( cacheEntry.isReplyReceived() ) //|| isTopLevelComponent())
+				{
+					//	Drop the CAS and remove cache entry for it
+					dropCAS(aCasReferenceId, true);
+					casDropped = true;
+					//	If debug level=FINEST dump the entire cache
+					getInProcessCache().dumpContents(getComponentName());
 				}
-				dispatchMetadataRequest(remoteEndpoints[i]);
+			} 
+			else 
+			{
+				if ( cacheEntry.isSubordinate())
+				{
+					cacheEntry.setWaitingForRelease(true);
+				}
+				//	Send a reply to the Client. If the CAS is an input CAS it will be dropped
+				clientEndpoint = replyToClient( cacheEntry );
+			}
+			//	Now check if the CAS's parent CAS is ready for a finalStep. The parent CAS may 
+			//	have been processed already but	it is cached since its children are still 
+			//	in play.
+			if ( releaseParentCas(casDropped, clientEndpoint, parentCASCacheEntry) )
+			{
+				//	All subordinate CASes have been processed. Process the parent CAS recursively.
+				finalStep(parentCASCacheEntry.getFinalStep(), parentCASCacheEntry.getCasReferenceId());
 			}
 		}
+		catch( Exception e)
+		{
+			HashMap map = new HashMap();
+			map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
+			map.put(AsynchAEMessage.CasReference, aCasReferenceId);
+			//	If the subordinate count has been decremented, let the error handler know
+			//	so that it doesn't decrement the count again. The default action in the
+			//	error handler is to decrement number of subordinates responding. An exception
+			//	that is not subject to retry will be counted as a response.
+			if (subordinateCasInPlayCountDecremented)
+			{
+				map.put(AsynchAEMessage.SkipSubordinateCountUpdate, true);
+			}
+			if ( endpoint != null )
+			{
+				map.put(AsynchAEMessage.Endpoint, endpoint);
+			}
+			handleError(map, e);
+		}
+		finally
+		{
+			removeMessageOrigin(aCasReferenceId);
+			dropStats(aCasReferenceId, super.getName());
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
+					"finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_show_internal_stats__FINEST", new Object[] { getName(), flowMap.size(),getInProcessCache().getSize(),originMap.size(), super.statsMap.size()});
+			//	freeCasEndpoint is a special endpoint for sending Free CAS Notification.
+			if (  casDropped && freeCasEndpoint != null )
+			{
+				freeCasEndpoint.setReplyEndpoint(true);
+				try
+				{
+					//	send Free CAS Notification to a Cas Multiplier
+					getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId, freeCasEndpoint);
+				}
+				catch( Exception e) 
+				{
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+				}
+			}
+		}
+	}
+	private boolean releaseParentCas(boolean casDropped, Endpoint clientEndpoint, CacheEntry parentCASCacheEntry) 
+	{
+		return (
+				(casDropped || (clientEndpoint != null && !clientEndpoint.isRemote() )) 
+				&& parentCASCacheEntry != null  
+			    && parentCASCacheEntry.isReplyReceived()
+			    && parentCASCacheEntry.isPendingReply()
+			    && parentCASCacheEntry.getState() == CacheEntry.FINAL_STATE
+				&& parentCASCacheEntry.getSubordinateCasInPlayCount() == 0
+			);
 	}
 
-	private void finalStep(FinalStep aStep, String aCasReferenceId)// throws AsynchAEException
+	private boolean forceToDropTheCas( CacheEntry cacheEntry, FinalStep aStep)
+	{
+		//	Get the key of the Cas Producer
+		String casProducer = cacheEntry.getCasProducerAggregateName();
+		//	CAS is considered new from the point of view of this service IF it was produced by it
+		boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer));
+		//	If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR this component
+		//	is not a Cas Multiplier
+		if (  isNewCas && ( aStep.getForceCasToBeDropped() || !isCasMultiplier()) )
+		{
+			return true;
+		}
+		return false;
+	}
+	
+	/*
+	protected void finalStep(FinalStep aStep,  String aCasReferenceId)// throws AsynchAEException
 	{
 		Endpoint endpoint=null;
 		boolean subordinateCasInPlayCountDecremented=false;
 		CacheEntry cacheEntry = null;
+		Endpoint freeCasEndpoint = null;
 		try
 		{
 			cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
@@ -944,7 +1164,8 @@
 		}
 		catch(Exception e)
 		{
-				return;
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+			return;
 		}
 		
 		try
@@ -972,26 +1193,26 @@
 						// Leave input CAS in pending state. It will be returned to the client
 			    		// *only* if the last subordinate CAS is fully processed.
 			    		cacheEntry.setPendingReply(true);
-						//	Done here. There are subordinate CASes still being processed.
+			    		cacheEntry.setFinalStep(aStep);
+			    		//	Done here. There are subordinate CASes still being processed.
 						return;
 					}
 					else
 					{
 						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
 								"finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step_parent_cas_no_children__FINEST", new Object[] { getComponentName(),aCasReferenceId});
-
 						//	All subordinates have been fully processed. Set the flag so that
 						//	the input is returned back to the client.
 						replyWithInputCAS = true;
 					}
 				}
 			}
-			else //if ( isTopLevelComponent())
+			else
 			{
 				//	 This is a subordinate CAS. First get cache entry for the input (parent) CAS
 				parentCASCacheEntry = 
 					getInProcessCache().getCacheEntryForCAS(cacheEntry.getInputCasReferenceId());
-				if ( getMessageOrigin(aCasReferenceId) == null )
+				if ( getMessageOrigin(aCasReferenceId) == null && !isCasMultiplier())
 				{
 					replyWithInputCAS = decrementCasSubordinateCount( parentCASCacheEntry);
 					if ( parentCASCacheEntry != null )
@@ -1006,9 +1227,8 @@
 				{
 					replyWithInputCAS = true;
 				}
-				
 			}
-			
+			freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
 			// Cas Processing has been completed. Check if the CAS should be sent to
 			// the client.
 			// Any of the two following conditions will prevent this aggregate from
@@ -1025,49 +1245,27 @@
 			// New CASes must be dropped if aggregate doesn't output them or if flow controller has ActionAfterCasMultiplier="drop"
 			if (isNewCas && (aStep.getForceCasToBeDropped() || !aggregateMetadata.getOperationalProperties().getOutputsNewCASes()))
 			{
+				
 				endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
 				if ( cacheEntry.isReplyReceived())
 				{
 					dropCAS(aCasReferenceId, true);
 				}
-				
-				if ( parentCASCacheEntry != null //&& parentCASCacheEntry.isSubordinate() 
-					    && parentCASCacheEntry.isReplyReceived()
-					    && parentCASCacheEntry.getState() == CacheEntry.FINAL_STATE
-						&& parentCASCacheEntry.getSubordinateCasInPlayCount() == 0)
-				{
-					//	All subordinate CASes have been processed. Process the parent
-					//	CAS recursively.
-					finalStep(aStep, parentCASCacheEntry.getCasReferenceId());
-				}
-			}
-/*			
-			if ( replyWithInputCAS && parentCASCacheEntry != null )
-			{
-				//	Reply with the input CAS
-				replyToClient( parentCASCacheEntry.getCasReferenceId() );
-			}
-			else
-			{
-				replyToClient( aCasReferenceId );
-			}
-*/			
-			if ( replyWithInputCAS && getMessageOrigin(aCasReferenceId) != null)
-			{
-				//	Reply with the input CAS
-				replyToClient( aCasReferenceId );
-			}
-			
-			String casMultiplierKey = cacheEntry.getCasMultiplierKey();
-			if ( isNewCas  && casMultiplierKey != null ) //&& cacheEntry.shouldSendRequestToFreeCas())
-			{
-				endpoint = lookUpEndpoint(casMultiplierKey, true);
-				if ( endpoint != null && endpoint.isRemote() && endpoint.isCasMultiplier() ) //&& cacheEntry.shouldSendRequestToFreeCas() )
-				{
-					endpoint.setEndpoint(endpoint.getEndpoint()+"__CasSync");
-					getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, endpoint );
-				}
-				endpoint = null;
+			} 
+			else if (aggregateMetadata.getOperationalProperties().getOutputsNewCASes() ||
+					( replyWithInputCAS && getMessageOrigin(aCasReferenceId) != null) )
+			{
+				//	Send a reply to the Client. If the CAS is an input CAS it will be dropped
+				replyToClient( aCasReferenceId, cacheEntry );
+			}
+			if ( parentCASCacheEntry != null  
+				    && parentCASCacheEntry.isReplyReceived()
+				    && parentCASCacheEntry.getState() == CacheEntry.FINAL_STATE
+					&& parentCASCacheEntry.getSubordinateCasInPlayCount() == 0)
+			{
+				//	All subordinate CASes have been processed. Process the parent
+				//	CAS recursively.
+				finalStep(aStep, parentCASCacheEntry.getCasReferenceId());
 			}
 			removeMessageOrigin(aCasReferenceId);
 			dropStats(aCasReferenceId, super.getName());
@@ -1094,16 +1292,27 @@
 			}
 			handleError(map, e);
 		}
-/*
 		finally
 		{
-			if ( aCasReferenceId != null && originMap.containsKey(aCasReferenceId))
+			//	freeCasEndpoint is a special endpoint for sending Free CAS Notification.
+			//	This endpoint will be set for each CAS generated in a Cas Multiplier.
+			if ( !isCasMultiplier() && freeCasEndpoint != null )
 			{
-				originMap.remove(aCasReferenceId);
+				freeCasEndpoint.setReplyEndpoint(true);
+				try
+				{
+					//	send Free CAS Notification to a Cas Multiplier
+					getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId, freeCasEndpoint);
+				}
+				catch( Exception e) 
+				{
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+				}
 			}
 		}
-*/		
 	}
+	 */
+/*	
 	public boolean decrementCasSubordinateCount( CacheEntry aParentCasCacheEntry )
 	{
 		if ( aParentCasCacheEntry != null )
@@ -1128,33 +1337,43 @@
 		}
 		return false;
 	}
-	private void replyToClient(String aCasReferenceId ) throws Exception
+*/	
+//	private void replyToClient(String aCasReferenceId, CacheEntry cacheEntry ) throws Exception
+	private Endpoint replyToClient(CacheEntry cacheEntry ) throws Exception
 	{
-		Endpoint endpoint;
+		Endpoint endpoint = null;
 		
 		// Get the endpoint that represents a client that send the request
 		// to this service. If the first arg to getEndpoint() is null, the method
 		// should return the origin.
 		if (isTopLevelComponent())
 		{
-			endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
+			if ( cacheEntry.isSubordinate()) //getCasSequence() > 0 )
+			{
+				endpoint = getInProcessCache().getTopAncestorEndpoint(cacheEntry);	
+//				endpoint = getInProcessCache().getEndpoint(null, cacheEntry.getInputCasReferenceId());
+			}
+			else
+			{
+				endpoint = getInProcessCache().getEndpoint(null, cacheEntry.getCasReferenceId());
+			}
 		}
 		else
 		{
-			endpoint = getMessageOrigin(aCasReferenceId);
-			dropFlow(aCasReferenceId, false);
+			endpoint = getMessageOrigin(cacheEntry.getCasReferenceId());
+			dropFlow(cacheEntry.getCasReferenceId(), false);
 		}
 		if ( endpoint != null )
 		{
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), 
-					"finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { aCasReferenceId, (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
+					"replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { cacheEntry.getCasReferenceId(), (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
 
 			if (endpoint.getEndpoint() == null)
 			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { aCasReferenceId });
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { cacheEntry.getCasReferenceId() });
 				HashMap map = new HashMap();
 				map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
-				map.put(AsynchAEMessage.CasReference, aCasReferenceId);
+				map.put(AsynchAEMessage.CasReference, cacheEntry.getCasReferenceId());
 				handleError(map, new UnknownDestinationException());
 
 			}
@@ -1164,23 +1383,40 @@
 				
 				if ( !isStopped() )
 				{
-					// Send response to the given endpoint
-					getOutputChannel().sendReply(aCasReferenceId, endpoint);
+					//	Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
+					if ( cacheEntry.isSubordinate() && isCasMultiplier())
+					{
+						//	Add the generated CAS to the outstanding CAS Map. Client notification will release
+						//	this CAS back to its pool
+						synchronized(syncObject)
+						{
+							cmOutstandingCASes.put(cacheEntry.getCasReferenceId(),cacheEntry.getCasReferenceId());
+						}
+
+						// Send response to a given endpoint
+						//getOutputChannel().sendReply(cacheEntry.getCas(), cacheEntry.getInputCasReferenceId(), aCasReferenceId, endpoint, cacheEntry.getCasSequence());
+						getOutputChannel().sendReply(cacheEntry, endpoint);
+					}
+					else
+					{
+						// Send response to a given endpoint
+						getOutputChannel().sendReply(cacheEntry.getCasReferenceId(), endpoint);
+					}
 				}
 			}
-			// If the destination for the reply is in the same jvm dont remove
-			// the entry from the cache. The client may need to retrive CAS by reference
-			// to do some post-processing. The client will remove the entry when done
-			// post-processing CAS.
-			if ( !endpoint.getServerURI().startsWith("vm:"))
-			{
-				// Message was fully processed, remove state info related to the
-				// previous CAS from the cache
-				InProcessCache cache = getInProcessCache();
-				
-				dropCAS(aCasReferenceId, true);
+			//	Drop the CAS only if the client is remote and the CAS is an input CAS. 
+			//  If this CAS has a parent the client will send Release CAS notification to release the CAS.
+			if ( endpoint.isRemote() && !cacheEntry.isSubordinate())
+			{
+				dropCAS(cacheEntry.getCasReferenceId(), true);
+				//	If the cache is empty change the state of the Aggregate to idle
+				if ( getInProcessCache().isEmpty() )
+				{
+					endProcess(AsynchAEMessage.Process);
+				}
 			}
 		}
+		return endpoint;
 	}
 	private void executeFlowStep(FlowContainer aFlow, String aCasReferenceId, boolean newCAS) throws AsynchAEException
 	{
@@ -1228,6 +1464,19 @@
 			}
 			else if (step instanceof FinalStep)
 			{
+				//	Special case: check if this CAS has just been produced by a Cas Multiplier.
+				//	If so, we received a new CAS but there are no delegates in the pipeline. 
+				//	The CM was the last in the flow. In this case, set a property in the cache
+				//	to simulate receipt of the reply to this CAS. This is so that the CAS is
+				//	released in the finalStep() when the Aggregate is not a Cas Multiplier.
+				if ( newCAS)
+				{
+					CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+					if ( entry != null )
+					{
+						entry.setReplyReceived();
+					}
+				}
 				finalStep((FinalStep) step, aCasReferenceId);
 			}
 
@@ -1301,13 +1550,58 @@
 	
 	public String lookUpDelegateKey(String anEndpointName)
 	{
-
+		return lookUpDelegateKey(anEndpointName, null);
+	}
+	/**
+	 * Returns a delegate key given an endpoint (queue) name and a server uri.
+	 * If a server is null, only the endpoint name will be used for matching.
+	 */
+	public String lookUpDelegateKey(String anEndpointName, String server)
+	{
+		String key = null;
 		if (destinationToKeyMap.containsKey(anEndpointName))
 		{
-			return (String) destinationToKeyMap.get(anEndpointName);
+			Set keys = destinationMap.keySet();
+			Iterator it = keys.iterator();
+			//	Find an endpoint for the GetMeta reply. To succeed, match the endpoint (queue) name
+			//	as well as the server URI. We allow endpoints managed by different servers to have
+			//	the same queue name.
+			//	iterate over all endpoints until a match [queue,server] is found.
+			while( it.hasNext())
+			{
+				key = (String)it.next();
+				Endpoint_impl endp = (Endpoint_impl) destinationMap.get(key);
+
+				//	Check if a queue name matches
+				if ( endp != null &&  endp.getEndpoint().equalsIgnoreCase(anEndpointName))
+				{
+					//	Check if server match is requested as well
+					if ( server != null )
+					{
+						//	server URIs must match
+						if (  endp.getServerURI() != null && endp.getServerURI().equalsIgnoreCase(server) )
+						{
+							//	found a match for [queue,server]
+							break;
+						}
+						//	Not found yet. Reset the key
+						key = null;
+						continue;
+					}
+					//	found a match for [queue]
+					break;
+				}
+				//	Not found yet. Reset the key
+				key = null;
+			}
 		}
 
-		return null;
+//		if (destinationToKeyMap.containsKey(anEndpointName))
+//		{
+//			return (String) destinationToKeyMap.get(anEndpointName);
+//		}
+
+		return key;
 	}
 
 	public Endpoint lookUpEndpoint(String anAnalysisEngineKey, boolean clone) throws AsynchAEException
@@ -1354,13 +1648,34 @@
     	}
     	return null;
     }
-
     public void mergeTypeSystem(String aTypeSystem, String fromDestination) throws AsynchAEException
+    {
+    	mergeTypeSystem(aTypeSystem, fromDestination, null);
+    }
+    public void mergeTypeSystem(String aTypeSystem, String fromDestination, String fromServer) throws AsynchAEException
 	{
 		try
 		{
-			String key = findKeyForValue(fromDestination);
-			Endpoint_impl endpoint = (Endpoint_impl) destinationMap.get(key);
+			Set keys = destinationMap.keySet();
+			Iterator it = keys.iterator();
+			Endpoint_impl endpoint = null;
+			String key = null;
+			//	Find an endpoint for the GetMeta reply. To succeed, match the endpoint (queue) name
+			//	as well as the server URI. We allow endpoints managed by different servers to have
+			//	the same queue name.
+			//	iterate over all endpoints until a match [queue,server] is found.
+			while( it.hasNext())
+			{
+				key = (String)it.next();
+				Endpoint_impl endp = (Endpoint_impl) destinationMap.get(key);
+
+				if ( endp.getServerURI() != null && endp.getServerURI().equalsIgnoreCase(fromServer) && endp.getEndpoint().equalsIgnoreCase(fromDestination))
+				{
+					endpoint = endp;
+					break;
+				}
+				
+			}
 
 			if (endpoint == null)
 			{
@@ -1373,6 +1688,7 @@
 				endpoint.cancelTimer();
 				boolean collocatedAggregate = false;
 				ResourceMetaData resource = null;
+				ServiceInfo remoteDelegateServiceInfo = null;
 				if (aTypeSystem.trim().length() > 0)
 				{
 					if ( endpoint.isRemote() )
@@ -1395,29 +1711,11 @@
 					{
 						endpoint.setIsCasMultiplier(true);
 						remoteCasMultiplierList.add(key);
-						if ( endpoint.isRemote())
-						{
-							int remoteCasPoolSize = 0;
-							Object o = null;
-							if ( ( o = ((ProcessingResourceMetaData) resource).getConfigurationParameterSettings().getParameterValue(AnalysisEngineController.CasPoolSize)) != null )
-							{
-								remoteCasPoolSize = ((Integer)o).intValue();
-								System.out.println(">>>>>>>>>>>>>> Remote CAS Pool Size:::"+remoteCasPoolSize);
-								if ( remoteCasPoolSize > endpoint.getShadowPoolSize() )
-								{
-									System.out.println(">>>>> Remote Cas Multiplier Cas Pool Size Exceeds the Size of the Local Cas Pool Size <<<<<<");
-									UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, CLASS_NAME.getName(), "mergeTypeSystem", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_invalid_cp_size__CONFIG", new Object[] {getName(), fromDestination, remoteCasPoolSize, endpoint.getShadowPoolSize() });
-									throw new ResourceConfigurationException(UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,"UIMAEE_invalid_cp_size__CONFIG", new Object[] {getName(), fromDestination, remoteCasPoolSize, endpoint.getShadowPoolSize()});
-								}
-								cmCasPoolSizeDelta = endpoint.getShadowPoolSize()-remoteCasPoolSize;
-							}
-						}
 					}
-					
 					if ( endpoint.isRemote())
 					{
 						Object o = null;
-						ServiceInfo remoteDelegateServiceInfo =
+						remoteDelegateServiceInfo =
 							getDelegateServiceInfo(key);
 						if ( remoteDelegateServiceInfo != null && remoteDelegateServiceInfo instanceof PrimitiveServiceInfo &&
 							 ( o = ((ProcessingResourceMetaData) resource).getConfigurationParameterSettings().getParameterValue(AnalysisEngineController.AEInstanceCount)) != null )
@@ -1432,7 +1730,6 @@
 				}
 
 				endpoint.setInitialized(true);
-
 				//	If getMeta request not yet sent, send meta request to all remote delegate
 				//	Special case when all delegates are remote is handled in the setInputChannel
 
@@ -1447,15 +1744,6 @@
 							unregisteredDelegateList.remove(i);
 						}
 					}
-/*
-					//	When all collocated delegates reply with metadata send request for meta to
-					//	remote delegates.
-					if ( unregisteredDelegateList.size() == 0 )
-					{
-						requestForMetaSentToRemotes = true;
-						sendRequestForMetadataToRemoteDelegates();
-					}
-					*/
 				}
 
 				
@@ -1472,6 +1760,10 @@
 							{
 								System.out.println("Setting Shadow Pool of Size:"+endpt.getShadowPoolSize()+" For Cas Multiplier:"+(String)remoteCasMultiplierList.get(i));						
 								getCasManagerWrapper().initialize(endpt.getShadowPoolSize(),(String)remoteCasMultiplierList.get(i));
+								if ( remoteDelegateServiceInfo != null )
+								{
+									remoteDelegateServiceInfo.setCASMultiplier();
+								}
 							}
 						}
 						if ( !isStopped() )
@@ -1619,9 +1911,11 @@
 				pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
 				pServiceInfo.setState(serviceInfo.getState());
 				pServiceInfo.setAnalysisEngineInstanceCount(1);
-
 				ServicePerformance servicePerformance = new ServicePerformance();
-
+				if ( anEndpoint.isRemote() )
+				{
+					servicePerformance.setRemoteDelegate();
+				}
 				ServiceErrors serviceErrors = new ServiceErrors();
 
 				serviceErrorMap.put(key, serviceErrors);
@@ -1735,20 +2029,6 @@
 			
 		}
 	}
-	
-	public boolean sendRequestToReleaseCas()
-	{
-		
-		synchronized( counterMonitor )
-		{
-			if ( cmCasPoolSizeDelta > 0 && counter < cmCasPoolSizeDelta )
-			{
-				counter++;
-				return true;
-			}
-			return false;
-		}
-	}
 	public ServiceErrors getServiceErrors(String aDelegateKey)
 	{
 		if ( !serviceErrorMap.containsKey(aDelegateKey ))
@@ -1761,7 +2041,7 @@
 	{
 		if ( serviceInfo == null )
 		{
-			serviceInfo = new AggregateServiceInfo();
+			serviceInfo = new AggregateServiceInfo(isCasMultiplier());
 			if ( getInputChannel() != null )
 			{
 				serviceInfo.setInputQueueName(getInputChannel().getName());
@@ -1860,22 +2140,8 @@
 		super.stop();
 		cleanUp();
 	}
-/*	
-	public ServicePerformance getCasStatistics( String aCasReferenceId )
+	protected List getChildControllerList()
 	{
-		ServicePerformance casStats = null;
-		if ( perCasStatistics.containsKey(aCasReferenceId) )
-		{
-			casStats = (ServicePerformance)perCasStatistics.get(aCasReferenceId);
-		}
-		else
-		{
-			casStats = new ServicePerformance();
-			perCasStatistics.put( aCasReferenceId, casStats);
-			System.out.println("########## AggregateController.getCasStatistics()-Controller:"+getComponentName()+" Creating New ServicePerformance Object for Cas:"+aCasReferenceId+" Map HashCode:"+perCasStatistics.hashCode()+" Map Size:"+perCasStatistics.size());
-		}
-		return casStats;
+		return childControllerList;
 	}
-*/
-
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -58,10 +58,6 @@
 	public InputChannel getInputChannel();
 
 	public InputChannel getInputChannel(String aQueueName );
-	
-	public long getIdleTime( String aKey );
-	
-	public void saveIdleTime( long snapshot, String aKey, boolean accumulate );
 
 	public void saveReplyTime( long snapshot, String aKey );
 	
@@ -142,6 +138,8 @@
 	public void addTimeSnapshot( long snapshot, String aKey );
 	
 	public ServicePerformance getServicePerformance();
+
+	public ServiceInfo getServiceInfo();
 	
 	public long getTimeSnapshot( String aKey );
 
@@ -162,5 +160,29 @@
     public void notifyListenersWithInitializationStatus(Exception e);
   
 	public ServicePerformance getCasStatistics( String aCasReferenceId );
+	
+    public boolean isCasMultiplier();
+
+	public void releaseNextCas(String aCasReferenceId);
+
+//	public long getTotalIdleTime();
+	public long getIdleTime();
+	
+	//	This is called every time a request comes
+	public void beginProcess(int msgType);
+
+	//	This is called every time a request is completed
+	public void endProcess(int msgType);
+	
+	//	Returns the idle time between process CAS calls
+	public long getIdleTimeBetweenProcessCalls(int msgType);
+
+	public long getCpuTime();
+	
+	public long getAnalysisTime();
+	
+	public void incrementSerializationTime(long cpuTime);
+	
+	public void incrementDeserializationTime(long cpuTime);
 
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java Fri Aug 22 11:53:05 2008
@@ -61,7 +61,7 @@
 	 */
 	public void checkin(AnalysisEngine anAnalysisEngine) throws Exception
 	{
-		// 
+		aeInstanceMap.put(Thread.currentThread().getName(), anAnalysisEngine);
 	}
 
 	/**
@@ -72,7 +72,7 @@
 	 *  
 	 * @see org.apache.uima.aae.controller.AnalysisEngineInstancePool#checkout()
 	 **/
-	public AnalysisEngine checkout() throws Exception
+	public synchronized AnalysisEngine checkout() throws Exception
 	{
 		AnalysisEngine ae = null;
 		
@@ -95,13 +95,24 @@
 			}
 		}
 		//	ae may have been assigned above already, no need to fetch it again
+		
+		/*
 		if ( ae == null )
 		{
 			//	Fetch ae instance from the map using thread name as key. This mechanism assures that a thread
 			//	uses the same ae instance every time.
 			ae = (AnalysisEngine)aeInstanceMap.get(Thread.currentThread().getName()) ;
 		}
+		
 		return ae;
+		*/
+		
+		if ( aeInstanceMap.containsKey(Thread.currentThread().getName()) )
+		{
+			return (AnalysisEngine)aeInstanceMap.remove(Thread.currentThread().getName()) ;
+		}
+		else
+			return null;
 	}
 
 	/* (non-Javadoc)