You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:53:06 UTC

svn commit: r688174 [2/3] - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main: java/org/apache/uima/aae/ java/org/apache/uima/aae/client/ java/org/apache/uima/aae/controller/ java/org/apache/uima/aae/error/handler/ java/org/apache/uima/aa...

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -19,6 +19,8 @@
 
 package org.apache.uima.aae.controller;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -64,6 +66,7 @@
 import org.apache.uima.analysis_engine.impl.AnalysisEngineManagementImpl;
 import org.apache.uima.analysis_engine.metadata.SofaMapping;
 import org.apache.uima.cas.CAS;
+import org.apache.uima.collection.CollectionReaderDescription;
 import org.apache.uima.resource.Resource;
 import org.apache.uima.resource.ResourceCreationSpecifier;
 import org.apache.uima.resource.ResourceSpecifier;
@@ -148,6 +151,30 @@
 	
 	protected ConcurrentHashMap perCasStatistics = new ConcurrentHashMap();
 
+	private boolean casMultiplier = false;
+	
+	protected Object syncObject = new Object();
+	
+	//	Map holding outstanding CASes produced by Cas Multiplier that have to be acked
+	protected ConcurrentHashMap cmOutstandingCASes = new ConcurrentHashMap();
+	
+	private Object mux = new Object();
+	
+	private Object waitmux = new Object();
+	
+	private boolean waitingForCAS = false;
+	
+	private long startTime = System.nanoTime();
+	
+	private long totalWaitTimeForCAS = 0;
+	
+	private long lastCASWaitTimeUpdate = 0;
+
+	private Map<Long, AnalysisThreadState> threadStateMap =
+		new HashMap<Long,AnalysisThreadState>();
+	
+	
+
 	public BaseAnalysisEngineController(AnalysisEngineController aParentController, int aComponentCasPoolSize, String anEndpointName, String aDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache) throws Exception
 	{
 		this(aParentController, aComponentCasPoolSize, 0, anEndpointName, aDescriptor, aCasManager, anInProcessCache, null, null);
@@ -213,7 +240,14 @@
                 new Object[] { endpointName });
 		
 		resourceSpecifier = UimaClassFactory.produceResourceSpecifier(aDescriptor);
-
+		//	Is this service a CAS Multiplier?
+		if ( (resourceSpecifier instanceof AnalysisEngineDescription &&
+				((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes()) 
+				|| resourceSpecifier instanceof CollectionReaderDescription)
+		{
+			casMultiplier = true;
+		}
+		
 		paramsMap = new HashMap();
 		if ( aJmxManagement == null )
 		{
@@ -239,8 +273,9 @@
 				getUimaContextAdmin().getManagementInterface();
 			//	Override uima core jmx domain setting
 			mbean.setName(getComponentName(), getUimaContextAdmin(),jmxManagement.getJmxDomain());
-			if ( this instanceof PrimitiveAnalysisEngineController && resourceSpecifier instanceof AnalysisEngineDescription )
+			if ( resourceSpecifier instanceof AnalysisEngineDescription )
 			{
+				//	Is this service a CAS Multiplier?
 				if ( ((AnalysisEngineDescription) resourceSpecifier).getAnalysisEngineMetaData().getOperationalProperties().getOutputsNewCASes() )
 				{
 					System.out.println(getName()+"-Initializing CAS Pool for Context:"+getUimaContextAdmin().getQualifiedContextName());
@@ -350,11 +385,6 @@
 		}
 		if ( this instanceof PrimitiveAnalysisEngineController )
 		{
-//			if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessCount)) == null )
-//			{
-//				statistic = new LongNumericStatistic(Monitor.ProcessCount);
-//				getMonitor().addStatistic("", statistic);
-//			}
 			if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.ProcessErrorCount)) == null )
 			{
 				statistic = new LongNumericStatistic(Monitor.ProcessErrorCount);
@@ -450,43 +480,52 @@
 		
 		String name = "";
 		int index = getIndex(); 
-		servicePerformance = new ServicePerformance();
-//		name = getJMXDomain()+key_value_list+",name="+thisComponentName+"_"+servicePerformance.getLabel();
+		servicePerformance = new ServicePerformance(this);
 		name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+servicePerformance.getLabel();
 		
-		
 		registerWithAgent(servicePerformance, name );
-
+		servicePerformance.setIdleTime(System.nanoTime());
+		
 		ServiceInfo serviceInfo = getInputChannel().getServiceInfo();
 		ServiceInfo pServiceInfo = null;
 
 		if ( this instanceof PrimitiveAnalysisEngineController )
 		{
 			pServiceInfo = ((PrimitiveAnalysisEngineController)this).getServiceInfo();
+			servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController)this).getServiceInfo().getAnalysisEngineInstanceCount());
+			//	If this is a Cas Multiplier, add the key to the JMX MBean.
+			//	This will help the JMX Monitor to fetch the CM Cas Pool MBean
+			if ( isCasMultiplier() )
+			{
+				pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
+			}
 		}
 		else
 		{
 			pServiceInfo = 
 				((AggregateAnalysisEngineController)this).getServiceInfo();
+			pServiceInfo.setAggregate(true);
 		}
 		if ( pServiceInfo != null )
 		{
-//			name = getJMXDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
 			name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
-			
-			
 			if ( !isTopLevelComponent() )
 			{
 				pServiceInfo.setBrokerURL("Embedded Broker");
 			}
+			else
+			{
+				pServiceInfo.setTopLevel();
+			}
+			if ( isCasMultiplier())
+			{
+				pServiceInfo.setCASMultiplier();
+			}
 			registerWithAgent(pServiceInfo, name );
 		}
 
 		serviceErrors = new ServiceErrors();
-//		name = getJMXDomain()+key_value_list+",name="+thisComponentName+"_"+serviceErrors.getLabel();
 		name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceErrors.getLabel();
-		
-		
 		registerWithAgent(serviceErrors, name );
 	}
 
@@ -519,6 +558,7 @@
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
 	                "initializeComponentCasPool", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_pool_config_INFO",
 	                new Object[] { getComponentName(), getUimaContextAdmin().getQualifiedContextName(), aComponentCasPoolSize, anInitialCasHeapSize/4});
+			
 		}
 
 	}
@@ -573,6 +613,10 @@
 			sInfo.setInputQueueName(aServiceInfo.getInputQueueName());
 			sInfo.setState(aServiceInfo.getState());
 			sInfo.setDeploymentDescriptor(deploymentDescriptor);
+			if ( isCasMultiplier())
+			{
+				sInfo.setCASMultiplier();
+			}
 		}
 		else
 		{
@@ -661,23 +705,12 @@
 			registeredWithJMXServer = true;
 			registerServiceWithJMX(jmxContext, false);
 		}
-/*		
-		if ( this instanceof AggregateAnalysisEngineController )
-		{
-			AggregateAnalysisEngineController aC = (AggregateAnalysisEngineController)this;
-			 if ( aC.requestForMetaSentToRemotes() == false && allDelegatesAreRemote )
-			 {
-				 aC.setRequestForMetaSentToRemotes();
-				 aC.sendRequestForMetadataToRemoteDelegates();
-			 }
-*/
 	}
 	public void addInputChannel( InputChannel anInputChannel )
 	{
 		if ( !inputChannelMap.containsKey(anInputChannel.getInputQueueName()))
 		{
 			inputChannelMap.put(anInputChannel.getInputQueueName(), anInputChannel);
-			
 		}
 	}
 	public InputChannel getInputChannel()
