You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:53:06 UTC

svn commit: r688174 [3/3] - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main: java/org/apache/uima/aae/ java/org/apache/uima/aae/client/ java/org/apache/uima/aae/controller/ java/org/apache/uima/aae/error/handler/ java/org/apache/uima/aa...

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Fri Aug 22 11:53:05 2008
@@ -25,6 +25,7 @@
 import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.ErrorContext;
@@ -45,7 +46,8 @@
 public class ProcessRequestHandler_impl extends HandlerBase
 {
 	private static final Class CLASS_NAME = ProcessRequestHandler_impl.class;
-
+	private Object mux = new Object();
+	
 	public ProcessRequestHandler_impl(String aName)
 	{
 		super(aName);
@@ -58,89 +60,287 @@
 		entry.incrementTimeWaitingForCAS( aTimeWaitingForCAS);
 		entry.incrementTimeToDeserializeCAS(aTimeToDeserializeCAS);
 	}
-	
-	private void handleProcessRequestWithXMI(MessageContext aMessageContext) throws AsynchAEException
+	private boolean messageContainsXMI(MessageContext aMessageContext, String casReferenceId) throws Exception
+	{ // Returns true if the message carries an XMI string; otherwise logs, replies to the sender with an InvalidMessageException and returns false
+		//	Fetch the serialized CAS (XMI string) from the message
+		String xmi = aMessageContext.getStringMessage();
+		//	*****************************************************************
+		// ***** No XMI in the message. Kick this back to the sender with an exception
+		//	*****************************************************************
+		if ( xmi == null )
+		{ // NOTE(review): the log call below passes "handleProcessRequestWithXMI" rather than this helper's name
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+	                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_no_cargo__INFO",
+	                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+			getController().
+				getOutputChannel().
+					sendReply(new InvalidMessageException("No XMI data in message"), casReferenceId, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
+			//	Don't process this empty message
+			return false;
+		}
+		return true;
+	}
+	private synchronized CAS getCAS( boolean fetchCASFromShadowCasPool, String shadowCasPoolKey, String casReceivedFrom ) // NOTE(review): synchronized, so this handler's monitor is held for the whole pool request - confirm that is intended
 	{
 		CAS cas = null;
-		String casReferenceId = null;
+	    //	If this is a new CAS (generated by a CM), fetch a CAS from the Shadow Cas Pool associated with the CM that
+		//	produced the CAS (the pool is selected by shadowCasPoolKey). Each CM will have its own Shadow Cas Pool
+		if ( fetchCASFromShadowCasPool )
+		{ // NOTE(review): the request is logged at FINEST although its message key ends in __FINE
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+	                "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_cm__FINE",
+	                new Object[] {  shadowCasPoolKey });
+			//	Aggregate time spent waiting for a CAS in the shadow cas pool. NOTE(review): the controller is cast to AggregateAnalysisEngineController without an instanceof check
+			((AggregateAnalysisEngineController)getController()).getDelegateServicePerformance(shadowCasPoolKey).beginWaitOnShadowCASPool();
+			cas = getController().getCasManagerWrapper().getNewCas(shadowCasPoolKey);
+			((AggregateAnalysisEngineController)getController()).getDelegateServicePerformance(shadowCasPoolKey).endWaitOnShadowCASPool();
+			
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE",
+	                new Object[] { shadowCasPoolKey });
+		}
+	    else
+	    { // Not a CM-generated CAS: use the service CAS pool; casReceivedFrom is only used in the log records
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+	                "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINE",
+	                new Object[] {  casReceivedFrom });
+
+			//	Aggregate time spent waiting for a CAS in the service cas pool
+			getController().getServicePerformance().beginWaitOnCASPool();
+			
+			cas = getController().getCasManagerWrapper().getNewCas();
+			getController().getServicePerformance().endWaitOnCASPool();
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
+	                new Object[] { casReceivedFrom });
+	    }
+		return cas;
+	}
+	/**
+	 * Fetches a CAS from a CAS pool, deserializes the XMI string carried by the message into it, registers the CAS in the in-process cache and records timing statistics.
+	 * @param casReferenceId - id under which the CAS is registered in the cache
+	 * @param freeCasEndpoint - endpoint saved in the cache entry when the controller is an aggregate and the message has a CasSequence property
+	 * @param shadowCasPoolKey - key of the shadow CAS pool to use for a CasSequence message; when non-null it is also saved as the Cas Multiplier key
+	 * @param aMessageContext - message supplying the XMI string, its properties and the sender's endpoint
+	 * @return the cache entry created for the CAS, or null if the controller is found stopped after the CAS was fetched
+	 * @throws Exception - any exception raised by the calls made while fetching, deserializing or registering the CAS
+	 */
+	private CacheEntry deserializeCASandRegisterWithCache( String casReferenceId, Endpoint freeCasEndpoint, String shadowCasPoolKey, MessageContext aMessageContext)
+	throws Exception
+	{ // NOTE(review): casRegistered (declared just below) is never read in this method as shown
 		long inTime = System.nanoTime();
 		boolean casRegistered = false;
-		boolean requestToFreeCasSent = false;
+
+		//	Fetch serialized CAS from the message
+		String xmi = aMessageContext.getStringMessage();
+		
+		//	Time how long we wait on Cas Pool to fetch a new CAS (measured with the controller's getCpuTime(), not System.nanoTime())
+		long t1 = getController().getCpuTime();
+		// ************************************************************************* 
+		//	Fetch CAS from a Cas Pool. If the CAS came from a Cas Multiplier
+		//	fetch the CAS from a shadow CAS pool. Otherwise, fetch the CAS
+		//	from the service CAS Pool.
+		// ************************************************************************* 
+
+		CAS cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey, aMessageContext.getEndpoint().getEndpoint());
+		long timeWaitingForCAS = getController().getCpuTime() - t1;
+		//	Check if we are still running; if not, drop the CAS just fetched and return null
+		if ( getController().isStopped() )
+		{
+			//	The Controller is in shutdown state. 
+			getController().dropCAS(cas);
+			return null;
+		}
+		// ************************************************************************* 
+		//	Deserialize the CAS from the message
+		// ************************************************************************* 
+	    t1 = getController().getCpuTime();
+		XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+		UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+		long timeToDeserializeCAS = getController().getCpuTime() - t1;
+		getController().incrementDeserializationTime(timeToDeserializeCAS);
+		LongNumericStatistic statistic;
+		if ( (statistic = getController().getMonitor().getLongNumericStatistic("",Monitor.TotalDeserializeTime)) != null )
+		{
+			statistic.increment(timeToDeserializeCAS);
+		}
+		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+				"handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialize_cas_time_FINE",
+				new Object[] { timeToDeserializeCAS / 1000 });
+		// ************************************************************************* 
+		//	Register the CAS with a local cache
+		// ************************************************************************* 
+		CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId);
+
+		//	Update Stats: per-CAS statistics always, service-level statistics only for a top level component
+		ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
+		casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
+		if ( getController().isTopLevelComponent() )
+		{
+			synchronized( mux ) // mux serializes the service-level counter updates made through this handler instance
+			{
+				getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
+			}
+		}
+		getController().saveTime(inTime, casReferenceId,  getController().getName());
+		
+		if ( getController() instanceof AggregateAnalysisEngineController )
+		{
+			//	If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS
+			if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
+			{
+				//	Fetch parent CAS id
+				String inputCasReferenceId = 
+					aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
+				if ( shadowCasPoolKey != null )
+				{
+					//	Save the key of the Cas Multiplier in the cache entry, so that it is known which Cas Multiplier produced this CAS
+					entry.setCasMultiplierKey(shadowCasPoolKey);
+				}
+				//	associate this subordinate CAS with the parent CAS
+				entry.setInputCasReferenceId(inputCasReferenceId);
+				//	Save a Cas Multiplier endpoint where a Free CAS notification will be sent 
+				entry.setFreeCasEndpoint(freeCasEndpoint);
+				cacheStats( inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+			}
+			else
+			{
+				cacheStats( casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+			}
+			DelegateStats stats = new DelegateStats(); // NOTE(review): the else branch below tests and fills this freshly created local (not entry.getStat()), which is then discarded, and it stores a DelegateStats where the if branch stores a TimerStats - confirm intent
+			if ( entry.getStat() == null )
+			{
+				entry.setStat(stats);
+				//	Add entry for self (this aggregate). MessageContext.getEndpointName()
+				//	returns the name of the queue receiving the message.
+				stats.put(getController().getServiceEndpointName(), new TimerStats());
+			}
+			else
+			{
+				if (!stats.containsKey(getController().getServiceEndpointName()))
+				{
+					stats.put(getController().getServiceEndpointName(), new DelegateStats());
+				}
+			}
+		}
+		else
+		{
+			cacheStats( casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+		}
+		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
+                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+		cacheProcessCommandInClientEndpoint();
+		return entry;
+	}
+	private String getCasReferenceId( MessageContext aMessageContext ) throws Exception
+	{ // Returns the message's CasReference property, or null (after logging and replying with an InvalidMessageException) when the property is absent
+		if ( !aMessageContext.propertyExists(AsynchAEMessage.CasReference) )
+		{ // NOTE(review): the log call passes "handleProcessRequestWithCASReference" even though the XMI and XCAS handlers also call this helper
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+	                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_cas_refid__INFO",
+	                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+			
+			getController().
+				getOutputChannel().
+					sendReply(new InvalidMessageException("No Cas Reference Id Received From Delegate In message"), null, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
+			return null; // null tells the caller the message is invalid and has already been answered
+		}
+		return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+	}
+	/**
+	 * Handles process request from a remote client
+	 * 
+	 * @param aMessageContext - contains a message from UIMA-AS Client
+	 * @throws AsynchAEException
+	 */
+	private void handleProcessRequestWithXMI(MessageContext aMessageContext) throws AsynchAEException
+	{
+		CacheEntry entry = null;
+		String casReferenceId = null;
+		//	Check if there is a cargo in the message
+		if ( aMessageContext.getStringMessage() == null )
+		{
+			return; // No XMI just return
+		}
+
 		try
 		{
-			boolean isNewCAS = false;
+
 			String newCASProducedBy = null;
-			String remoteCasReferenceId = null;
-			//	This is only used when handling CASes produced by CAS Multiplier
-			
 			//	Get the CAS Reference Id of the input CAS
-			casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+			//	Fetch id of the CAS from the message. If it doesnt exist the method will create an entry in the log file and return null
+			casReferenceId = getCasReferenceId(aMessageContext);
+			if ( casReferenceId == null )
+			{
+				return; // 	Invalid message. Nothing to do
+			}
 			//	Initially  make both equal
 			String inputCasReferenceId = casReferenceId;
-			//	CASes generated in a Cas Multiplier will have a CasSequence property set. If such property exists
-			//	it means that the CAS has been generated by a CM.
+			//	Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
+			Endpoint freeCasEndpoint = null;
+			//	CASes generated by a Cas Multiplier will have a CasSequence property set. 
 			if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
 			{
-				//	Remote CM?
-				if ( aMessageContext.getEndpoint().isRemote())
-				{
-					remoteCasReferenceId = casReferenceId;
-				}
-				//	Set the flag to indicate that the CAS been generated by CM
-				isNewCAS = true;
-				//	Fetch the actual input CAS Reference Id from which the CAS being processed was generated from
+				//	Fetch an ID of the parent CAS
 				inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
-				//	Fetch input CAS Cache entry
-				CacheEntry inputCasCacheEntry = getController().
-						getInProcessCache().
-							getCacheEntryForCAS(inputCasReferenceId);
-				//	This CAS came in from the CAS Multiplier. Treat it differently than the
-				//	input CAS. First, in case the Aggregate needs to send this CAS to the
+				//	Fetch Cache entry for the parent CAS
+				CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
+
+				computeStats(aMessageContext, inputCasReferenceId);
+
+				
+				// Fetch an endpoint where Free CAS Notification must be sent.
+				// This endpoint is unique per CM instance. Meaning, each 
+				//	instance of CM will have an endpoint where it expects Free CAS
+				// notifications.
+				freeCasEndpoint = aMessageContext.getEndpoint();
+				//	Clone an endpoint where Free Cas Request will be sent
+				freeCasEndpoint = (Endpoint)((Endpoint_impl)freeCasEndpoint).clone();
+				//	Reset the destination
+				aMessageContext.getEndpoint().setDestination(null);
+				//	This CAS came in from a CAS Multiplier. Treat it differently than the
+				//	input CAS. In case the Aggregate needs to send this CAS to the
 				//	client, retrieve the client destination by looking up the client endpoint
 				//	using input CAS reference id. CASes generated by the CAS multiplier will have 
 				//	the same Cas Reference id.
 				Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin();
-				if ( getController() instanceof AggregateAnalysisEngineController )
-				{
-					newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
-					//	increment number of subordinate CASes that are currently being processed
-					//	The input CAS (parent) will be held by the aggregate until all of its
-					//	subordinate CASes are fully processed. Only then, the aggregate can return
-					//	it back to the client
-					synchronized( inputCasCacheEntry )
-					{
-						inputCasCacheEntry.incrementSubordinateCasInPlayCount();
-					}
-					if ( ((AggregateAnalysisEngineController)getController()).sendRequestToReleaseCas() )
-					{
-						try
-						{
-							//	Change the name of the queue where the request to free a CAS will be sent.
-							aMessageContext.getEndpoint().setEndpoint(aMessageContext.getEndpoint().getEndpoint()+"CasSync");
-
-							getController().getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, casReferenceId, aMessageContext.getEndpoint());
-							requestToFreeCasSent = true;
-						}
-						catch( Exception e){}
-					}
-				}
-				//	MessageContext contains endpoint set by the CAS Multiplier service. Overwrite
-				//	this with the endpoint of the client who sent the input CAS. In case this 
-				//	aggregate is configured to send new CASes to the client we know where to send them.
+				// The message context contains a Cas Multiplier endpoint. Since
+				// we dont want to send a generated CAS back to the CM, override 
+				//	with an endpoint provided by the client of
+				//	this service. Client endpoint is attached to an input Cas cache entry.
 				aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
 				aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
-				//	Set this to null so that the new CAS gets its own Cas Reference Id below
-				casReferenceId = null;
-			}
-			else if ( getController().isTopLevelComponent())
-			{
-				Endpoint replyToEndpoint = aMessageContext.getEndpoint(); 
 
-				if ( getController() instanceof AggregateAnalysisEngineController )
-				{
-					((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, replyToEndpoint);
+				// Before sending a CAS to Cas Multiplier, the aggregate has
+				// saved the CM key in the CAS cache entry. Fetch the key
+				// of the CM so that we can ask the right Shadow Cas Pool for
+				// a new CAS. Every Shadow Cas Pool has a unique id which
+				// corresponds to a Cas Multiplier key.
+				newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
+				if (  getController() instanceof AggregateAnalysisEngineController )
+				{
+					Endpoint casMultiplierEndpoint =	
+						((AggregateAnalysisEngineController)getController()).lookUpEndpoint(newCASProducedBy, false);
+					if ( casMultiplierEndpoint != null )
+					{
+						//	Save the URL of the broker managing the Free Cas Notification queue.
+						//	This is needed when we try to establish a connection to the broker.
+						freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
+					}
+				}
+				// increment number of CASes produced from an input CAS
+				// The input CAS (parent) will be held by 
+				//	the aggregate until all of its subordinate CASes are 
+				//	fully processed. Only then, the aggregate can return
+				// it back to the client
+				synchronized (inputCasCacheEntry) {
+					inputCasCacheEntry.incrementSubordinateCasInPlayCount();
 				}
-
+			}
+			else if ( getController().isTopLevelComponent() && getController() instanceof AggregateAnalysisEngineController )
+			{
+				((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
 			}
 			//	To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
 			//	to see if the message with a given CasReferenceId is already being processed. It is, the
@@ -151,169 +351,23 @@
 			//	CasReferenceId. When the service finally comes back up, it will have multiple messages in
 			//	its queue possibly from the same client. Only the first message for any given CasReferenceId
 			//	should be processed. 
-			if ( casReferenceId == null || !getController().getInProcessCache().entryExists(casReferenceId) )
+			if ( !getController().getInProcessCache().entryExists(casReferenceId) )
 			{
-				String xmi = aMessageContext.getStringMessage();
-				
-				//	*****************************************************************
-				// ***** NO XMI In Message. Kick this back to sender with exception
-				//	*****************************************************************
-				if ( xmi == null )
-				{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-			                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_no_cargo__INFO",
-			                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
-					getController().
-						getOutputChannel().
-							sendReply(new InvalidMessageException("No XMI data in message"), casReferenceId, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
-					//	Dont process this empty message
-					return;
-				}
-				
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
-		                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINEST",
-		                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
-			    long t1 = System.nanoTime();
-			    if ( isNewCAS )
-			    {
-					cas = getController().getCasManagerWrapper().getNewCas(newCASProducedBy);
-			    }
-			    else
-			    {
-				    cas = getController().getCasManagerWrapper().getNewCas();
-			    }
-				long timeWaitingForCAS = System.nanoTime() - t1;
-	
-				if ( getController().isStopped() )
-				{
-					//	The Controller is in shutdown state. 
-					getController().dropCAS(cas);
-					return;
-				}
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-		                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
-		                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
-			    t1 = System.nanoTime();
-				XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
-				UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
-				
-				long timeToDeserializeCAS = System.nanoTime() - t1;
-				LongNumericStatistic statistic;
-				if ( (statistic = getController().getMonitor().getLongNumericStatistic("",Monitor.TotalDeserializeTime)) != null )
-				{
-					statistic.increment(timeToDeserializeCAS);
-				}
-
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-						"handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialize_cas_time_FINE",
-						new Object[] { timeToDeserializeCAS / 1000 });
-        		ServicePerformance casStats = null;
-        
-        
-				if (casReferenceId == null)
-				{
-					if (getController() instanceof PrimitiveAnalysisEngineController)
-					{
-						inputCasReferenceId = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
-					}
-					else
-					{
-						casReferenceId = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
-						if ( inputCasReferenceId == null )
-						{
-							inputCasReferenceId = casReferenceId;
-						}
-					}
-					casStats = getController().getCasStatistics(inputCasReferenceId);
-
-				}
-				else
-				{
-					getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId);
-/*
-					if ( aMessageContext.propertyExists(AsynchAEMessage.InputCasReference))
-					{
-						CacheEntry cacheEntry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
-						String parentCasId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
-						cacheEntry.setInputCasReferenceId(parentCasId);
-					}
-*/					
-					casStats = getController().getCasStatistics(casReferenceId);
-				}
-				casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
-				if ( getController().isTopLevelComponent() )
-				{
-					getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
-				}
-				//	Set a local flag to indicate that the CAS has been added to the cache. This will be usefull when handling an exception
-				//	If an exception happens before the CAS is added to the cache, the CAS needs to be dropped immediately.
-				casRegistered = true;
-				if ( casReferenceId == null )
-				{
-					getController().saveTime(inTime, inputCasReferenceId,  getController().getName());
-				}
-				else
-				{
-					getController().saveTime(inTime, casReferenceId,  getController().getName());
-				}
-				
-				CacheEntry entry = null;
-				if ( getController() instanceof AggregateAnalysisEngineController )
-				{
-					entry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
-					if ( isNewCAS )
-					{
-						if ( newCASProducedBy != null )
-						{
-							entry.setCasMultiplierKey(newCASProducedBy);
-						}
-						if ( remoteCasReferenceId != null )
-						{
-							entry.setRemoteCMCasReferenceId(remoteCasReferenceId);
-						}
-						if ( requestToFreeCasSent )
-						{
-							entry.setSendRequestToFreeCas(false);
-						}
-						//	associate this subordinate CAS with the parent CAS
-						entry.setInputCasReferenceId(inputCasReferenceId);
-						entry.setReplyReceived();
-					}
-					DelegateStats stats = new DelegateStats();
-					if ( entry.getStat() == null )
-					{
-						entry.setStat(stats);
-						//	Add entry for self (this aggregate). MessageContext.getEndpointName()
-						//	returns the name of the queue receiving the message.
-						stats.put(getController().getServiceEndpointName(), new TimerStats());
-					}
-					else
-					{
-						if (!stats.containsKey(getController().getServiceEndpointName()))
-						{
-							stats.put(getController().getServiceEndpointName(), new DelegateStats());
-						}
-					}
-				}
-				
-				cacheStats( inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-		                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
-		                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
-
-				
-				cacheProcessCommandInClientEndpoint();
-				
-				if ( getController().isStopped() )
+				entry = deserializeCASandRegisterWithCache( casReferenceId, freeCasEndpoint, newCASProducedBy, aMessageContext);
+				if ( getController().isStopped() || entry == null || entry.getCas() == null)
 				{
 					if ( entry != null )
 					{
-						//	The Controller is in shutdown state. 
+						//	The Controller is in shutdown state, release the CAS
 						getController().dropCAS( entry.getCasReferenceId(), true);
-						return;
+						entry = null;
 					}
+					return;
 				}
-				invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
+				// *****************************************************************
+				//	 Process the CAS
+				// *****************************************************************
+				invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
 			}
 			else
 			{
@@ -321,7 +375,6 @@
 		                "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_duplicate_request__INFO",
 		                new Object[] { casReferenceId});
 			}
-        	
 		}
 		catch ( Exception e)
 		{
@@ -333,31 +386,122 @@
 			errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
 			errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
 			errorContext.add(AsynchAEMessage.CasReference, casReferenceId );
-			if ( casRegistered == false )
+			if ( entry != null )
 			{
-				getController().dropCAS(cas);
+				getController().dropCAS(entry.getCas());
 			}
 			getController().getErrorHandlerChain().handle(e, errorContext, getController());
 		}
 
 	}
-	private void handleProcessRequestWithXCAS(MessageContext aMessageContext) throws AsynchAEException
+	private void handleProcessRequestWithCASReference(MessageContext aMessageContext) throws AsynchAEException // Handles a Process request whose CAS is looked up in the in-process cache by its CAS reference id
 	{
-		
+		boolean isNewCAS = false;
+		String newCASProducedBy = null;
+
+
 		try
 		{
-			boolean isNewCAS = false;
-			String newCASProducedBy = null;
-			
 			//	This is only used when handling CASes produced by CAS Multiplier
+			String inputCasReferenceId = null;
+			CAS cas = null;
+			String casReferenceId = getCasReferenceId(aMessageContext); // NOTE(review): may be null (the helper has then already replied with an error); unlike handleProcessRequestWithXMI there is no null check before the id is used below
+			if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
+			{
+				isNewCAS = true;
+				Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
+
+				if ( casMultiplierEndpoint == null )
+				{
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+			                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
+			                new Object[] { casReferenceId });
+					return;
+				}
+				//	Only an aggregate records the producer of the CAS and counts it against its parent (input) CAS
+				if ( getController() instanceof AggregateAnalysisEngineController )
+				{
+					getController().getInProcessCache().setCasProducer(casReferenceId, casMultiplierEndpoint.getEndpoint());
+					newCASProducedBy = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(casMultiplierEndpoint.getEndpoint());
+					casMultiplierEndpoint.setIsCasMultiplier(true);
+//					((AggregateAnalysisEngineController)getController()).getServicePerformance(newCASProducedBy).incrementNumberOfCASesProcessed();
+					CacheEntry subordinateCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId); // NOTE(review): dereferenced on the next line without a null check - confirm getCacheEntryForCAS cannot return null here
+					CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(subordinateCasCacheEntry.getInputCasReferenceId());
+					if ( inputCasCacheEntry != null )
+					{
+						synchronized( inputCasCacheEntry )
+						{
+							inputCasCacheEntry.incrementSubordinateCasInPlayCount();
+						}
+					}
+				}
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_cas__FINE",
+		                new Object[] { casReferenceId, newCASProducedBy });
+
+				aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint()); // NOTE(review): casMultiplierEndpoint came from aMessageContext.getEndpoint() above, so this and the next line presumably copy the endpoint onto itself - confirm
+				aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
+				inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
+			}
+			else
+			{
+				if ( getController() instanceof AggregateAnalysisEngineController )
+				{
+					((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
+				}
+
+			}
+			cas = getController().getInProcessCache().getCasByReference(casReferenceId);
 			
+			long arrivalTime = System.nanoTime();
+			getController().saveTime(arrivalTime, casReferenceId, getController().getName());//aMessageContext.getEndpointName());
+
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
+		                new Object[] { casReferenceId});
+
+			//	Save Process command in the client endpoint.
+			cacheProcessCommandInClientEndpoint();
+
+			if ( getController().isStopped() ) // the stopped check happens only here, after the bookkeeping above has been done
+			{
+				return;
+			}
+	
+			if ( isNewCAS ) // a CasSequence message passes both the parent (input) CAS id and its own id; otherwise the message's id is passed as the input id
+			{
+				invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
+			}
+			else
+			{
+				invokeProcess(cas, casReferenceId, null,  aMessageContext, newCASProducedBy);
+			}
+		}
+		catch ( AsynchAEException e) // rethrown unchanged
+		{
+			throw e;
+		}
+		catch ( Exception e) // any other exception is wrapped in an AsynchAEException
+		{
+			throw new AsynchAEException(e);
+		}
+
+	}
+	
+	
+	private void handleProcessRequestWithXCAS(MessageContext aMessageContext) throws AsynchAEException
+	{
+		
+		try
+		{
 			//	Get the CAS Reference Id of the input CAS
-			String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+			String casReferenceId = getCasReferenceId(aMessageContext);
 			String inputCasReferenceId = casReferenceId;
+			//	This is only used when handling CASes produced by CAS Multiplier
+			String newCASProducedBy = null;
 
 			if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
 			{
-				isNewCAS = true;
 				//	This CAS came in from the CAS Multiplier. Treat it differently than the
 				//	input CAS. First, in case the Aggregate needs to send this CAS to the
 				//	client, retrieve the client destination by looking up the client endpoint
@@ -430,7 +574,8 @@
 				
 				if (casReferenceId == null)
 				{
-					casReferenceId = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
+					CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
+					casReferenceId = entry.getCasReferenceId();
 				}
 				else
 				{
@@ -466,138 +611,6 @@
 
 	}
 
-	private void handleProcessRequestWithCASReference(MessageContext aMessageContext) throws AsynchAEException
-	{
-		boolean isNewCAS = false;
-		String newCASProducedBy = null;
-
-		if ( getController().isStopped() )
-		{
-			return;
-		}
-
-		try
-		{
-			String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
-
-			if ( casReferenceId == null )
-			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-		                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_cas_refid__INFO",
-		                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
-				
-				getController().
-					getOutputChannel().
-						sendReply(new InvalidMessageException("No Cas Reference Id Received From Delegate In message"), null, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
-				//	Dont process this empty message
-				return;
-
-			}
-//			Endpoint replyToEndpoint = aMessageContext.getEndpoint(); 
-			
-			//	This is only used when handling CASes produced by CAS Multiplier
-			String inputCasReferenceId = null;
-			CAS cas = null;
-			
-			if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
-			{
-				isNewCAS = true;
-				Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
-
-				if ( casMultiplierEndpoint == null )
-				{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
-			                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
-			                new Object[] { casReferenceId });
-					return;
-				}
-				//	
-				if ( getController() instanceof AggregateAnalysisEngineController )
-				{
-					getController().getInProcessCache().setCasProducer(casReferenceId, casMultiplierEndpoint.getEndpoint());
-					newCASProducedBy = 
-						((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(casMultiplierEndpoint.getEndpoint());
-					casMultiplierEndpoint.setIsCasMultiplier(true);
-					((AggregateAnalysisEngineController)getController()).
-						getServicePerformance(newCASProducedBy).
-							incrementNumberOfCASesProcessed();
-					CacheEntry subordinateCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
-					subordinateCasCacheEntry.setReplyReceived();
-
-					CacheEntry inputCasCacheEntry = getController().
-						getInProcessCache().
-							getCacheEntryForCAS(subordinateCasCacheEntry.getInputCasReferenceId());
-					if ( inputCasCacheEntry != null )
-					{
-						synchronized( inputCasCacheEntry )
-						{
-//							System.out.println("++++++++ Incrementing Delegate:"+casReferenceId+" Count For Input Cas::"+subordinateCasCacheEntry.getInputCasReferenceId());						
-							inputCasCacheEntry.incrementSubordinateCasInPlayCount();
-						}
-					}
-				}
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-		                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_cas__FINE",
-		                new Object[] { casReferenceId, newCASProducedBy });
-				//	MessageContext contains endpoint set by the CAS Multiplier service. Overwrite
-				//	this with the endpoint of the client who sent the input CAS. In case this 
-				//	aggregate is configured to send new CASes to the client we know where to send them.
-//				if ( aMessageContext.getEndpoint() != null )
-//				{
-//					aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
-//					aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
-//				}
-				aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
-				aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
-
-				inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
-			}
-			else
-			{
-				if ( getController() instanceof AggregateAnalysisEngineController )
-				{
-					((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
-				}
-
-			}
-			cas = getController().getInProcessCache().getCasByReference(casReferenceId);
-			
-			long arrivalTime = System.nanoTime();
-			getController().saveTime(arrivalTime, casReferenceId, getController().getName());//aMessageContext.getEndpointName());
-
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-		                "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
-		                new Object[] { casReferenceId});
-
-			//	Save Process command in the client endpoint.
-			cacheProcessCommandInClientEndpoint();
-
-			if ( getController().isStopped() )
-			{
-				return;
-			}
-	
-			if ( isNewCAS )
-			{
-				invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
-			}
-			else
-			{
-				invokeProcess(cas, casReferenceId, null,  aMessageContext, newCASProducedBy);
-			}
-		}
-		catch ( AsynchAEException e)
-		{
-			throw e;
-		}
-		catch ( Exception e)
-		{
-			throw new AsynchAEException(e);
-		}
-
-	}
-	
-	
 	
 	private void cacheProcessCommandInClientEndpoint()
 	{
@@ -615,32 +628,13 @@
 		getController().collectionProcessComplete(replyToEndpoint);
 	}
 
-	private void handleReleaseCASRequest(MessageContext aMessageContext)
+	private void handleReleaseCASRequest(MessageContext aMessageContext) throws AsynchAEException
 	{
-		
-		if ( getController() instanceof PrimitiveAnalysisEngineController )
-		{
-			( (PrimitiveAnalysisEngineController)getController()).releaseNextCas(); 
-		}
-/*
-		try
-		{
-			String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                "handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
-	                new Object[] { getController().getName(), casReferenceId });
-System.out.println(getController().getName()+" ::::::: Processing Release CAS Request:"+casReferenceId);	
-
-			if ( casReferenceId != null && getController().getInProcessCache().entryExists(casReferenceId))
-			{
-				getController().dropCAS(casReferenceId, true);
-			}
-		}
-		catch( Exception e)
-		{
-			getController().getErrorHandlerChain().handle(e, HandlerBase.populateErrorContext( (MessageContext)aMessageContext ), getController());			
-		}
-*/		
+		String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+                "handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
+                new Object[] { getController().getName(), casReferenceId });
+		getController().releaseNextCas(casReferenceId); 
 	}
 	
 	private void handleStopRequest(MessageContext aMessageContext)
@@ -679,23 +673,43 @@
 
 				getController().getControllerLatch().waitUntilInitialized();
 
-        // If a Process Request, increment number of docs processed
-        if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
-                && command == AsynchAEMessage.Process) {
-          // Increment number of CASes processed by this service
-          getController().getServicePerformance().incrementNumberOfCASesProcessed();
-        }
+				// If a Process Request, increment number of CASes processed
+				if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
+						&& command == AsynchAEMessage.Process &&!messageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+					// Increment number of CASes processed by this service
+					getController().getServicePerformance().incrementNumberOfCASesProcessed();
+				}
+				if ( getController().isStopped() )
+				{
+					return;
+				}
 				
 				if (AsynchAEMessage.CASRefID == payload)
 				{
+					//	Fetch id of the CAS from the message. 
+					if ( getCasReferenceId(messageContext) == null )
+					{
+						return; // 	Invalid message. Nothing to do
+					}
+
 					handleProcessRequestWithCASReference(messageContext);
 				}
 				else if (AsynchAEMessage.XMIPayload == payload)
 				{
+					//	Fetch id of the CAS from the message. 
+					if ( getCasReferenceId(messageContext) == null )
+					{
+						return; // 	Invalid message. Nothing to do
+					}
 					handleProcessRequestWithXMI(messageContext);
 				}
 				else if (AsynchAEMessage.XCASPayload == payload)
 				{
+					//	Fetch id of the CAS from the message. 
+					if ( getCasReferenceId(messageContext) == null )
+					{
+						return; // 	Invalid message. Nothing to do
+					}
 					handleProcessRequestWithXCAS(messageContext);
 				}
 				else if ( AsynchAEMessage.None == payload && AsynchAEMessage.CollectionProcessComplete == command)
@@ -718,6 +732,7 @@
 		}
 		catch( Exception e)
 		{
+			e.printStackTrace();
 			getController().getErrorHandlerChain().handle(e, HandlerBase.populateErrorContext( (MessageContext)anObjectToHandle ), getController());			
 		}
 	}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Fri Aug 22 11:53:05 2008
@@ -40,6 +40,7 @@
 import org.apache.uima.aae.jmx.ServicePerformance;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.aae.monitor.Monitor;
 import org.apache.uima.aae.monitor.statistics.DelegateStats;
 import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
@@ -59,162 +60,6 @@
 		super(aName);
 	}
 
