You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/11/24 20:18:20 UTC

svn commit: r720263 - in /incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main: java/org/apache/uima/aae/controller/ java/org/apache/uima/aae/error/handler/ java/org/apache/uima/aae/handler/input/ java/org/apache/uima/aae/spi/transport/vm/ resou...

Author: eae
Date: Mon Nov 24 11:18:20 2008
New Revision: 720263

URL: http://svn.apache.org/viewvc?rev=720263&view=rev
Log:
UIMA-1232 commit JC patch. Note that testProcessParallelFlowWithDelegateDisable hangs with this patch.

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Mon Nov 24 11:18:20 2008
@@ -921,45 +921,45 @@
 		{
 			if (aCasReferenceId != null)
 			{
-				try
-				{
-					//	Check if a Flow object has been previously generated for the Cas.
-					if (flowMap.containsKey(aCasReferenceId))
-					{
-		         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-		           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_retrieve_flow_object__FINEST", new Object[] { aCasReferenceId });
-		         }
-						synchronized( flowMap)
-						{
-							flow = (FlowContainer) flowMap.get(aCasReferenceId);
-						}
-					}
-					else
-					{
-		         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-		           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId });
-		         }
-						synchronized( flowControllerContainer )
-						{
-							flow = flowControllerContainer.computeFlow(aCAS);
-						}
-						// Save the Flow Object in a cache. Flow exists in the cache
-						// 	until the CAS is fully processed or it is
-						// explicitly deleted when processing of this CAS cannot
-						// continue
-						synchronized( flowMap )
-						{
-							flowMap.put(aCasReferenceId, flow);
-						}
-	          // Check if the local cache already contains an entry for the Cas id.
-	          // A colocated Cas Multiplier may have already registered this CAS 
-	          // in the parent's controller
-	          if ( localCache.lookupEntry(aCasReferenceId) == null ) {
-	            //  Add this Cas Id to the local cache. Every input CAS goes through here
-	            localCache.createCasStateEntry(aCasReferenceId);
-	          }
-					}
-				}
+        try
+        {
+          //  Check if a Flow object has been previously generated for the Cas.
+          if (flowMap.containsKey(aCasReferenceId))
+          {
+             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_retrieve_flow_object__FINEST", new Object[] { aCasReferenceId });
+             }
+            synchronized( flowMap)
+            {
+              flow = (FlowContainer) flowMap.get(aCasReferenceId);
+            }
+          }
+          else
+          {
+             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId });
+             }
+            synchronized( flowControllerContainer )
+            {
+              flow = flowControllerContainer.computeFlow(aCAS);
+            }
+            // Save the Flow Object in a cache. Flow exists in the cache
+            //  until the CAS is fully processed or it is
+            // explicitly deleted when processing of this CAS cannot
+            // continue
+            synchronized( flowMap )
+            {
+              flowMap.put(aCasReferenceId, flow);
+            }
+            // Check if the local cache already contains an entry for the Cas id.
+            // A colocated Cas Multiplier may have already registered this CAS 
+            // in the parent's controller
+            if ( localCache.lookupEntry(aCasReferenceId) == null ) {
+              //  Add this Cas Id to the local cache. Every input CAS goes through here
+              localCache.createCasStateEntry(aCasReferenceId);
+            }
+          }
+        }
 				catch( Exception ex)
 				{
 					//	Any error here is automatic termination
@@ -1167,7 +1167,10 @@
 						ServiceInfo serviceInfo = endpoint.getServiceInfo();
 						PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo();
 						pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
-						pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
+            pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
+            if ( endpoint.getDestination() != null ) {
+              pServiceInfo.setReplyQueueName(endpoint.getDestination().toString());
+            }
 						pServiceInfo.setState(serviceInfo.getState());
 						pServiceInfo.setAnalysisEngineInstanceCount(1);
 						
@@ -1265,6 +1268,7 @@
 			endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
 			
 			boolean sendReplyToClient = false;
+			boolean doSendReplyToClient = false;
 			
 			synchronized( super.finalStepMux)
 			{
@@ -1297,8 +1301,7 @@
           freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
           //  Lookup parent CAS in the local cache 
           parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
-          parentCasStateEntry.decrementSubordinateCasInPlayCount();
-          casStateEntryDecremented = true;
+
           // Decrement this Cas parent child count if this is a top controller
         }
 	      //  If the CAS was generated by this component but the Flow Controller wants to drop it OR this component
@@ -1320,7 +1323,6 @@
 	          }
 	          //  Drop the CAS and remove cache entry for it
 	          dropCAS(aCasReferenceId, true);