@@ -711,26 +744,7 @@
 	{
 		return replyTime;
 	}
-	
-	public long getIdleTime( String aKey )
-	{
-		return idleTime;
-	}
-	
-	public synchronized void saveIdleTime( long snapshot, String aKey, boolean accumulate )
-	{
-		if ( accumulate )
-		{
-			LongNumericStatistic statistic;
-			//	Accumulate idle time across all processing threads
-			if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.IdleTime)) != null )
-			{
-				statistic.increment(snapshot);
-			}
-		}
-		getServicePerformance().incrementIdleTime(snapshot);
-		idleTime += snapshot;
-	}
+
 	protected void handleAction( String anAction, String anEndpoint, ErrorContext anErrorContext )
 	throws Exception
 	{
@@ -865,7 +879,7 @@
 				                "dropCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_removed_cache_entry__FINE",
 				                new Object[] {aCasReferenceId, getComponentName() });
 					}
-					inProcessCache.dumpContents();
+					inProcessCache.dumpContents(getComponentName());
 				}	
 			}
 			//	Remove stats from the map maintaining CAS specific stats
@@ -1081,7 +1095,7 @@
 		}
 		else
 		{
-			casStats = new ServicePerformance();
+			casStats = new ServicePerformance(this);
 			perCasStatistics.put( aCasReferenceId, casStats);
 		}
 		return casStats;
@@ -1111,53 +1125,6 @@
 	}
 	
 	/**
-	 * Logs controller statistics in a uima log.
-	 * 
-	 * @param aComponentName 
-	 * @param aStatsMap
-	 */
-/*	
-	protected void logStats(String aComponentName, Map aStatsMap)
-	{
-		float totalIdleTime = 0;
-		long numberCASesProcessed = 0;
-		float totalDeserializeTime = 0;
-		float totalSerializeTime = 0;
-		
-		if ( aStatsMap.containsKey(Monitor.IdleTime))
-		{
-			totalIdleTime = ((Float)aStatsMap.get(Monitor.IdleTime)).floatValue();
-		}
-		if ( aStatsMap.containsKey(Monitor.ProcessCount))
-		{
-			numberCASesProcessed = ((Long)aStatsMap.get(Monitor.ProcessCount)).longValue();
-		}
-		if ( aStatsMap.containsKey(Monitor.TotalDeserializeTime))
-		{
-			totalDeserializeTime = ((Float)aStatsMap.get(Monitor.TotalDeserializeTime)).floatValue();
-		}
-		if ( aStatsMap.containsKey(Monitor.TotalDeserializeTime))
-		{
-			totalSerializeTime = ((Float)aStatsMap.get(Monitor.TotalSerializeTime)).floatValue();
-		}
-		float totalAEProcessTime=0;
-		if ( aStatsMap.containsKey(Monitor.TotalAEProcessTime))
-		{
-			totalAEProcessTime = ((Float)aStatsMap.get(Monitor.TotalAEProcessTime)).floatValue();
-		}
-
-		if ( totalAEProcessTime > 0 )
-		{
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "logStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_dump_primitive_stats__INFO", new Object[] { aComponentName, totalIdleTime, numberCASesProcessed, totalDeserializeTime, totalSerializeTime, totalAEProcessTime });
-		}
-		else
-		{
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "logStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_dump_aggregate_stats__INFO", new Object[] { aComponentName, totalIdleTime, numberCASesProcessed, totalDeserializeTime, totalSerializeTime });
-		}
-		
-	}
-*/	
-	/**
 	 * Clears controller statistics.
 	 * 
 	 */
@@ -1176,6 +1143,9 @@
 		}
 		//	Clear CAS statistics
 		perCasStatistics.clear();
+		
+		
+	
 	}
 	/**
 	 * Returns a copy of the controller statistics.
@@ -1278,10 +1248,12 @@
 	{
 		return getInputChannel().getInputQueueName();
 	}
+/*
 	public long getIdleTime()
 	{
 		return 0;
 	}
+*/	
 	public long getTotalTimeSpentSerializingCAS()
 	{
 		return 0;
@@ -1366,14 +1338,14 @@
 			((AggregateAnalysisEngineController_impl)this).stopTimers();
 			//	Stops ALL input channels of this service including the reply channels
 			stopInputChannels();
-			int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).childControllerList.size();
+			int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).getChildControllerList().size();
 			//	send terminate event to all collocated child controllers
 			if ( childControllerListSize > 0 )
 			{
 				for( int i=0; i < childControllerListSize; i++ )
 				{
 					AnalysisEngineController childController = 
-						(AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).childControllerList.get(i);
+						(AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).getChildControllerList().get(i);
 					
 					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "stop", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stop_delegate__INFO", new Object[] { getComponentName(), childController.getComponentName() });
 					childController.stop();
@@ -1488,23 +1460,6 @@
 			//	fully processed.
 			stopCasMultiplier();
 			stop();
-/*
-			//	If the InProcessCache is not empty ( CASes are still in play), register self
-			//	(the top level controller) to receive a callback when all CASes are fully
-			//	processed and the cache becomes empty. 
-			if ( !getInProcessCache().isEmpty() )
-			{
-				System.out.println("Controller:"+getComponentName()+" Cache Not Empty. Registering Self For Callback");			
-				getInProcessCache().dumpContents();
-				getInProcessCache().registerCallbackWhenCacheEmpty(this.getEventListener());
-			}
-			else  
-			{
-				// Cache is already empty - trigger shutdown. If this controller is an 
-			    // aggregate, it will propagate stop() down the delegate hierarchy
-				getEventListener().onCacheEmpty();
-			}
-*/		
 		}
 	}
 
@@ -1621,16 +1576,14 @@
 	
 	public AnalysisEngineController getCasMultiplierController()
 	{
-		int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).childControllerList.size();
+		int childControllerListSize = ((AggregateAnalysisEngineController_impl)this).getChildControllerList().size();
 		if ( childControllerListSize > 0 )
 		{
 			for( int i=0; i < childControllerListSize; i++ )
 			{
 				AnalysisEngineController childController = 
-					(AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).childControllerList.get(i);
-				if ( childController instanceof PrimitiveAnalysisEngineController  &&
-				    ((PrimitiveAnalysisEngineController)childController).isMultiplier()
-			       )
+					(AnalysisEngineController)((AggregateAnalysisEngineController_impl)this).getChildControllerList().get(i);
+				if ( childController.isCasMultiplier() )
 				{
 					return childController;
 				}
@@ -1651,7 +1604,6 @@
 		}
 		return null;
 	}