-	private void aggregateDelegateStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
-	{
-		String delegateKey = "";
-		try
-		{
-			
-			delegateKey = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
-			CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-			if ( entry == null )
-			{
-				throw new AsynchAEException("CasReferenceId:"+aCasReferenceId+" Not Found in the Cache.");
-			}
-			CacheEntry inputCasEntry = null;
-			String inputCasReferenceId = entry.getInputCasReferenceId();
-			ServicePerformance casStats = 
-				((AggregateAnalysisEngineController)getController()).getCasStatistics(aCasReferenceId);
-			if ( inputCasReferenceId != null && 
-				 getController().getInProcessCache().entryExists(inputCasReferenceId) )
-			{
-				String casProducerKey = entry.getCasProducerKey();
-				if ( casProducerKey != null &&
-					((AggregateAnalysisEngineController)getController()).
-						isDelegateKeyValid(casProducerKey) )
-				{
-					//	Get entry for the input CAS
-					inputCasEntry = getController().
-								getInProcessCache().
-									getCacheEntryForCAS(inputCasReferenceId);
-				}
-				
-			}
-			ServicePerformance delegateServicePerformance =
-				((AggregateAnalysisEngineController)getController()).getServicePerformance(delegateKey);
-
-			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS))
-			{
-				long timeToSerializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
-				if ( timeToSerializeCAS > 0)
-				{
-					casStats.incrementCasSerializationTime(timeToSerializeCAS);
-					if ( delegateServicePerformance != null )
-					{
-						delegateServicePerformance.
-						incrementCasSerializationTime(timeToSerializeCAS);
-					}
-					getController().getServicePerformance().
-						incrementCasSerializationTime(timeToSerializeCAS);
-				}
-			}
-			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS))
-			{
-				long timeToDeserializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
-				if ( timeToDeserializeCAS > 0 )
-				{
-					casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
-
-					if ( delegateServicePerformance != null )
-					{
-						delegateServicePerformance.
-							incrementCasDeserializationTime(timeToDeserializeCAS);
-					}
-					getController().getServicePerformance().
-						incrementCasDeserializationTime(timeToDeserializeCAS);
-				}
-			}
-
-			if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime))
-			{
-				long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
-				if ( idleTime > 0 )
-				{
-					casStats.incrementIdleTime(idleTime);
-					if ( delegateServicePerformance != null )
-					{
-						delegateServicePerformance.
-							incrementIdleTime(idleTime);
-					}
-				}
-			}
-			
-			if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS))
-			{
-				long timeWaitingForCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
-				if ( aMessageContext.getEndpoint().isRemote())
-				{
-					entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
-					if ( inputCasEntry != null )
-					{
-						inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
-					}
-				}
-			}
-			if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS))
-			{
-				long timeInProcessCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
-				casStats.incrementAnalysisTime(timeInProcessCAS);
-				if ( delegateServicePerformance != null )
-				{
-					delegateServicePerformance.
-						incrementAnalysisTime(timeInProcessCAS);
-				}
-				//	Accumulate processing time
-				getController().getServicePerformance().
-					incrementAnalysisTime(timeInProcessCAS);
-				if ( inputCasReferenceId != null )
-				{
-					ServicePerformance inputCasStats = 
-						((AggregateAnalysisEngineController)getController()).
-							getCasStatistics(inputCasReferenceId);
-					// Update processing time for this CAS
-					if ( inputCasStats != null )
-					{
-						inputCasStats.incrementAnalysisTime(timeInProcessCAS);
-					}
-				}
-			}
-		}
-		catch( AsynchAEException e)
-		{
-			throw e;
-		}
-		catch( Exception e)
-		{
-			throw new AsynchAEException(e);
-		}
-	}
-	private void computeStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
-	{
-		if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService))
-		{
-			long departureTime = getController().getTime(aCasReferenceId, aMessageContext.getEndpoint().getEndpoint());
-			long currentTime = System.nanoTime();
-			long roundTrip = currentTime - departureTime;
-			long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
-			long totalTimeInComms = currentTime - (departureTime - timeInService);
-
-			
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
-	                new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),(double) roundTrip / (double) 1000000 });
-
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
-	                new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000, aMessageContext.getEndpoint() });
-
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-	                "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
-	                new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000, aMessageContext.getEndpoint() });
-		}
-		
-			if ( getController() instanceof AggregateAnalysisEngineController )
-			{
-				aggregateDelegateStats( aMessageContext, aCasReferenceId );
-			}			
-	}
-
 	private Endpoint lookupEndpoint(String anEndpointName, String aCasReferenceId)
 	{
 		return getController().getInProcessCache().getEndpoint(anEndpointName, aCasReferenceId);
@@ -367,39 +212,45 @@
 		                "handleProcessResponseWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_rcvd_reply_FINEST",
 		                new Object[] { aMessageContext.getEndpoint().getEndpoint(), casReferenceId, xmi });
 			}