-	          localCache.remove(aCasReferenceId);
 	          casDropped = true;
 
             //  If debug level=FINEST dump the entire cache
@@ -1336,22 +1338,35 @@
 	      } 
 	      else 
 	      {
-	        
-	        if ( casStateEntry.isSubordinate() && isTopLevelComponent())
-	        {
-	            casStateEntry.setWaitingForRelease(true);
+          if ( casStateEntry.isSubordinate() && isTopLevelComponent()) {
+            //  Change the state of the CAS entry to indicate that this CAS will await
+            //  explicit Release request from the client. Until the Request to Release
+            //  is received, the CAS will be parked in the Cache.
+            casStateEntry.setWaitingForRelease(true);
+          } else if ( parentCasStateEntry != null ){
+            
+            parentCasStateEntry.decrementSubordinateCasInPlayCount();
+            casStateEntryDecremented = true;
           }
-	        //  Send a reply to the Client. If the CAS is an input CAS it will be dropped
-	        cEndpoint = replyToClient( cacheEntry, casStateEntry );
-          localCache.remove(aCasReferenceId);
-          //  If debug level=FINEST dump the entire cache
-          localCache.dumpContents();
-	        if ( cEndpoint != null )
-	        {
-	          replySentToClient = true;
-	        }
+          doSendReplyToClient = true;
 	      }
 				
+			}  // synchronized
+			
+			if ( doSendReplyToClient ) {
+        //  Send a reply to the Client. If the CAS is an input CAS it will be dropped
+        cEndpoint = replyToClient( cacheEntry, casStateEntry );
+        if ( !endpoint.isRemote() ) {
+          // Remove entry from the local cache for this CAS. If the client
+          // is remote the entry was removed in replyToClient()
+          localCache.remove(aCasReferenceId);
+        }
+        //  If debug level=FINEST dump the entire cache
+        localCache.dumpContents();
+        if ( cEndpoint != null )
+        {
+          replySentToClient = true;
+        }
 			}
 
       if ( isSubordinate && releaseParentCas(casDropped, cEndpoint, parentCasStateEntry) )
@@ -1545,7 +1560,7 @@
 				}
 			}
 			//	Drop the CAS only if the client is remote and the CAS is an input CAS. 
-			//  If this CAS has a parent the client will send Realease CAS notification to release the CAS.
+			//  If this CAS has a parent the client will send Release CAS notification to release the CAS.
 			if ( endpoint.isRemote() && !casStateEntry.isSubordinate())
 			{
 				dropCAS(casStateEntry.getCasReferenceId(), true);
@@ -1558,7 +1573,11 @@
 		}
 		else
 		{
-		  System.out.println("!!!!!!!!!!!!!!! Controller:"+getComponentName()+" Origin Endpoint Not Found For Cas:"+casStateEntry.getCasReferenceId()+" Or Its Parent Cas:"+casStateEntry.getInputCasReferenceId());
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
+          "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_client_endpoint_not_found__INFO", new Object[] { getComponentName(), casStateEntry.getCasReferenceId() });
+      }
+		  
 		}
 		return endpoint;
 	}
@@ -1671,8 +1690,8 @@
         UimaMessage message = 
           getTransport(anEndpoint.getEndpoint()).produceMessage(AsynchAEMessage.Process,AsynchAEMessage.Request,getName());
         message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