-	
 	 
 	/**
 	 * Callback method called the InProcessCache becomes empty meaning ALL CASes are processed.
@@ -1725,12 +1677,12 @@
         if ( e != null )
         {
           ((ControllerCallbackListener)controllerListeners.get(i)).
-              notifyOnInitializationFailure(e);
+              notifyOnInitializationFailure(this, e);
         }
         else
         {
           ((ControllerCallbackListener)controllerListeners.get(i)).
-              notifyOnInitializationSuccess();
+              notifyOnInitializationSuccess(this);
         }
       }
 	  }
@@ -1742,4 +1694,416 @@
 				perCasStatistics.remove(aCasReferenceId);
 		}
 	  }
+	  
+	  public boolean isCasMultiplier()
+	  {
+		  return casMultiplier;
+	  }
+	  
+		public void releaseNextCas(String casReferenceId)
+		{
+			synchronized(syncObject)
+			{
+				//	Check if the CAS is in the list of outstanding CASes and also exists in the cache
+				if ( cmOutstandingCASes.size() > 0 && cmOutstandingCASes.containsKey(casReferenceId) && getInProcessCache().entryExists(casReferenceId))
+				{
+					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+			                "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
+			                new Object[] { getComponentName(), casReferenceId });
+					try
+					{
+						CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(casReferenceId);
+						String parentCasReferenceId = cacheEntry.getInputCasReferenceId(); 
+						Endpoint freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
+						//	If the CAS was created by a remote Cas Multiplier, send a Free CAS Notification
+						//	to the CM.
+						if ( freeCasEndpoint != null )
+						{
+							UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+					                "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_sending_fcq_req__FINE",
+					                new Object[] { getComponentName(), casReferenceId, cacheEntry.getCasMultiplierKey(), freeCasEndpoint.getDestination() });
+							freeCasEndpoint.setReplyEndpoint(true);
+							getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, casReferenceId, freeCasEndpoint);
+						}
+						cacheEntry = null;
+						//	Release the CAS and remove it from the InProcess cache
+						dropCAS(casReferenceId, true);
+						//	Check if the CAS has a parent CAS
+						if ( parentCasReferenceId != null )
+						{
+							//	Fetch the parent CAS from the InProcess Cache
+							cacheEntry = getInProcessCache().getCacheEntryForCAS(parentCasReferenceId);
+							if ( cacheEntry != null  )
+							{
+								//	Decrement number of child CASes in play
+								//	Decrement has already happened in the final step before the CAS was sent to the client
+								//cacheEntry.decrementSubordinateCasInPlayCount();
+//								if ( cacheEntry.isPendingReply() && cacheEntry.getSubordinateCasInPlayCount() == 0)
+								if ( cacheEntry.isPendingReply() && getInProcessCache().hasNoSubordinates(cacheEntry.getCasReferenceId()))
+								{
+									if ( this instanceof AggregateAnalysisEngineController )
+									{
+										((AggregateAnalysisEngineController)this).finalStep( cacheEntry.getFinalStep(), parentCasReferenceId);
+									}
+									else // PrimitiveAnalysisEngineController 
+									{
+										//	Return an input CAS to the client. The input CAS is returned
+										//	to the remote client only if all of the child CASes produced
+										//	from the input CAS have been fully processed.
+										getOutputChannel().sendReply(cacheEntry.getCasReferenceId(), cacheEntry.getMessageOrigin());
+										dropCAS(cacheEntry.getCasReferenceId(), true);
+									}
+								}
+								
+							}
+						}
+						//	If debug level=FINEST dump the entire cache
+						getInProcessCache().dumpContents(getComponentName());
+
+					}
+					catch( Exception e)
+					{
+						e.printStackTrace();
+						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+				                "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
+				                new Object[] { e});
+					}
+				}
+			}
+		}
+		
+		private boolean validMessageForSnapshot( int msgType )
+		{
+			return ( AsynchAEMessage.Process == msgType || AsynchAEMessage.CollectionProcessComplete == msgType);
+		}
+		
+		//	Called by ServicePerformance MBean on separate thread 
+		
+		//	This is called every time a request comes
+		public void beginProcess(int msgType )
+		{
+			//	Disregard GetMeta as it comes on a non-process thread
+			if ( validMessageForSnapshot( msgType ) )
+			{
+				synchronized( mux )
+				{
+					AnalysisThreadState threadState = null;
+					if ( threadStateMap.containsKey(Thread.currentThread().getId()))
+					{
+						threadState = threadStateMap.get(Thread.currentThread().getId());
+						if (threadState.isIdle) {
+							threadState.setIdle(false);
+							threadState.incrementIdleTime(System.nanoTime()-threadState.getLastUpdate());
+							threadState.computeIdleTimeBetweenProcessCalls();
+						}
+					}
+					else
+					{
+						threadStateMap.put(Thread.currentThread().getId(), new AnalysisThreadState(Thread.currentThread().getId()));
+						
+						threadState = threadStateMap.get(Thread.currentThread().getId());
+						threadState.setIdle(false);
+						threadState.incrementIdleTime(System.nanoTime()-startTime);
+						threadState.setLastMessageDispatchTime(startTime);
+						threadState.computeIdleTimeBetweenProcessCalls();
+					}
+				}
+			}
+		}
+		//	This is called every time a request is completed
+		public void endProcess( int msgType )
+		{
+			//	Disregard GetMeta as it comes on a non-process thread
+			if ( validMessageForSnapshot( msgType ) )
+			{
+				synchronized( mux )
+				{
+					AnalysisThreadState threadState = getThreadState();					
+					threadState.setLastUpdate(System.nanoTime());
+					threadState.setIdle(true);
+					threadState.setLastMessageDispatchTime();
+				}
+			}
+		}
+		public long getIdleTimeBetweenProcessCalls(int msgType)
+		{
+			if ( validMessageForSnapshot( msgType ) )
+			{
+				synchronized( mux )
+				{
+					AnalysisThreadState threadState = getThreadState();					
+					return threadState.getIdleTimeBetweenProcessCalls();
+				}
+			}
+			return 0;
+		}
+		/** Returns accumulated idle time (System.nanoTime() units) of the process threads; averaged over AE instances for a primitive controller. */
+		public long getIdleTime()
+		{
+			synchronized( mux )
+			{
+				long now = System.nanoTime();
+				long serviceIdleTime = 0;
+				Set<Long> set = threadStateMap.keySet();
+				int howManyThreads = threadStateMap.size();
+				//	Iterate over all processing threads to calculate the total amount of idle time 
+				for(Long key: set )
+				{	
+					//	Retrieve the current thread state information from the global map. The key is
+					//	the thread id.
+					 AnalysisThreadState threadState = threadStateMap.get(key);
+					 //	add this thread idle time
+					 serviceIdleTime += threadState.getIdleTime() ;
+					 
+					 //	If this thread is currently idle, compute amount of time elapsed since the last
+					 //	update. The last update has been done at the last beginProcess() or endProcess() call.
+					 if ( threadState.isIdle())
+					 {
+						 //	compute idle time since the last update
+						 long delta = now - threadState.getLastUpdate();
+						 //	advance the marker to the same instant used for delta, so that no idle time
+						 //	is lost between two separate clock reads
+						 threadState.setLastUpdate(now);
+						 //	increment total idle time
+						 threadState.incrementIdleTime(delta);
+						 //	add the elapsed time since the last update to the total idle time
+						 serviceIdleTime += delta;
+					 }
+				}
+				//	If process CAS request has not yet been received, there are no process threads 
+				//	created yet. Simply return the delta since the service started. This is a special
+				//	case which is only executing if the client has not sent any CASes for processing.
+				if ( howManyThreads == 0)
+				{
+					return System.nanoTime()-startTime;
+				}
+				else
+				{
+					//	Return accumulated idle time from all processing threads. Divide the total idle by the 
+					//	number of process threads.
+					
+					if ( this instanceof PrimitiveAnalysisEngineController )
+					{
+						int aeInstanceCount = ((PrimitiveAnalysisEngineController)this).getAEInstanceCount();
+						serviceIdleTime += (aeInstanceCount - howManyThreads)*(System.nanoTime()-startTime);
+						return serviceIdleTime/aeInstanceCount;
+					}
+					else
+					{
+						return serviceIdleTime;
+					}
+				}
+			}
+		}
+
+		/**
+		 * Returns CPU Time with nanosecond precision (not nanosecond accuracy). If the OS/JVM
+		 * does not support reporting the CPU Time, returns the wall clock time. 
+		 */
+		public synchronized long getCpuTime() 
+		{
+			if ( ManagementFactory.getPlatformMBeanServer() != null )
+			{
+				ThreadMXBean bean = ManagementFactory.getThreadMXBean( );
+			    return bean.isCurrentThreadCpuTimeSupported( ) ? bean.getCurrentThreadCpuTime( ) : System.nanoTime();
+			}
+			return System.nanoTime();
+		}
+		/** Returns the CPU time (ns) used by the thread with the given id, or wall clock time if the JVM cannot report it. */
+		private synchronized long getCpuTime(long threadId) 
+		{
+			if ( ManagementFactory.getPlatformMBeanServer() == null ) { return System.nanoTime(); }
+			ThreadMXBean bean = ManagementFactory.getThreadMXBean( );
+			//	getThreadCpuTime() on an arbitrary thread is governed by isThreadCpuTimeSupported(),
+			//	not by isCurrentThreadCpuTimeSupported(), which only covers the calling thread.
+			return bean.isThreadCpuTimeSupported( ) ? bean.getThreadCpuTime(threadId) : System.nanoTime();
+		}
+		
+		private AnalysisThreadState getFirstThreadState()
+		{
+			Set<Long> set = threadStateMap.keySet();
+			Iterator<Long> it = set.iterator();
+			return threadStateMap.get(it.next());
+
+		}
+		/**
+		 * Returns the {@link AnalysisThreadState} object associated with the current thread.
+		 * 
+		 * @return
+		 */
+		private AnalysisThreadState getThreadState()
+		{
+			AnalysisThreadState threadState;
+			if ( this instanceof AggregateAnalysisEngineController )
+			{
+				threadState = getFirstThreadState();
+			}
+			else
+			{
+				threadState = threadStateMap.get(Thread.currentThread().getId());
+				if ( threadState == null )
+				{
+					//	This may be the case if the thread processing
+					//	FreeCASRequest is returning an input CAS to the client.
+					//	This thread is different from the process thread, thus
+					//	we just return the first thread's state.
+					threadState = getFirstThreadState();
+				}
+			}
+			return threadState;
+		}
+		
+		/**
+		 * Returns the total CPU time all processing threads spent in analysis.
+		 * This method subtracts the serialization and de-serialization time from
+		 * the total. If this service is an aggregate, the return time is a sum
+		 * of CPU utilization in each colocated delegate.
+		 */
+		public long getAnalysisTime()
+		{
+			long totalCpuProcessTime = 0;
+			//	threadStateMap is a plain HashMap: create and walk its iterator entirely under mux
+			synchronized( mux )
+			{
+				Iterator<Long> it = threadStateMap.keySet().iterator();
+				//	Iterate over all processing threads
+				while( it.hasNext())
+				{
+					long threadId = it.next();
+					//	Fetch the next thread's stats
+					AnalysisThreadState threadState = threadStateMap.get(threadId);
+					//	If an Aggregate service, sum up the CPU times of all collocated
+					//	delegates. NOTE(review): this branch runs once per thread entry, so the delegate sum is added repeatedly - confirm intended.
+					if ( this instanceof AggregateAnalysisEngineController_impl )
+					{
+						//	Get a list of all colocated delegate controllers from the Aggregate
+						List<AnalysisEngineController> delegateControllerList = 
+							((AggregateAnalysisEngineController_impl)this).childControllerList; 							
+						//	Iterate over all colocated delegates
+						for( int i=0; i < delegateControllerList.size(); i++)
+						{	
+							//	Get the next delegate's controller
+							AnalysisEngineController delegateController =
+								(AnalysisEngineController)delegateControllerList.get(i);
+							if ( delegateController != null && !delegateController.isStopped())
+							{
+								//	get the CPU time for all processing threads in the current controller
+								totalCpuProcessTime += delegateController.getAnalysisTime();
+							}
+						}
+					}
+					else  // Primitive Controller
+					{
+						//	Get the CPU time of a thread with a given ID
+						totalCpuProcessTime += getCpuTime(threadId);
+					}
+					//	Subtract serialization and deserialization times from the total CPU used
+					if ( totalCpuProcessTime > 0 )
+					{
+						totalCpuProcessTime -= threadState.getDeserializationTime();
+						totalCpuProcessTime -= threadState.getSerializationTime();
+					}
+				}
+			}
+			return totalCpuProcessTime;
+		}
+		/**
+		 * Increments the time this thread spent in serialization of a CAS
+		 */
+		public void incrementSerializationTime(long cpuTime)
+		{
+			synchronized( mux )
+			{
+				AnalysisThreadState threadState = getThreadState();
+				threadState.incrementSerializationTime(cpuTime);
+			}
+		}
+		/**
+		 * Increments the time this thread spent in deserialization of a CAS
+		 */
+		public void incrementDeserializationTime(long cpuTime)
+		{
+			synchronized( mux )
+			{
+				AnalysisThreadState threadState = getThreadState();
+				threadState.incrementDeserializationTime(cpuTime);
+			}
+		}
+		private class AnalysisThreadState
+		{
+			private long threadId;
+			
+			private boolean isIdle = false;
+			private long lastUpdate = 0;
+			private long totalIdleTime = 0;
+			//	Measures idle time between process CAS calls
+			private long idleTimeSinceLastProcess = 0;
+			private long lastMessageDispatchTime = 0;
+			
+			private long serializationTime = 0;
+			private long deserializationTime = 0;
+			
+			public AnalysisThreadState( long aThreadId )
+			{
+				threadId = aThreadId;
+			}
+			
+			public long getThreadId()
+			{
+				return threadId;
+			}
+			public long getSerializationTime() {
+				return serializationTime;
+			}
+			public void incrementSerializationTime(long serializationTime) {
+				this.serializationTime += serializationTime;
+			}
+			public long getDeserializationTime() {
+				return deserializationTime;
+			}
+			public void incrementDeserializationTime(long deserializationTime) {
+				this.deserializationTime += deserializationTime;
+			}
+			public boolean isIdle() {
+				return isIdle;
+			}
+			public void computeIdleTimeBetweenProcessCalls()
+			{
+				idleTimeSinceLastProcess = System.nanoTime() - lastMessageDispatchTime;
+			}
+			public void setLastMessageDispatchTime( long aTime )
+			{
+				lastMessageDispatchTime = aTime;
+			}
+			public void incrementIdleTime( long idleTime )
+			{
+				totalIdleTime += idleTime;
+			}
+			public void setIdle(boolean isIdle) {
+				this.isIdle = isIdle;
+			}
+			public long getIdleTime()
+			{
+				return totalIdleTime;
+			}
+			public void setLastMessageDispatchTime()
+			{
+				lastMessageDispatchTime = System.nanoTime();
+			}
+			public long getIdleTimeBetweenProcessCalls()
+			{
+				long val = idleTimeSinceLastProcess;
+				//	Reset so that only one reply contains a non-zero value
+				idleTimeSinceLastProcess = 0;
+				return val;
+			}
+			public long getLastUpdate() {
+				return lastUpdate;
+			}
+			public void setLastUpdate(long lastUpdate) {
+				this.lastUpdate = lastUpdate;
+			}
+			
+		}
+		
+		
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/ControllerCallbackListener.java Fri Aug 22 11:53:05 2008
@@ -22,6 +22,8 @@
 public interface ControllerCallbackListener
 {
 	public void notifyOnTermination( String aMessage );
+	public void notifyOnInitializationFailure(AnalysisEngineController aController, Exception e);
+	public void notifyOnInitializationSuccess(AnalysisEngineController aController);
 	public void notifyOnInitializationFailure( Exception e);
 	public void notifyOnInitializationSuccess();
 

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint.java Fri Aug 22 11:53:05 2008
@@ -136,4 +136,8 @@
 	public void setIdleTime( long anIdleTime );
 	
 	public long getIdleTime();
+	
+	public void setEndpointServer( String anEndpointServer );
+	
+	public String getEndpointServer();
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/Endpoint_impl.java Fri Aug 22 11:53:05 2008
@@ -100,9 +100,15 @@
 	private boolean tempReplyDestination;
 		
 	private int initialHeapSize;
+	
 	private volatile boolean replyDestinationFailed;
 	
 	private long idleTime=0;
+	
+	//	This is supplied by the remote client. It needs to be
+	//	echoed back to the client. 
+	private String endpointServer = null;
+	
 	public int getCommand()
 	{
 		return command;
@@ -375,7 +381,7 @@
 	{
 		if ( serviceInfo == null )
 		{
-			serviceInfo = new ServiceInfo();
+			serviceInfo = new ServiceInfo(isCasMultiplier);
 			serviceInfo.setBrokerURL(serverURI);
 			serviceInfo.setInputQueueName(endpoint);
 			serviceInfo.setState("Active");
@@ -482,6 +488,7 @@
 	public void setIsCasMultiplier(boolean trueORfalse)
 	{	// Stores the flag on this endpoint, then calls setCASMultiplier() on the object returned by getServiceInfo().
 		isCasMultiplier = trueORfalse;
+		getServiceInfo().setCASMultiplier();	// NOTE(review): called even when trueORfalse is false -- confirm this is intended
 	}
 	public void setShadowCasPoolSize( int aPoolSize )
 	{
@@ -547,4 +554,13 @@
 	public String toString() {
 		return endpoint;
 	}
+	
+	public void setEndpointServer( String anEndpointServer ){
+		endpointServer = anEndpointServer;	// per the field's comment: supplied by the remote client and echoed back to it
+	}
+	
+	public String getEndpointServer() {
+		return endpointServer;	// field is initialised to null; the only assignment visible in this diff is in setEndpointServer()
+	}
+
 }
\ No newline at end of file

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController.java Fri Aug 22 11:53:05 2008
@@ -25,10 +25,8 @@
 
 public interface PrimitiveAnalysisEngineController extends AnalysisEngineController
 {
-	public boolean isMultiplier();
-	public void releaseNextCas();
 	public void setAnalysisEngineInstancePool( AnalysisEngineInstancePool aPool);
 	public PrimitiveServiceInfo getServiceInfo();
 	public void addAbortedCasReferenceId( String aCasReferenceId );
-	
+	public int getAEInstanceCount();
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Fri Aug 22 11:53:05 2008
@@ -19,9 +19,12 @@
 
 package org.apache.uima.aae.controller;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.management.ObjectName;
 
@@ -55,6 +58,8 @@
 import org.apache.uima.resource.metadata.impl.ConfigurationParameter_impl;
 import org.apache.uima.util.Level;
 
+import sun.management.snmp.jvminstr.JvmThreadInstanceEntryImpl.ThreadStateMap;
+
 public class PrimitiveAnalysisEngineController_impl 
 extends BaseAnalysisEngineController implements PrimitiveAnalysisEngineController
 {
@@ -70,17 +75,13 @@
 
 	private List aeList = new ArrayList();
 	
-	private List cmOutstandingCASes = new ArrayList();
-	
 	private int throttleWindow = 0;
 	
 	private Object gater = new Object();
 	
 	private long howManyBeforeReplySeen = 0;
 	
-	private boolean isMultiplier = false;
 	
-	private Object syncObject = new Object();
 	
 	private PrimitiveServiceInfo serviceInfo = null;
 
@@ -88,6 +89,7 @@
 	
 	private String abortedCASReferenceId = null;
 	
+	
 	public PrimitiveAnalysisEngineController_impl(String anEndpointName, String anAnalysisEngineDescriptor, AsynchAECasManager aCasManager, InProcessCache anInProcessCache, int aWorkQueueSize, int anAnalysisEnginePoolSize) throws Exception
 	{
 		this(null, anEndpointName, anAnalysisEngineDescriptor, aCasManager, anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, 0);
@@ -122,6 +124,11 @@
     this(aParentController, anEndpointName, anAnalysisEngineDescriptor, aCasManager, anInProcessCache, aWorkQueueSize, anAnalysisEnginePoolSize, 0, aJmxManagement);
   }
 
+  
+  public int getAEInstanceCount()
+  {	// Exposes analysisEnginePoolSize, the loop bound initialize() uses when producing AnalysisEngine instances.
+	  return analysisEnginePoolSize;
+  }
   public void initialize() throws AsynchAEException
 	{
 		try
@@ -138,7 +145,7 @@
 			}
 			
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.CONFIG, getClass().getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_primitive_ctrl_init_info__CONFIG", new Object[] { analysisEnginePoolSize });
-
+			//	Instantiate and initialize UIMA analytics
 			for (int i = 0; i < analysisEnginePoolSize; i++)
 			{
 				AnalysisEngine ae =  UIMAFramework.produceAnalysisEngine(resourceSpecifier, paramsMap);
@@ -148,16 +155,11 @@
 				if (i == 0)
 				{
 					analysisEngineMetadata = ae.getAnalysisEngineMetaData();
-					if ( analysisEngineMetadata.getOperationalProperties().getOutputsNewCASes())
-					{
-						isMultiplier = true;
-					}
 				}
 			}
-			
 			if ( serviceInfo == null )
 			{
-				serviceInfo = new PrimitiveServiceInfo();
+				serviceInfo = new PrimitiveServiceInfo(isCasMultiplier());
 			}
 
 			serviceInfo.setAnalysisEngineInstanceCount(analysisEnginePoolSize);
@@ -176,12 +178,16 @@
 						if (isTopLevelComponent())
 						{
 							getCasManagerWrapper().initialize("PrimitiveAEService");
+							CAS cas = getCasManagerWrapper().getNewCas("PrimitiveAEService");
+							cas.release();
 						}
 					}
 					
+
 					// All internal components of this Primitive have been initialized. Open the latch
 					// so that this service can start processing requests.
 					latch.openLatch(getName(), isTopLevelComponent(), true);
+					
 				}
 				catch ( Exception e)
 				{
@@ -197,20 +203,20 @@
 		}
 		catch ( AsynchAEException e)
 		{
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+			e.printStackTrace();
 			throw e;
 		}
 		catch ( Exception e)
 		{
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+			e.printStackTrace();
 			throw new AsynchAEException(e);
 		}
 		
 		UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "initialize", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_initialized_controller__INFO", new Object[] { getComponentName() });
 		super.serviceInitialized = true;
 	}
-	public boolean isMultiplier()
-	{
-		return isMultiplier;
-	}
 
 	/**
 	 * 
@@ -218,7 +224,8 @@
 	public void collectionProcessComplete(Endpoint anEndpoint)// throws AsynchAEException
 	{
 		AnalysisEngine ae = null;
-		
+//		long start = System.nanoTime();
+		long start = super.getCpuTime();
 		try
 		{
 			ae = aeInstancePool.checkout();
@@ -227,8 +234,12 @@
 				ae.collectionProcessComplete();
 			}
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cpc_all_cases_processed__FINEST", new Object[] { getComponentName() });
+			getServicePerformance().incrementAnalysisTime(super.getCpuTime()-start);
+//			getServicePerformance().incrementAnalysisTime(System.nanoTime()-start);
 			getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, anEndpoint);
 			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "collectionProcessComplete", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cpc_completed__FINE", new Object[] { getComponentName()});
+			//getServicePerformance().reset();
+	
 		}
 		catch ( Exception e)
 		{
@@ -289,24 +300,8 @@
 		{
 			return;
 		}
-/*		
-		try
-		{
-		  //  Test to see if the connection to the reply endpoint can be created 
-		  //  If the client has died, dont waste time analyzing the CAS.
-		  getOutputChannel().bindWithClientEndpoint(anEndpoint);
-		}
-		catch( Exception e)
-		{
-		  if ( isTopLevelComponent() )
-		  {
-		    
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_client_drop_cas__INFO", new Object[] { getComponentName(), aCasReferenceId, anEndpoint.getEndpoint()});
-		    dropCAS(aCasReferenceId, true);
-		  }
-		  return;
-		}
-*/	
+		
+		boolean inputCASReturned = false;
 		boolean processingFailed = false;
 		// This is a primitive controller. No more processing is to be done on the Cas. Mark the destination as final and return CAS in reply.
 		anEndpoint.setFinal(true);
@@ -333,54 +328,46 @@
 				
 				return;
 			}
-			long time = System.nanoTime();
+			//	Get input CAS entry from the InProcess cache
+			CacheEntry inputCASEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+
+			long time = super.getCpuTime();
 			long totalProcessTime = 0;  // stored total time spent producing ALL CASes
+			
+//			super.beginAnalysis();
 			CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
+//			super.endAnalysis();
+			
 			//	Store how long it took to call processAndOutputNewCASes()
-			totalProcessTime = ( System.nanoTime() - time);
+			totalProcessTime = ( super.getCpuTime() - time);
 			long sequence = 1;
-			String newCasReferenceId = null;
 			long hasNextTime = 0;         // stores time in hasNext()
 			long getNextTime = 0;         // stores time in next();   
-			long timeToProcessCAS = 0;    // stores time in hasNext() and next() for each CAS
 			boolean moreCASesToProcess = true;
-/*			
-			
-			String parentCasReferenceId = null;
-			CacheEntry inputCasCacheEntry = null;
-			try
-			{
-				//	Fetch cache entry for the input CAS
-				inputCasCacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
-				parentCasReferenceId = inputCasCacheEntry.getInputCasReferenceId();
-			}
-			catch( Exception e )
-			{
-				//	An exception be be thrown here if the service is being stopped.
-				//	The top level controller may have already cleaned up the cache
-				//	and the getCacheEntryForCAS() will throw an exception. Ignore it
-				//	here, we are shutting down.
-			}
-*/
 			while (moreCASesToProcess)
 			{
-				hasNextTime = System.nanoTime();
+				long timeToProcessCAS = 0;    // stores time in hasNext() and next() for each CAS
+				hasNextTime = super.getCpuTime();
+//				super.beginAnalysis();
 				if ( !casIterator.hasNext() )
 				{
 					moreCASesToProcess = false;
 					//	Measure how long it took to call hasNext()
-					timeToProcessCAS = (System.nanoTime()-hasNextTime);
+					timeToProcessCAS = (super.getCpuTime()-hasNextTime);
 					totalProcessTime += timeToProcessCAS;
-					break;
+//					super.endAnalysis();
+					break;   // from while
 				}
 				//	Measure how long it took to call hasNext()
-				timeToProcessCAS = (System.nanoTime()-hasNextTime);
-				getNextTime = System.nanoTime();
+				timeToProcessCAS = (super.getCpuTime()-hasNextTime);
+				getNextTime = super.getCpuTime();
 				CAS casProduced = casIterator.next();
+//				super.endAnalysis();
 				//	Add how long it took to call next()
-				timeToProcessCAS += (System.nanoTime()- getNextTime);
+				timeToProcessCAS += (super.getCpuTime()- getNextTime);
                 //	Add time to call hasNext() and next() to the running total
 				totalProcessTime += timeToProcessCAS;
+				
 				//	If the service is stopped or aborted, stop generating new CASes and just return the input CAS
 				if ( stopped || abortGeneratingCASes(aCasReferenceId))
 				{
@@ -412,60 +399,61 @@
 				}
 				OutOfTypeSystemData otsd = getInProcessCache().getOutOfTypeSystemData(aCasReferenceId);
 				MessageContext mContext = getInProcessCache().getMessageAccessorByReference(aCasReferenceId);
-				sequence++;
-				newCasReferenceId = getInProcessCache().register( casProduced, mContext, otsd);
-				CacheEntry newEntry = getInProcessCache().getCacheEntryForCAS(newCasReferenceId);
-/*
-				if ( parentCasReferenceId == null )
-				{
-					newEntry.setInputCasReferenceId(aCasReferenceId);
-				}
-				else
-				{
-					newEntry.setInputCasReferenceId(parentCasReferenceId);
-				}
-*/
+				CacheEntry newEntry = getInProcessCache().register( casProduced, mContext, otsd);
+				//	Associate input CAS with the new CAS
 				newEntry.setInputCasReferenceId(aCasReferenceId);
+				newEntry.setCasSequence(sequence);
 				//	Add to the cache how long it took to process the generated (subordinate) CAS
-				getCasStatistics(newCasReferenceId).incrementAnalysisTime(timeToProcessCAS);
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getComponentName(),newCasReferenceId, aCasReferenceId });
+				getCasStatistics(newEntry.getCasReferenceId()).incrementAnalysisTime(timeToProcessCAS);
+				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_produced_new_cas__FINE", new Object[] { Thread.currentThread().getName(),getComponentName(),newEntry.getCasReferenceId(), aCasReferenceId });
+				//	Add the generated CAS to the outstanding CAS Map. Client notification will release
+				//	this CAS back to its pool
 				synchronized(syncObject)
 				{
-					cmOutstandingCASes.add(newCasReferenceId);
-					getOutputChannel().sendReply(casProduced, aCasReferenceId, newCasReferenceId, anEndpoint, sequence);
+					if ( isTopLevelComponent() )
+					{
+						inputCASEntry.incrementSubordinateCasInPlayCount();
+						//	Add the id of the generated CAS to the map holding outstanding CASes. This
+						//	map will be referenced when a client sends Free CAS Notification. The map
+						//	stores the id of the CAS both as a key and a value. Map is used to facilitate
+						//	quick lookup
+						cmOutstandingCASes.put(newEntry.getCasReferenceId(),newEntry.getCasReferenceId());
+					}
+					//	Send generated CAS to the client
+					getOutputChannel().sendReply(newEntry, anEndpoint);
 				}
 				//	Remove Stats from the global Map associated with the new CAS
-				//	This stats for this CAS were added to the response message
+				//	These stats for this CAS were added to the response message
 				//	and are no longer needed
-				dropCasStatistics(newCasReferenceId);
+				dropCasStatistics(newEntry.getCasReferenceId());
 				//	Increment number of CASes processed by this service
-				getServicePerformance().incrementNumberOfCASesProcessed();
-				getServicePerformance().incrementAnalysisTime(timeToProcessCAS);
-			}
-
-			LongNumericStatistic statistic = null;
-			if ( (statistic = getMonitor().getLongNumericStatistic("",Monitor.TotalAEProcessTime)) != null )
-			{
-				//	Increment how long it took to process the input CAS. This timer is exposed via JMX
-				statistic.increment(totalProcessTime);
-			}
-/*			
-			if (newCasReferenceId != null)
-			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), newCasReferenceId, (double) (System.nanoTime() - time) / (double) 1000000 });
-			}
-			else
-			{
-				UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (System.nanoTime() - time) / (double) 1000000 });
-
+				sequence++;
 			}
-*/
-			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (System.nanoTime() - time) / (double) 1000000 });
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
 			getMonitor().resetCountingStatistic("", Monitor.ProcessErrorCount);