-			
-			long t1 = System.nanoTime();
-			
-			synchronized (monitor)
-			{
-				XmiSerializationSharedData deserSharedData;
-				if (totalNumberOfParallelDelegatesProcessingCas > 1 && cacheEntry.howManyDelegatesResponded() > 0)
-				{
-          // process secondary reply from a parallel step
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+			long t1 = getController().getCpuTime();
+			/* --------------------- */
+			/** DESERIALIZE THE CAS. */ 
+			/* --------------------- */
+			
+			//	check if the CAS is part of the Parallel Step
+			if (totalNumberOfParallelDelegatesProcessingCas > 1 )
+			{
+				//	Synchronized because replies are merged into the same CAS.
+				synchronized (monitor)
+				{
+					//	Check if this is a secondary reply in a parallel step. If it is the first reply, deserialize the CAS
+					//	using a standard approach. There is no merge to be done yet. Otherwise, we need to
+					//	merge the CAS with previous results.
+					if ( cacheEntry.howManyDelegatesResponded() > 0 )
+					{
+						// process secondary reply from a parallel step
+						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
 			                "handleProcessResponseWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delegate_responded_count_FINEST",
 			                new Object[] { cacheEntry.howManyDelegatesResponded(), casReferenceId});
-					
-					int highWaterMark = cacheEntry.getHighWaterMark();
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+				
+						int highWaterMark = cacheEntry.getHighWaterMark();
+						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
 			                "handleProcessResponseWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_high_water_mark_FINEST",
 			                new Object[] { highWaterMark, casReferenceId });
-
-          deserSharedData = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).getDeserSharedData();
-					UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, highWaterMark);
-				}
-        else // general case, or first reply from a parallel step
-				{
-					//	Processing the reply from a standard, non-parallel delegate
-          deserSharedData = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).getDeserSharedData();
-          if (deserSharedData == null) {
-            deserSharedData = new XmiSerializationSharedData();
-            getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).setXmiSerializationData(deserSharedData);
-          }
-          UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+						deserialize( xmi, cas, casReferenceId, highWaterMark);
+					}
+					else
+					{
+						//	first reply from a parallel step
+						deserialize(xmi, cas, casReferenceId);
+					}
 				}
 			}