-          //  Send reply back to the client. Use internal (non-jms) transport
-         getTransport(anEndpoint.getEndpoint()).getUimaMessageDispatcher().dispatch(message);
+        getTransport(anEndpoint.getEndpoint()).getUimaMessageDispatcher().dispatch(message);
+
       }
       catch( Exception e)
       {

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon Nov 24 11:18:20 2008
@@ -480,7 +480,6 @@
        UimaAsContext uimaAsContext2 = new UimaAsContext();
        // Set up as many reply threads as there are threads to process requests
        uimaAsContext2.setConcurrentConsumerCount(concurrentReplyConsumers);
-//       uimaAsContext2.setConcurrentConsumerCount(parentControllerReplyConsumerCount);
        uimaAsContext2.put("EndpointName", endpointName);
        UimaTransport parentVmTransport = parentController.getTransport(uimaAsContext2, endpointName);
        parentVmTransport.produceUimaMessageDispatcher(this, vmTransport);
@@ -489,7 +488,6 @@
        parentListener.initialize(uimaAsContext2);
        // Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
        vmTransport.produceUimaMessageDispatcher(parentController,parentVmTransport);
-       //transports.put(parentController.getName(), parentVmTransport);
      }
 
 	 }
@@ -683,12 +681,6 @@
 		{
 			pServiceInfo = ((PrimitiveAnalysisEngineController)this).getServiceInfo();
 			servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController)this).getServiceInfo().getAnalysisEngineInstanceCount());
-			//	If this is a Cas Multiplier, add the key to the JMX MBean.
-			//	This will help the JMX Monitor to fetch the CM Cas Pool MBean
-			if ( isCasMultiplier() )
-			{
-				pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
-			}
 		}
 		else
 		{
@@ -696,6 +688,14 @@
 				((AggregateAnalysisEngineController)this).getServiceInfo();
 			pServiceInfo.setAggregate(true);
 		}