+			
+			// Store total time spent processing this input CAS
 			getCasStatistics(aCasReferenceId).incrementAnalysisTime(totalProcessTime);
-			//	Aggregate total time spent processing the input CAS
-			getServicePerformance().incrementAnalysisTime(totalProcessTime);
-			getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
+			//	Aggregate total time spent processing in this service. This is separate from per CAS stats above 
+//			getServicePerformance().incrementAnalysisTime(totalProcessTime);
+			synchronized( cmOutstandingCASes )
+			{
+				if ( cmOutstandingCASes.size() == 0)
+				{
+					inputCASReturned = true;
+					
+					//	Return the input CAS to the client if there are no outstanding child CASes in play
+					getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
+				}
+				else
+				{
+					//	Change the state of the input CAS. Since the input CAS is not returned to the client
+					//	until all children of this CAS have been fully processed, we keep the input CAS in the cache.
+					//	The client will send Free CAS Notifications to release CASes produced here. When the
+					//	last child CAS is freed, the input CAS is allowed to be returned to the client.
+					inputCASEntry.setPendingReply(true);
+				}
+			}
 		}
 		catch ( Throwable e)
 		{
@@ -507,8 +495,9 @@
 
 					getInProcessCache().releaseCASesProducedFromInputCAS(aCasReferenceId);
 				}