-
+			else // general case
+			{
+				//	Processing a reply from a standard, non-parallel delegate
+				deserialize(xmi, cas, casReferenceId);
+			}
 			
 			if ( cacheEntry != null && totalNumberOfParallelDelegatesProcessingCas > 1 )
 			{
@@ -409,7 +260,7 @@
 				}
 			}
 		
-			long timeToDeserializeCAS = System.nanoTime() - t1;
+			long timeToDeserializeCAS = getController().getCpuTime() - t1;
 
             getController().
             	getServicePerformance().
@@ -455,6 +306,23 @@
 		}
 
 	}
+	private void deserialize( String xmi, CAS cas, String casReferenceId, int highWaterMark ) throws Exception
+	{
+		XmiSerializationSharedData deserSharedData;
+		deserSharedData = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).getDeserSharedData();
+		UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, highWaterMark);
+	}
+	private void deserialize( String xmi, CAS cas, String casReferenceId ) throws Exception
+	{
+		//	Plain deserialization, no merge: reply from a non-parallel delegate or the first reply of a parallel step
+		XmiSerializationSharedData deserSharedData;
+		deserSharedData = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).getDeserSharedData();
+		if (deserSharedData == null) {
+			deserSharedData = new XmiSerializationSharedData();
+			getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).setXmiSerializationData(deserSharedData);
+		}
+		UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+	}
 	private void handleProcessResponseWithCASReference(MessageContext aMessageContext )
 	{
 		String casReferenceId = null;
@@ -472,11 +340,12 @@
 			ServicePerformance delegateServicePerformance = 
 				((AggregateAnalysisEngineController)getController()).
 					getServicePerformance(delegateKey);
+			/*
 			if ( delegateServicePerformance != null )
 			{
 				delegateServicePerformance.incrementNumberOfCASesProcessed();
 			}
-
+*/
 			//CAS cas = getController().getInProcessCache().getCasByReference(casReferenceId);
 			if (cas != null)
 			{
@@ -576,7 +445,7 @@
 		}
 		return true;
 	}