+    //  If this is a Cas Multiplier, add the key to the JMX MBean.
+    //  This will help the JMX Monitor to fetch the CM Cas Pool MBean
+    if ( isCasMultiplier() )
+    {
+      pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
+    }
+
+		
 		if ( pServiceInfo != null )
 		{
 			name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
@@ -1997,10 +1997,6 @@
 					{
 						//	Release the CAS and remove a corresponding entry from the InProcess cache.
 						dropCAS(casReferenceId, true);
-	          if ( this instanceof AggregateAnalysisEngineController ) {
-	            ((AggregateAnalysisEngineController)this).getLocalCache().remove(casReferenceId);
-	          }
-						
 						//  Remove the Cas from the outstanding CAS list. The id of the Cas was
 						//	added to this list by the Cas Multiplier before the Cas was sent to 
 						//	to the client. 
@@ -2048,7 +2044,10 @@
 							synchronized( finalStepMux )
 							{
 		            if ( this instanceof AggregateAnalysisEngineController ) {
-	                casHasNoSubordinates = casStateEntry.getSubordinateCasInPlayCount() == 0;
+		              //  Decrement number of children for this CAS since we just released one above.
+		              casStateEntry.decrementSubordinateCasInPlayCount();
+
+		              casHasNoSubordinates = casStateEntry.getSubordinateCasInPlayCount() == 0;
 	                casPendingReply = casStateEntry.isPendingReply();
 		            } else {
 	                casHasNoSubordinates = getInProcessCache().hasNoSubordinates(cacheEntry.getCasReferenceId());
@@ -2275,50 +2274,50 @@
 		 */
 		public long getAnalysisTime()
 		{
-			Set<Long> set = threadStateMap.keySet();
-			Iterator<Long> it = set.iterator();
-			long totalCpuProcessTime = 0;
-			//	Iterate over all processing threads
-			while( it.hasNext())
-			{
-				long threadId = it.next();
-				synchronized( mux )
-				{
-					//	Fetch the next thread's stats
-					AnalysisThreadState threadState = threadStateMap.get(threadId);
-					//	If an Aggregate service, sum up the CPU times of all collocated
-					//	delegates.
-					if ( this instanceof AggregateAnalysisEngineController_impl )
-					{
-						//	Get a list of all colocated delegate controllers from the Aggregate
-						List<AnalysisEngineController> delegateControllerList = 
-							((AggregateAnalysisEngineController_impl)this).childControllerList; 							
-						//	Iterate over all colocated delegates
-						for( int i=0; i < delegateControllerList.size(); i++)
-						{	
-							//	Get the next delegate's controller
-							AnalysisEngineController delegateController =
-								(AnalysisEngineController)delegateControllerList.get(i);
-							if ( delegateController != null && !delegateController.isStopped())
-							{
-								//	get the CPU time for all processing threads in the current controller
-								totalCpuProcessTime += delegateController.getAnalysisTime();
-							}
-						}
-					}
-					else  // Primitive Controller
-					{
-						//	Get the CPU time of a thread with a given ID
-						totalCpuProcessTime += getCpuTime(threadId);
-					}
-					//	Subtract serialization and deserialization times from the total CPU used
-					if ( totalCpuProcessTime > 0 )
-					{
-						totalCpuProcessTime -= threadState.getDeserializationTime();
-						totalCpuProcessTime -= threadState.getSerializationTime();
-					}
-				}
-			}
+      long totalCpuProcessTime = 0;
+      synchronized( mux )
+      {
+        Set<Long> set = threadStateMap.keySet();
+        Iterator<Long> it = set.iterator();
+        //  Iterate over all processing threads
+        while( it.hasNext())
+        {
+          long threadId = it.next();
+            //  Fetch the next thread's stats
+            AnalysisThreadState threadState = threadStateMap.get(threadId);
+            //  If an Aggregate service, sum up the CPU times of all collocated
+            //  delegates.
+            if ( this instanceof AggregateAnalysisEngineController_impl )
+            {
+              //  Get a list of all colocated delegate controllers from the Aggregate
+              List<AnalysisEngineController> delegateControllerList = 
+                ((AggregateAnalysisEngineController_impl)this).childControllerList;               
+              //  Iterate over all colocated delegates
+              for( int i=0; i < delegateControllerList.size(); i++)
+              { 
+                //  Get the next delegate's controller
+                AnalysisEngineController delegateController =
+                  (AnalysisEngineController)delegateControllerList.get(i);
+                if ( delegateController != null && !delegateController.isStopped())
+                {
+                  //  get the CPU time for all processing threads in the current controller
+                  totalCpuProcessTime += delegateController.getAnalysisTime();
+                }
+              }
+            }
+            else  // Primitive Controller
+            {
+              //  Get the CPU time of a thread with a given ID
+              totalCpuProcessTime += getCpuTime(threadId);
+            }
+            //  Subtract serialization and deserialization times from the total CPU used
+            if ( totalCpuProcessTime > 0 )
+            {
+              totalCpuProcessTime -= threadState.getDeserializationTime();
+              totalCpuProcessTime -= threadState.getSerializationTime();
+            }
+        }
+      }
 			return totalCpuProcessTime;
 		}
 		/**

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Mon Nov 24 11:18:20 2008
@@ -324,6 +324,7 @@
 		}
 		//  Create a new entry in the local cache for the input CAS
     CasStateEntry parentCasStateEntry = getLocalCache().createCasStateEntry(aCasReferenceId);
+    long totalProcessTime = 0;  // stored total time spent producing ALL CASes
 		
 		boolean inputCASReturned = false;
 		boolean processingFailed = false;
@@ -338,7 +339,6 @@
 			//	Get input CAS entry from the InProcess cache
 			CacheEntry inputCASEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
 			long time = super.getCpuTime();
-			long totalProcessTime = 0;  // stored total time spent producing ALL CASes
 			
 			CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
 			
@@ -457,7 +457,6 @@
               message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
               long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process); 
               message.addLongProperty(AsynchAEMessage.IdleTime, iT );
-              
 		          getTransport(getName()).getUimaMessageDispatcher().dispatch(message);
 	      }
 	      else
@@ -465,14 +464,19 @@
 	          //  Send generated CAS to the client
 	          getOutputChannel().sendReply(newEntry, anEndpoint);
 	      }
-        //  Remove the new CAS state entry from the local cache
-        localCache.remove(newEntry.getCasReferenceId());
+        // Remove the new CAS state entry from the local cache if this a top level primitive.
+	      // If not top level, the client (an Aggregate) will remove this entry when this new
+	      // generated CAS reaches Final State.
+	      if ( isTopLevelComponent() ) {
+	        localCache.remove(newEntry.getCasReferenceId());
+	      }
 				
 				//	Remove Stats from the global Map associated with the new CAS
 				//	These stats for this CAS were added to the response message
 				//	and are no longer needed
 				dropCasStatistics(newEntry.getCasReferenceId());
-			}
+			} // while
+			
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
       }
@@ -522,6 +526,10 @@
            getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
          }
 			}
+      //  Remove input CAS state entry from the local cache
+      if ( !isTopLevelComponent() ) {
+        localCache.remove(aCasReferenceId);
+      }
 		}
 		catch ( Throwable e)
 		{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Mon Nov 24 11:18:20 2008
@@ -316,7 +316,7 @@
 		}
 
 		//	Dont increment errors for destinations that are clients of this service.
-		if ( !aController.isStopped() && (isRequest || !isEndpointTheClient ) )
+		if ( key != null && !aController.isStopped() && (isRequest || !isEndpointTheClient ) )
 		{
 			synchronized( monitor )
 			{

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Mon Nov 24 11:18:20 2008
@@ -143,7 +143,7 @@
 
 	}
 
-	private synchronized void handleProcessResponseFromRemoteDelegate(MessageContext aMessageContext, String aDelegateKey)
+	private void handleProcessResponseFromRemoteDelegate(MessageContext aMessageContext, String aDelegateKey)
 	{
 		CAS cas = null;
 		String casReferenceId = null;
@@ -265,7 +265,6 @@
           }
         }
       }
-
       long timeToDeserializeCAS = getController().getCpuTime() - t1;
 
       getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
@@ -338,7 +337,7 @@
 	}
 	
 	
-	private synchronized void handleProcessResponseWithCASReference(MessageContext aMessageContext )
+	private void handleProcessResponseWithCASReference(MessageContext aMessageContext )
 	{
 		String casReferenceId = null;
 		CacheEntry cacheEntry = null;

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java Mon Nov 24 11:18:20 2008
@@ -73,7 +73,6 @@
     int requestType = 0;
     try {
       latch.await();
-
       if (UimaMessageValidator.isValidMessage(aMessage, controller)) {
         MessageContext msgContext = aMessage.toMessageContext(controller.getName());
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -83,7 +82,7 @@
         }
         if (!concurrentThreads.containsKey(Thread.currentThread().getId())) {
           Thread.currentThread().setName(
-                  Thread.currentThread().getName() + "::" + controller.getComponentName());
+                  Thread.currentThread().getName() + "::" + controller.getComponentName()+ "::"+Thread.currentThread().getId());
           // Store the thread identifier in the map. The value stored is not important. All
           // we want is to save the fact that the thread name has been changed. And we only
           // want to change it once

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java Mon Nov 24 11:18:20 2008
@@ -124,10 +124,9 @@
       int concurrentConsumerCount = context.getConcurrentConsumerCount();
       // Create a ThreadPoolExecutor with as many threads as needed. The pool has 
       // a fixed number of threads that never expire and are never passivated.
-      executor = new ThreadPoolExecutor(1, concurrentConsumerCount, Long.MAX_VALUE,
+      executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount, Long.MAX_VALUE,
               TimeUnit.NANOSECONDS, workQueue);
-      // executor.prestartAllCoreThreads();
-
+      executor.prestartAllCoreThreads();
     }
     return executor;
   }

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Mon Nov 24 11:18:20 2008
@@ -174,3 +174,6 @@
 UIMAEE_cas_is_invalid_remove_from_cache_failed__FINE = >>> Cas Id: {0} Not in Cache. Must Have Been Already Removed
 UIMAEE_disable_endpoint__INFO = Controller: {0} Disabling Delegate: {1} Due To Excessive Errors
 UIMAEE_process_exception__INFO = Controller: {0} Handling Exception. Delegate: {1} Cas Id: {2}
+UIMAEE_client_endpoint_not_found__INFO = Controller: {0} Unable to Send CAS: {1} to Client. Client Endpoint Not Found.
+UIMAEE_local_cache_increment_child_count__FINEST = Controller: {0} Incremented CAS: {1} Child Count. Current Count: {2}
+UIMAEE_local_cache_increment_child_count__FINEST = Controller: {0} Decremented CAS: {1} Child Count. Current Count: {2}