-				else
+				else if ( inputCASReturned )
 				{
+					//	Remove input CAS cache entry if the CAS has been sent to the client
 					dropCAS(aCasReferenceId, true);
 				}
 			}
@@ -620,32 +609,6 @@
 	{
 		return super.getMetaData().getName();
 	}
-	public void releaseNextCas()
-	{
-		synchronized(syncObject)
-		{
-			if ( cmOutstandingCASes.size() > 0 )
-			{
-				try
-				{
-					String casReferenceId = (String)cmOutstandingCASes.remove(0);
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-			                "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
-			                new Object[] { getName(), casReferenceId });
-					if ( casReferenceId != null && getInProcessCache().entryExists(casReferenceId))
-					{
-						dropCAS(casReferenceId, true);
-					}
-				}
-				catch( Exception e)
-				{
-					UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-			                "releaseNextCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
-			                new Object[] { e});
-				}
-			}
-		}
-	}
 	
 	public void setAnalysisEngineInstancePool( AnalysisEngineInstancePool aPool)
 	{
@@ -665,6 +628,10 @@
 		}
 		
 		serviceInfo.setState("Running");
+		if ( isCasMultiplier() )
+		{
+			serviceInfo.setCASMultiplier();
+		}
 		return serviceInfo;
 	}
 	