-	private void handleProcessResponseWithException(MessageContext aMessageContext)
+	private void handleProcessResponseWithException(MessageContext aMessageContext, String delegateKey)
 	{
 		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                 "handleProcessResponseWithException", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_handling_exception_from_delegate_FINE",
@@ -617,7 +486,7 @@
 			{
 				isCpCError = true;
 				((AggregateAnalysisEngineController)getController()).
-					processCollectionCompleteReplyFromDelegate(aMessageContext.getEndpoint().getEndpoint(), false);
+					processCollectionCompleteReplyFromDelegate(delegateKey, false);
 			}
 			else
 			{
@@ -653,11 +522,10 @@
 
 	}
 
-	private void handleCollectionProcessCompleteReply(MessageContext aMessageContext)
+	private void handleCollectionProcessCompleteReply(MessageContext aMessageContext, String delegateKey)
 	{
 		try
 		{
-			String delegateKey = ((Endpoint)aMessageContext.getEndpoint()).getEndpoint();
 			if ( getController() instanceof AggregateAnalysisEngineController )
 			{
 				((AggregateAnalysisEngineController) getController())
@@ -692,9 +560,24 @@
 			int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
 			String delegate = ((Endpoint)messageContext.getEndpoint()).getEndpoint();
 			String key = null;
+			String fromServer = null;
 			if ( getController() instanceof AggregateAnalysisEngineController )
 			{
-				key = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(delegate);
+				if ( ((Endpoint)messageContext.getEndpoint()).isRemote() )
+				{
+					if ( ((MessageContext)anObjectToHandle).propertyExists(AsynchAEMessage.EndpointServer))
+					{
+						
+						fromServer =((MessageContext)anObjectToHandle).getMessageStringProperty(AsynchAEMessage.EndpointServer); 
+
+					}
+					else if ( ((MessageContext)anObjectToHandle).propertyExists(UIMAMessage.ServerURI)) 
+					{
+						fromServer = ((MessageContext)anObjectToHandle).getMessageStringProperty(UIMAMessage.ServerURI);
+					}
+				}
+
+				key = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(delegate, fromServer);
 			}
 			if (AsynchAEMessage.CASRefID == payload)
 			{
@@ -715,11 +598,19 @@
 			}
 			else if (AsynchAEMessage.Exception == payload)
 			{
-				handleProcessResponseWithException(messageContext);
+				if ( key == null )
+				{
+					key = ((Endpoint)messageContext.getEndpoint()).getEndpoint();
+				}
+				handleProcessResponseWithException(messageContext, key);
 			}
 			else if (AsynchAEMessage.None == payload && AsynchAEMessage.CollectionProcessComplete == command)
 			{
-				handleCollectionProcessCompleteReply(messageContext);
+				if ( key == null )
+				{
+					key = ((Endpoint)messageContext.getEndpoint()).getEndpoint();
+				}
+				handleCollectionProcessCompleteReply(messageContext, key);
 			}
 			else if (AsynchAEMessage.None == payload && AsynchAEMessage.ACK == command)
 			{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java Fri Aug 22 11:53:05 2008
@@ -26,4 +26,14 @@
 	/**
 	 * 
 	 */
+	
+	public AggregateServiceInfo()
+	{
+		super(false);
+	}
+	
+	public AggregateServiceInfo( boolean isaCasMultiplier )
+	{
+		super(isaCasMultiplier);
+	}
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java Fri Aug 22 11:53:05 2008
@@ -30,7 +30,16 @@
 	/**
 	 * 
 	 */
+	public PrimitiveServiceInfo()
+	{
+		super(false);
+	}
 	
+	public PrimitiveServiceInfo( boolean isaCasMultiplier )
+	{
+		super(isaCasMultiplier);
+	}
+
 	private int instanceCount = 0;
 	
 	public int getAnalysisEngineInstanceCount()

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java Fri Aug 22 11:53:05 2008
@@ -28,9 +28,23 @@
 	private static final String label="Service Info";
 	private String brokerURL="";
 	private String inputQueueName="";
+	private String replyQueueName="";
 	private String state="";
 	private String[] deploymentDescriptor= new String[] {""};
+	private boolean casMultiplier;
+	private boolean topLevel;
+	private String serviceKey;
+	private boolean aggregate;
 	
+	public ServiceInfo()
+	{
+		this(false);
+	}
+	
+	public ServiceInfo( boolean isaCasMultiplier )
+	{
+		casMultiplier = isaCasMultiplier;
+	}
 	public String getLabel()
 	{
 		return label;
@@ -68,6 +82,47 @@
 	{
 		state = aState;
 	}
+	public boolean isCASMultiplier()
+	{
+		return casMultiplier;
+	}
+	public void setCASMultiplier()
+	{
+		casMultiplier = true;
+	}
+	public void setTopLevel()
+	{
+		topLevel = true;
+	}
+	public boolean isTopLevel()
+	{
+		return topLevel;
+	}
 
+	public String getServiceKey() {
+		return serviceKey;
+	}
+
+	public void setServiceKey(String serviceKey) {
+		this.serviceKey = serviceKey;
+	}
+	
+	public String getReplyQueueName()
+	{
+		return replyQueueName;
+	}
+	
+	public void setReplyQueueName(String aReplyQueueName)
+	{
+		replyQueueName = aReplyQueueName;
+	}
+
+	public boolean isAggregate() {
+		return aggregate;
+	}
+
+	public void setAggregate(boolean aggregate) {
+		this.aggregate = aggregate;
+	}
 
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java Fri Aug 22 11:53:05 2008
@@ -25,6 +25,11 @@
 {
 	public String getState();
 	public String getInputQueueName();
+	public String getReplyQueueName();
 	public String getBrokerURL();
 	public String[] getDeploymentDescriptor();
+	public boolean isCASMultiplier();
+	public boolean isTopLevel();
+	public String getServiceKey();
+	public boolean isAggregate();
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java Fri Aug 22 11:53:05 2008
@@ -23,12 +23,14 @@
 import java.text.FieldPosition;
 import java.util.Formatter;
 
+import org.apache.uima.aae.controller.AnalysisEngineController;
+
 
 public class ServicePerformance implements ServicePerformanceMBean
 {
 	private static final long serialVersionUID = 1L;
 	private static final String label="Service Performance";
-	private long accumulatedIdleTime=0;
+	private long idleTime=0;
 	private long numberOfCASesProcessed=0;
 	private long casDeserializationTime=0;
 	private long casSerializationTime=0;
@@ -36,9 +38,46 @@
 	private long maxSerializationTime=0;
 	private long maxDeserializationTime=0;
 	private long maxAnalysisTime=0;
-	
+	private long casPoolWaitTime=0;
+	private long shadowCasPoolWaitTime=0;
+	private long timeSpentInCMGetNext = 0;
 	private Object sem = new Object();
+	private AnalysisEngineController controller;
+	private boolean isRemoteDelegate = false;
+	private long uptime = System.nanoTime();
+	long lastUpdate=System.nanoTime();
+	private int processThreadCount=1;
+	
+	private Object waitmux = new Object();
+	
+	private boolean waitingForCAS = false;
+	
+	private long totalWaitTimeForCAS = 0;
+	
+	private long lastCASWaitTimeUpdate = 0;
 
+	private Object shadowPoolMux = new Object();
+	private boolean waitingForSPCAS = false;
+	private long lastSPCASWaitTimeUpdate = 0;
+	
+	private Object getNextMux = new Object();
+	private boolean waitingInGetNext = false;
+	private long lastGetNextWaitTimeUpdate = 0;
+	private long totalGetNextWaitTime = 0;
+	
+	/** Creates an instance whose controller field stays null,
+	 *  so getIdleTime() and getAnalysisTime() read the local counters. */
+	public ServicePerformance() { }
+
+	/** Binds this instance to aController, which getIdleTime() and getAnalysisTime() query when it is non-null. */
+	public ServicePerformance(AnalysisEngineController aController) {
+		this.controller = aController;
+	}
+	
+	/** Sets the isRemoteDelegate flag to true. */
+	public void setRemoteDelegate() {
+		this.isRemoteDelegate = true;
+	}
 	public String getLabel()
 	{
 		return label;
@@ -46,32 +85,56 @@
 
 	public synchronized void reset()
 	{
-		accumulatedIdleTime = 0;
+		idleTime = 0;
 		numberOfCASesProcessed=0;
 		casDeserializationTime=0;
 		casSerializationTime=0;
+		casPoolWaitTime = 0;
+		shadowCasPoolWaitTime=0;
 		analysisTime=0;
+		maxSerializationTime=0;
+		maxDeserializationTime=0;
+		maxAnalysisTime=0;
+		timeSpentInCMGetNext = 0;
+		uptime = System.nanoTime();
+	}
+	
+	
+	public void setIdleTime( long anIdleTime )
+	{
+		synchronized( sem )
+		{
+			idleTime = anIdleTime;
+		}
 	}
 	public double getIdleTime()
 	{
-		if ( accumulatedIdleTime != 0)
+		
+		if ( controller != null )
+		{
+			//	Force update of the idle time
+			return ((double)controller.getIdleTime()/(double) 1000000);
+		}
+		else
+		{
 			synchronized( sem )
 			{
-				return((double)accumulatedIdleTime/(double) 1000000);
+				return ((double)idleTime/(double) 1000000);
 			}
-		else
-			return 0;
+			
+		}
 	}
 
 	public long getRawIdleTime()
 	{
-		return accumulatedIdleTime;
+		return idleTime;
 	}
 	public void incrementIdleTime(long anIdleTime)
 	{
 		synchronized( sem )
 		{
-			accumulatedIdleTime += anIdleTime;
+			idleTime += anIdleTime;
+			lastUpdate = System.nanoTime();
 		}
 	}
 
@@ -89,7 +152,22 @@
 	
 	public double getAnalysisTime()
 	{
-		return (double)analysisTime/(double)1000000;
+//		return (double)analysisTime/(double)1000000;
+		
+		if ( controller != null )
+		{
+			return ((double)controller.getAnalysisTime()/(double) 1000000);
+		}
+		else
+		{
+			synchronized( sem )
+			{
+				return (double)analysisTime/(double)1000000;
+			}
+			
+		}
+		
+		
 	}
 	
 	public long getRawAnalysisTime()
@@ -157,4 +235,158 @@
 	{
 		return (double)maxAnalysisTime / (double)1000000;
 	}
+	/** Adds aCasPoolsWaitTime to the casPoolWaitTime accumulator while holding sem. */
+	public void incrementCasPoolWaitTime(long aCasPoolsWaitTime) {
+		final long increment = aCasPoolsWaitTime;
+		synchronized (sem) {
+			casPoolWaitTime = casPoolWaitTime + increment;
+		}
+	}
+	/**
+	 * Returns getTimeWaitingForCAS() (nanoseconds) converted to milliseconds. That call does its own
+	 * locking on waitmux, so no additional lock is taken here (same as getShadowCasPoolWaitTime()).
+	 */
+	public double getCasPoolWaitTime() {
+		return getTimeWaitingForCAS() / 1000000.0;
+	}
+	/** Returns getTimeWaitingForShadowPoolCAS() (nanoseconds) converted to milliseconds. */
+	public double getShadowCasPoolWaitTime() {
+		return getTimeWaitingForShadowPoolCAS() / 1000000.0;
+	}
+	
+	/** Returns getTimeWaitingInGetNext() (nanoseconds) converted to milliseconds. */
+	public double getTimeSpentInCMGetNext() {
+		final long waitedNanos = getTimeWaitingInGetNext();	// also folds in any wait still in progress
+		return waitedNanos / 1000000.0;
+	}
+
+	/** Flags a CAS pool wait as in progress and timestamps its start; does nothing if one is already flagged. */
+	public void beginWaitOnCASPool() {
+		synchronized( waitmux ) {
+			// Keep the timestamp of a wait that is already in progress.
+			if ( waitingForCAS ) {
+				return;
+			}
+			waitingForCAS = true;
+			lastCASWaitTimeUpdate = System.nanoTime();
+		}
+	}
+	/** Ends a flagged CAS pool wait and adds the time since the last recorded timestamp to the total. */
+	public void endWaitOnCASPool() {
+		synchronized( waitmux ) {
+			if ( waitingForCAS ) {	// without a flagged wait the timestamp is stale (or still 0)
+				totalWaitTimeForCAS += System.nanoTime() - lastCASWaitTimeUpdate;
+				waitingForCAS = false;
+			}
+		}
+	}
+
+
+	
+	/**
+	 * Returns the accumulated CAS pool wait time in nanoseconds, first folding in any wait still flagged.
+	 */
+	public long getTimeWaitingForCAS() {
+		synchronized( waitmux ) {
+			if ( waitingForCAS ) {
+				// One clock read serves both the delta and the new timestamp, so no interval is counted twice.
+				long now = System.nanoTime();
+				totalWaitTimeForCAS += now - lastCASWaitTimeUpdate;
+				lastCASWaitTimeUpdate = now;
+			}
+			return totalWaitTimeForCAS;
+		}
+	}
+	
+	
+	
+	/** Flags a shadow CAS pool wait as in progress and timestamps its start; does nothing if one is already flagged. */
+	public void beginWaitOnShadowCASPool() {
+		synchronized( shadowPoolMux ) {
+			// Keep the timestamp of a wait that is already in progress.
+			if ( waitingForSPCAS ) {
+				return;
+			}
+			waitingForSPCAS = true;
+			lastSPCASWaitTimeUpdate = System.nanoTime();
+		}
+	}
+	/** Ends a flagged shadow CAS pool wait and adds the time since the last recorded timestamp to the total. */
+	public void endWaitOnShadowCASPool() {
+		synchronized( shadowPoolMux ) {
+			if ( waitingForSPCAS ) {	// without a flagged wait the timestamp is stale (or still 0)
+				shadowCasPoolWaitTime += System.nanoTime() - lastSPCASWaitTimeUpdate;
+				waitingForSPCAS = false;
+			}
+		}
+	}
+	
+	/**
+	 * Returns the accumulated shadow CAS pool wait time in nanoseconds.
+	 * If a wait is currently flagged, the time elapsed since the last recorded
+	 * timestamp is folded into the total first.
+	 */
+	public long getTimeWaitingForShadowPoolCAS() {
+		synchronized( shadowPoolMux ) {
+			if ( waitingForSPCAS ) {
+				long now = System.nanoTime();	// one clock read for both the delta and the new timestamp
+				shadowCasPoolWaitTime += now - lastSPCASWaitTimeUpdate;
+				lastSPCASWaitTimeUpdate = now;
+			}
+			return shadowCasPoolWaitTime;
+		}
+	}
+	
+	
+	/**
+	 * Flags a getNext wait as in progress and timestamps its start.
+	 * When a wait is already flagged nothing changes, so the earlier
+	 * timestamp is kept.
+	 */
+	public void beginGetNextWait() {
+		synchronized( getNextMux ) {
+			// Keep the timestamp of a wait that is already in progress.
+			if ( waitingInGetNext ) {
+				return;
+			}
+			waitingInGetNext = true;
+			lastGetNextWaitTimeUpdate = System.nanoTime();
+		}
+	}
+	/** Ends a flagged getNext wait and adds the time since the last recorded timestamp to the total. */
+	public void endGetNextWait() {
+		synchronized( getNextMux ) {
+			if ( waitingInGetNext ) {	// without a flagged wait the timestamp is stale (or still 0)
+				totalGetNextWaitTime += System.nanoTime() - lastGetNextWaitTimeUpdate;
+				waitingInGetNext = false;
+			}
+		}
+	}
+	
+	/**
+	 * Returns the accumulated getNext wait time in nanoseconds.
+	 * If a wait is currently flagged, the time elapsed since the last recorded
+	 * timestamp is folded into the total first.
+	 */
+	public long getTimeWaitingInGetNext() {
+		synchronized( getNextMux ) {
+			if ( waitingInGetNext ) {
+				long now = System.nanoTime();	// one clock read for both the delta and the new timestamp
+				totalGetNextWaitTime += now - lastGetNextWaitTimeUpdate;
+				lastGetNextWaitTimeUpdate = now;
+			}
+			return totalGetNextWaitTime;
+		}
+	}
+
+	/** Returns the current value of the processThreadCount field
+	 *  (initialised to 1 until setProcessThreadCount(int) changes it). */
+	public int getProcessThreadCount() { return this.processThreadCount; }
+
+	/** Stores processThreadCount as the value reported by
+	 *  getProcessThreadCount(); no range check is applied. */
+	public void setProcessThreadCount(int processThreadCount) { this.processThreadCount = processThreadCount; }
+	
+	
+	
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java Fri Aug 22 11:53:05 2008
@@ -32,5 +32,8 @@
 	public double getCasDeserializationTime();
 	public double getCasSerializationTime();
 	public double getAnalysisTime();
-	
+	public double getCasPoolWaitTime();
+	public double getShadowCasPoolWaitTime();
+	public double getTimeSpentInCMGetNext();
+	public int getProcessThreadCount();
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java Fri Aug 22 11:53:05 2008
@@ -23,7 +23,6 @@
 public interface AsynchAEMessage
 {
 	public static final String TotalTimeSpentInAnalytic = "TimeInAnalytic";
-	public static final String SendTime = "SendTime";
 	public static final String TimeInService = "TimeInService";
 	public static final String Endpoint = "Endpoint";
 	public static final String DelegateStats = "DelegateStats";
@@ -31,14 +30,13 @@
 	
 	public static final String CasReference = "CasReference";
 	public static final String InputCasReference = "InputCasReference";
-//	public static final String CasReferenceId = "CasReferenceId";
 	public static final String MessageFrom = "MessageFrom";
 	public static final String XCASREFId = "XCASRefId";
 	public static final String XCas = "XCas";
 	public static final String AEMetadata = "Metadata";
 	public static final String CasSequence = "CasSequence";
 	public static final String ReplyToEndpoint = "ReplyToEndpoint";
-	
+	public static final String EndpointServer = "EndpointServer";
 	public static final String ServerIP = "ServerIP";
 	public static final String RemoveEndpoint = "RemoveEndpoint";
 	public static final String Aborted = "Aborted";

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Fri Aug 22 11:53:05 2008
@@ -61,6 +61,9 @@
 UIMAEE_worker_thread_terminated__INFO = Worker Thread: {0} Terminated
 UIMAEE_request_cas__FINE = Requesting CAS from CasPool For CAS Deserialization. Cas Sent From: {0}
 UIMAEE_request_cas_granted__FINE = Obtained CAS from CasPool For CAS Deserialization. Cas Sent From: {0}
+UIMAEE_request_cas_cm__FINE = Requesting CAS from Cas Multipliers Shadow Cas Pool Identified By Key: {0} For CAS Deserialization. 
+UIMAEE_request_cas_granted_cm__FINE = Obtained CAS from Cas Multipliers Shadow Cas Pool Identified By Key: {0} For CAS Deserialization. 
+
 UIMAEE_deserialized_cas_ready_to_process_FINE = Deserialized CAS from XMI. CAS Sent From: {0}
 UIMAEE_got_cas__FINEST = Got Cas From: {0} Cas Received Contains: {1}
 UIMAEE_duplicate_request__INFO = Duplicate Request With Cas Reference Id: {0} Received. Ignoring Duplicate.
@@ -121,7 +124,7 @@
 UIMAEE_cas_retries_exceeded__FINE = Controller: {0} Process CAS Retries Exceeded Or Not Configured. Delegate: {1} Cas Reference Id: {2}
 UIMAEE_process_cas_exceeded_threshold__INFO = Controller: {0} Process CAS Threshold Exceeded Configured Maximum.Delegate: {1} Cas Reference Id: {2} Threshold: {3} Action To Take: {4}
 UIMAEE_ignore_error__INFO = Controller: {0} Ignoring Error Of Class: {1} 
-UIMAEE_show_cache_entry_key__FINEST = InProcessCache Contains: {0} Entries: {1}
+UIMAEE_show_cache_entry_key__FINEST = Controller: {0} InProcessCache Contains: {1} Entries: {2}
 UIMAEE_remove_cache_entry_for_cas__FINEST = Removing Cache Entry For Cas Reference Id: {0}
 UIMAEE_cas_is_null_remove_from_cache_failed__FINEST = Unable to Remove Cache Entry. Cas Reference Id is Null
 UIMAEE_cas_is_invalid_remove_from_cache_failed__FINEST = Unable to Remove Cache Entry. Provided Cas Reference Id: {0} is not a valid key.
@@ -148,3 +151,10 @@
 UIMAEE_final_step_parent_cas_child_count__FINEST = Controller: {0} Final Step - Parent Cas Reference Id: {1} Has In-Play Subordinate CASes. Current Subordinate CAS Count: {2}
 UIMAEE_final_step_parent_cas_no_children__FINEST = Controller: {0} Final Step - Parent Cas Reference Id: {1} Has No Subordinate CASes Being Processed.
 UIMAEE_unable_to_check_ae_back_to_pool__WARNING = Controller: {0} Unable to Check In an Instance Of AE While Processing CPC. Exception {1} 
+UIMAEE_sending_fcq_req__FINE = Controller: {0} Sending Request To Release CAS: {1} To Cas Multiplier:{2} Queue: {3}
+UIMAEE_remove_cache_entry__INFO = Controller: {0} Releasing CASes Produced From Input CAS: {1}
+UIMAEE_dump_msg_origin__FINE = Controller: {0} Origin Map Dump {1}
+UIMAEE_show_abbrev_cache_stats___FINE = Controller: {0} Number of CASes In the Cache: {1} Number of CASes in Final State: {2}  
+UIMAEE_service_idle_time_cas_pool_INFO = \tTimestamp:\t{0}\t[ {1} ]\tCM:\t{2}\tRemote:\t{3}\tIdle\t{4}\tCASes\t{5}\tInQDepth\t{6}\tRQDepth\t{7}\tCP  Wait\t{8}\tAnalysis:\t{9}\tThreadCnt:\t{10}\tCMFreeCasCount:\t{11}
+UIMAEE_service_idle_time_shadow_cas_pool_INFO = \tTimestamp:\t{0}\t[ {1} ]\tCM:\t{2}\tRemote:\t{3}\tIdle\t{4}\tCASes\t{5}\tInQDepth\t{6}\tRQDepth\t{7}\tSCP Wait\t{8}\tAnalysis:\t{9}\tThreadCnt:\t{10}\tCMFreeCasCount:\t{11}
+UIMAEE_marker_INFO = ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++