@@ -683,7 +650,6 @@
 		if ( cmOutstandingCASes != null )
 		{
 			cmOutstandingCASes.clear();
-			cmOutstandingCASes = null;
 		}
 		if ( aeList != null )
 		{
@@ -691,4 +657,5 @@
 			aeList = null;
 		}
 	}
+	
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/UimacppServiceController.java Fri Aug 22 11:53:05 2008
@@ -205,7 +205,7 @@
       {
         //  If there is an exception, notify listener with failure
         if ( e != null ) {
-          (this.listeners.get(i)).notifyOnInitializationFailure(e);
+          (this.listeners.get(i)).notifyOnInitializationFailure( e);
         }
         // else, Success!
         else {

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Fri Aug 22 11:53:05 2008
@@ -138,6 +138,7 @@
 	}
 	public boolean handleError(Throwable t, ErrorContext anErrorContext, AnalysisEngineController aController)
 	{
+		CacheEntry parentCasCacheEntry = null;
 		
 		if ( !isHandlerForError(anErrorContext, AsynchAEMessage.Process))
 		{
@@ -372,7 +373,6 @@
 			  }
 			  catch( Exception exc) {}
 			}
-	
 			//	Check if the caller has already decremented number of subordinates. This property is only
 			//	set in the Aggregate's finalStep() method before the CAS is sent back to the client. If
 			//	there was a problem sending the CAS to the client, we dont want to update the counter 
@@ -388,12 +388,11 @@
 					{
 						try
 						{
-							CacheEntry parentCasCacheEntry = aController.getInProcessCache().
+							parentCasCacheEntry = aController.getInProcessCache().
 																getCacheEntryForCAS(parentCasReferenceId);
 							synchronized( parentCasCacheEntry )
 							{
-								((AggregateAnalysisEngineController)aController).
-								decrementCasSubordinateCount( parentCasCacheEntry);
+								parentCasCacheEntry.decrementSubordinateCasInPlayCount();
 								if ( parentCasCacheEntry.getSubordinateCasInPlayCount() == 0 &&
 									 parentCasCacheEntry.isPendingReply())
 								{
@@ -499,6 +498,11 @@
 			}			
 			if ( casReferenceId != null && aController instanceof AggregateAnalysisEngineController )
 			{
+				if ( parentCasCacheEntry != null && parentCasCacheEntry.getSubordinateCasInPlayCount() == 0 &&
+						 parentCasCacheEntry.isPendingReply())
+					{
+					((AggregateAnalysisEngineController)aController).finalStep(parentCasCacheEntry.getFinalStep(), parentCasCacheEntry.getCasReferenceId());
+					}
 				//	Cleanup state information from local caches
 				((AggregateAnalysisEngineController)aController).dropFlow(casReferenceId, true);
 				((AggregateAnalysisEngineController)aController).removeMessageOrigin(casReferenceId);

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/HandlerBase.java Fri Aug 22 11:53:05 2008
@@ -21,11 +21,14 @@
 
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
+import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.ErrorContext;
+import org.apache.uima.aae.jmx.ServicePerformance;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
 import org.apache.uima.aae.message.UIMAMessage;
@@ -165,6 +168,7 @@
 
 	public Handler getDelegate()
     {
+//		System.out.println("In getDelegate() - Returning:"+delegateHandler.getName());
     	return delegateHandler;
     }
     
@@ -217,5 +221,156 @@
 	{
 	}
 	
+	protected synchronized void aggregateDelegateStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
+	{	// Reads the timing properties present on aMessageContext and adds them to per-delegate and per-CAS statistics; throws AsynchAEException if the CAS has no cache entry, and wraps any other Exception in one.
+		String delegateKey = "";
+		try
+		{
+			// Map the message's endpoint name to a delegate key; the key selects the delegate's ServicePerformance further down.
+			delegateKey = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+			CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+			if ( entry == null )
+			{
+				throw new AsynchAEException("CasReferenceId:"+aCasReferenceId+" Not Found in the Cache.");
+			}
+			CacheEntry inputCasEntry = null;
+			String inputCasReferenceId = entry.getInputCasReferenceId();
+			ServicePerformance casStats = 
+				((AggregateAnalysisEngineController)getController()).getCasStatistics(aCasReferenceId);	// NOTE(review): casStats is never read in this method
+			if ( inputCasReferenceId != null && 
+				 getController().getInProcessCache().entryExists(inputCasReferenceId) )
+			{
+				String casProducerKey = entry.getCasProducerKey();
+				if ( casProducerKey != null &&
+					((AggregateAnalysisEngineController)getController()).
+						isDelegateKeyValid(casProducerKey) )
+				{
+					//	Get entry for the input CAS
+					inputCasEntry = getController().
+								getInProcessCache().
+									getCacheEntryForCAS(inputCasReferenceId);
+				}
+				// inputCasEntry stays null unless the producer key is non-null and accepted by isDelegateKeyValid().
+			}
+			ServicePerformance delegateServicePerformance =
+				((AggregateAnalysisEngineController)getController()).getServicePerformance(delegateKey);
+			// Serialization and deserialization times are added to the delegate's ServicePerformance only, and only when positive.
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS))
+			{
+				long timeToSerializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
+				if ( timeToSerializeCAS > 0)
+				{
+					if ( delegateServicePerformance != null )
+					{
+						delegateServicePerformance.
+						incrementCasSerializationTime(timeToSerializeCAS);
+					}
+				}
+			}
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS))
+			{
+				long timeToDeserializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
+				if ( timeToDeserializeCAS > 0 )
+				{
+					if ( delegateServicePerformance != null )
+					{
+						delegateServicePerformance.
+							incrementCasDeserializationTime(timeToDeserializeCAS);
+					}
+				}
+			}
+			// Idle time is added to the delegate's ServicePerformance only when the message endpoint is remote.
+			if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime))
+			{
+				long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
+				if ( idleTime > 0 && delegateServicePerformance != null )
+				{
+					Endpoint endp = aMessageContext.getEndpoint();
+					if ( endp != null && endp.isRemote())
+					{
+						delegateServicePerformance.incrementIdleTime(idleTime);
+					}
+				}
+			}
+			// Time waiting for a CAS is added to this CAS's cache entry and, when resolved above, to its input CAS entry (remote endpoints only).
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS))
+			{
+				long timeWaitingForCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
+				if ( aMessageContext.getEndpoint().isRemote())	// NOTE(review): no null check on getEndpoint() here, unlike the IdleTime and TimeInProcessCAS branches
+				{
+					entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+					if ( inputCasEntry != null )
+					{
+						inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
+					}
+				}
+			}
+			if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS))
+			{
+				long timeInProcessCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
+				Endpoint endp = aMessageContext.getEndpoint();
+				if ( endp != null && endp.isRemote())
+				{
+					if ( delegateServicePerformance != null )
+					{
+						delegateServicePerformance.incrementAnalysisTime(timeInProcessCAS);
+					}
+				}
+				else 	// endpoint is null or not remote: the time is added to this controller's own ServicePerformance
+				{
+					getController().getServicePerformance().incrementAnalysisTime(timeInProcessCAS);
+				}
+				if ( inputCasReferenceId != null )
+				{
+					ServicePerformance inputCasStats = 
+						((AggregateAnalysisEngineController)getController()).
+							getCasStatistics(inputCasReferenceId);
+					// Update processing time for this CAS
+					if ( inputCasStats != null )
+					{
+						inputCasStats.incrementAnalysisTime(timeInProcessCAS);
+					}
+				}
+			}
+		}
+		catch( AsynchAEException e)
+		{
+			throw e;
+		}
+		catch( Exception e)
+		{
+			throw new AsynchAEException(e);
+		}
+	}
+	protected void computeStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
+	{	// Logs timing figures at FINE level when the message carries TimeInService, then delegates to aggregateDelegateStats() if the controller is an aggregate.
+		if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService))
+		{
+			long departureTime = getController().getTime(aCasReferenceId, aMessageContext.getEndpoint().getEndpoint());
+			long currentTime = System.nanoTime();
+			long roundTrip = currentTime - departureTime;
+			long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
+			long totalTimeInComms = currentTime - (departureTime - timeInService);
+			// NOTE(review): the expression above equals roundTrip + timeInService. If "time in comms" is meant to exclude
+			// the time reported in TimeInService, it would be roundTrip - timeInService -- confirm the intended metric.
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
+	                new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),(double) roundTrip / (double) 1000000 });
+			// Log the TimeInService value read from the message, divided by 1000000 like the other figures.
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
+	                new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000, aMessageContext.getEndpoint() });
+			// Log the derived totalTimeInComms value.
+			UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+	                "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
+	                new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000, aMessageContext.getEndpoint() });
+		}
+		// Runs whether or not TimeInService was present; only aggregate controllers take this branch.
+			if ( getController() instanceof AggregateAnalysisEngineController )
+			{
+				aggregateDelegateStats( aMessageContext, aCasReferenceId );
+			}			
+	}
+
 
 }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataRequestHandler_impl.java Fri Aug 22 11:53:05 2008
@@ -78,7 +78,6 @@
 						UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
 				                "handle", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_for_next_handler__FINEST",
 				                new Object[] { messageType });
-
 						super.getDelegate().handle(anObjectToHandle);
 					}
 					else
@@ -92,6 +91,7 @@
 			}
 			catch( Exception e)
 			{
+				e.printStackTrace();
 				getController().getErrorHandlerChain().handle(e, HandlerBase.populateErrorContext( (MessageContext)anObjectToHandle ), getController());			
 			}
 		}

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/MetadataResponseHandler_impl.java Fri Aug 22 11:53:05 2008
@@ -26,6 +26,7 @@
 import org.apache.uima.aae.handler.HandlerBase;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.util.Level;
 
 public class MetadataResponseHandler_impl extends HandlerBase
@@ -65,7 +66,18 @@
 
 						String analysisEngineMetadata = ((MessageContext)anObjectToHandle).getStringMessage();
 						String fromEndpoint = ((MessageContext)anObjectToHandle).getMessageStringProperty(AsynchAEMessage.MessageFrom);
-						((AggregateAnalysisEngineController) getController()).mergeTypeSystem(analysisEngineMetadata, fromEndpoint);
+						String fromServer = null;
+						if ( ((MessageContext)anObjectToHandle).propertyExists(AsynchAEMessage.EndpointServer))
+						{
+							
+							fromServer =((MessageContext)anObjectToHandle).getMessageStringProperty(AsynchAEMessage.EndpointServer); 
+
+						}
+						else if ( ((MessageContext)anObjectToHandle).propertyExists(UIMAMessage.ServerURI)) 
+						{
+							fromServer = ((MessageContext)anObjectToHandle).getMessageStringProperty(UIMAMessage.ServerURI);
+						}
+						((AggregateAnalysisEngineController) getController()).mergeTypeSystem(analysisEngineMetadata, fromEndpoint, fromServer);
 					}
 				}
 				else