You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/11/24 20:18:20 UTC
svn commit: r720263 - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main:
java/org/apache/uima/aae/controller/ java/org/apache/uima/aae/error/handler/
java/org/apache/uima/aae/handler/input/
java/org/apache/uima/aae/spi/transport/vm/ resources/
Author: eae
Date: Mon Nov 24 11:18:20 2008
New Revision: 720263
URL: http://svn.apache.org/viewvc?rev=720263&view=rev
Log:
UIMA-1232 commit JC patch. Note that testProcessParallelFlowWithDelegateDisable hangs with this patch.
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Mon Nov 24 11:18:20 2008
@@ -921,45 +921,45 @@
{
if (aCasReferenceId != null)
{
- try
- {
- // Check if a Flow object has been previously generated for the Cas.
- if (flowMap.containsKey(aCasReferenceId))
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_retrieve_flow_object__FINEST", new Object[] { aCasReferenceId });
- }
- synchronized( flowMap)
- {
- flow = (FlowContainer) flowMap.get(aCasReferenceId);
- }
- }
- else
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId });
- }
- synchronized( flowControllerContainer )
- {
- flow = flowControllerContainer.computeFlow(aCAS);
- }
- // Save the Flow Object in a cache. Flow exists in the cache
- // until the CAS is fully processed or it is
- // explicitly deleted when processing of this CAS cannot
- // continue
- synchronized( flowMap )
- {
- flowMap.put(aCasReferenceId, flow);
- }
- // Check if the local cache already contains an entry for the Cas id.
- // A colocated Cas Multiplier may have already registered this CAS
- // in the parent's controller
- if ( localCache.lookupEntry(aCasReferenceId) == null ) {
- // Add this Cas Id to the local cache. Every input CAS goes through here
- localCache.createCasStateEntry(aCasReferenceId);
- }
- }
- }
+ try
+ {
+ // Check if a Flow object has been previously generated for the Cas.
+ if (flowMap.containsKey(aCasReferenceId))
+ {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_retrieve_flow_object__FINEST", new Object[] { aCasReferenceId });
+ }
+ synchronized( flowMap)
+ {
+ flow = (FlowContainer) flowMap.get(aCasReferenceId);
+ }
+ }
+ else
+ {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_flow_object__FINEST", new Object[] { aCasReferenceId });
+ }
+ synchronized( flowControllerContainer )
+ {
+ flow = flowControllerContainer.computeFlow(aCAS);
+ }
+ // Save the Flow Object in a cache. Flow exists in the cache
+ // until the CAS is fully processed or it is
+ // explicitly deleted when processing of this CAS cannot
+ // continue
+ synchronized( flowMap )
+ {
+ flowMap.put(aCasReferenceId, flow);
+ }
+ // Check if the local cache already contains an entry for the Cas id.
+ // A colocated Cas Multiplier may have already registered this CAS
+ // in the parent's controller
+ if ( localCache.lookupEntry(aCasReferenceId) == null ) {
+ // Add this Cas Id to the local cache. Every input CAS goes through here
+ localCache.createCasStateEntry(aCasReferenceId);
+ }
+ }
+ }
catch( Exception ex)
{
// Any error here is automatic termination
@@ -1167,7 +1167,10 @@
ServiceInfo serviceInfo = endpoint.getServiceInfo();
PrimitiveServiceInfo pServiceInfo = new PrimitiveServiceInfo();
pServiceInfo.setBrokerURL(serviceInfo.getBrokerURL());
- pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
+ pServiceInfo.setInputQueueName(serviceInfo.getInputQueueName());
+ if ( endpoint.getDestination() != null ) {
+ pServiceInfo.setReplyQueueName(endpoint.getDestination().toString());
+ }
pServiceInfo.setState(serviceInfo.getState());
pServiceInfo.setAnalysisEngineInstanceCount(1);
@@ -1265,6 +1268,7 @@
endpoint = getInProcessCache().getEndpoint(null, aCasReferenceId);
boolean sendReplyToClient = false;
+ boolean doSendReplyToClient = false;
synchronized( super.finalStepMux)
{
@@ -1297,8 +1301,7 @@
freeCasEndpoint = cacheEntry.getFreeCasEndpoint();
// Lookup parent CAS in the local cache
parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
- parentCasStateEntry.decrementSubordinateCasInPlayCount();
- casStateEntryDecremented = true;
+
// Decrement this Cas parent child count if this is a top controller
}
// If the CAS was generated by this component but the Flow Controller wants to drop it OR this component
@@ -1320,7 +1323,6 @@
}
// Drop the CAS and remove cache entry for it
dropCAS(aCasReferenceId, true);
- localCache.remove(aCasReferenceId);
casDropped = true;
// If debug level=FINEST dump the entire cache
@@ -1336,22 +1338,35 @@
}
else
{
-
- if ( casStateEntry.isSubordinate() && isTopLevelComponent())
- {
- casStateEntry.setWaitingForRelease(true);
+ if ( casStateEntry.isSubordinate() && isTopLevelComponent()) {
+ // Change the state of the CAS entry to indicate that this CAS will await
+ // explicit Release request from the client. Until the Request to Release
+ // is received, the CAS will be parked in the Cache.
+ casStateEntry.setWaitingForRelease(true);
+ } else if ( parentCasStateEntry != null ){
+
+ parentCasStateEntry.decrementSubordinateCasInPlayCount();
+ casStateEntryDecremented = true;
}
- // Send a reply to the Client. If the CAS is an input CAS it will be dropped
- cEndpoint = replyToClient( cacheEntry, casStateEntry );
- localCache.remove(aCasReferenceId);
- // If debug level=FINEST dump the entire cache
- localCache.dumpContents();
- if ( cEndpoint != null )
- {
- replySentToClient = true;
- }
+ doSendReplyToClient = true;
}
+ } // synchronized
+
+ if ( doSendReplyToClient ) {
+ // Send a reply to the Client. If the CAS is an input CAS it will be dropped
+ cEndpoint = replyToClient( cacheEntry, casStateEntry );
+ if ( !endpoint.isRemote() ) {
+ // Remove entry from the local cache for this CAS. If the client
+ // is remote the entry was removed in replyToClient()
+ localCache.remove(aCasReferenceId);
+ }
+ // If debug level=FINEST dump the entire cache
+ localCache.dumpContents();
+ if ( cEndpoint != null )
+ {
+ replySentToClient = true;
+ }
}
if ( isSubordinate && releaseParentCas(casDropped, cEndpoint, parentCasStateEntry) )
@@ -1545,7 +1560,7 @@
}
}
// Drop the CAS only if the client is remote and the CAS is an input CAS.
- // If this CAS has a parent the client will send Realease CAS notification to release the CAS.
+ // If this CAS has a parent the client will send Release CAS notification to release the CAS.
if ( endpoint.isRemote() && !casStateEntry.isSubordinate())
{
dropCAS(casStateEntry.getCasReferenceId(), true);
@@ -1558,7 +1573,11 @@
}
else
{
- System.out.println("!!!!!!!!!!!!!!! Controller:"+getComponentName()+" Origin Endpoint Not Found For Cas:"+casStateEntry.getCasReferenceId()+" Or Its Parent Cas:"+casStateEntry.getInputCasReferenceId());
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_client_endpoint_not_found__INFO", new Object[] { getComponentName(), casStateEntry.getCasReferenceId() });
+ }
+
}
return endpoint;
}
@@ -1671,8 +1690,8 @@
UimaMessage message =
getTransport(anEndpoint.getEndpoint()).produceMessage(AsynchAEMessage.Process,AsynchAEMessage.Request,getName());
message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
- // Send reply back to the client. Use internal (non-jms) transport
- getTransport(anEndpoint.getEndpoint()).getUimaMessageDispatcher().dispatch(message);
+ getTransport(anEndpoint.getEndpoint()).getUimaMessageDispatcher().dispatch(message);
+
}
catch( Exception e)
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Mon Nov 24 11:18:20 2008
@@ -480,7 +480,6 @@
UimaAsContext uimaAsContext2 = new UimaAsContext();
// Set up as many reply threads as there are threads to process requests
uimaAsContext2.setConcurrentConsumerCount(concurrentReplyConsumers);
-// uimaAsContext2.setConcurrentConsumerCount(parentControllerReplyConsumerCount);
uimaAsContext2.put("EndpointName", endpointName);
UimaTransport parentVmTransport = parentController.getTransport(uimaAsContext2, endpointName);
parentVmTransport.produceUimaMessageDispatcher(this, vmTransport);
@@ -489,7 +488,6 @@
parentListener.initialize(uimaAsContext2);
// Creates delegate's dispatcher. It is wired to send replies to the parent's listener.
vmTransport.produceUimaMessageDispatcher(parentController,parentVmTransport);
- //transports.put(parentController.getName(), parentVmTransport);
}
}
@@ -683,12 +681,6 @@
{
pServiceInfo = ((PrimitiveAnalysisEngineController)this).getServiceInfo();
servicePerformance.setProcessThreadCount(((PrimitiveAnalysisEngineController)this).getServiceInfo().getAnalysisEngineInstanceCount());
- // If this is a Cas Multiplier, add the key to the JMX MBean.
- // This will help the JMX Monitor to fetch the CM Cas Pool MBean
- if ( isCasMultiplier() )
- {
- pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
- }
}
else
{
@@ -696,6 +688,14 @@
((AggregateAnalysisEngineController)this).getServiceInfo();
pServiceInfo.setAggregate(true);
}
+ // If this is a Cas Multiplier, add the key to the JMX MBean.
+ // This will help the JMX Monitor to fetch the CM Cas Pool MBean
+ if ( isCasMultiplier() )
+ {
+ pServiceInfo.setServiceKey(getUimaContextAdmin().getQualifiedContextName());
+ }
+
+
if ( pServiceInfo != null )
{
name = jmxManagement.getJmxDomain()+key_value_list+",name="+thisComponentName+"_"+serviceInfo.getLabel();
@@ -1997,10 +1997,6 @@
{
// Release the CAS and remove a corresponding entry from the InProcess cache.
dropCAS(casReferenceId, true);
- if ( this instanceof AggregateAnalysisEngineController ) {
- ((AggregateAnalysisEngineController)this).getLocalCache().remove(casReferenceId);
- }
-
// Remove the Cas from the outstanding CAS list. The id of the Cas was
// added to this list by the Cas Multiplier before the Cas was sent to
// to the client.
@@ -2048,7 +2044,10 @@
synchronized( finalStepMux )
{
if ( this instanceof AggregateAnalysisEngineController ) {
- casHasNoSubordinates = casStateEntry.getSubordinateCasInPlayCount() == 0;
+ // Decrement number of children for this CAS since we just released one above.
+ casStateEntry.decrementSubordinateCasInPlayCount();
+
+ casHasNoSubordinates = casStateEntry.getSubordinateCasInPlayCount() == 0;
casPendingReply = casStateEntry.isPendingReply();
} else {
casHasNoSubordinates = getInProcessCache().hasNoSubordinates(cacheEntry.getCasReferenceId());
@@ -2275,50 +2274,50 @@
*/
public long getAnalysisTime()
{
- Set<Long> set = threadStateMap.keySet();
- Iterator<Long> it = set.iterator();
- long totalCpuProcessTime = 0;
- // Iterate over all processing threads
- while( it.hasNext())
- {
- long threadId = it.next();
- synchronized( mux )
- {
- // Fetch the next thread's stats
- AnalysisThreadState threadState = threadStateMap.get(threadId);
- // If an Aggregate service, sum up the CPU times of all collocated
- // delegates.
- if ( this instanceof AggregateAnalysisEngineController_impl )
- {
- // Get a list of all colocated delegate controllers from the Aggregate
- List<AnalysisEngineController> delegateControllerList =
- ((AggregateAnalysisEngineController_impl)this).childControllerList;
- // Iterate over all colocated delegates
- for( int i=0; i < delegateControllerList.size(); i++)
- {
- // Get the next delegate's controller
- AnalysisEngineController delegateController =
- (AnalysisEngineController)delegateControllerList.get(i);
- if ( delegateController != null && !delegateController.isStopped())
- {
- // get the CPU time for all processing threads in the current controller
- totalCpuProcessTime += delegateController.getAnalysisTime();
- }
- }
- }
- else // Primitive Controller
- {
- // Get the CPU time of a thread with a given ID
- totalCpuProcessTime += getCpuTime(threadId);
- }
- // Subtract serialization and deserialization times from the total CPU used
- if ( totalCpuProcessTime > 0 )
- {
- totalCpuProcessTime -= threadState.getDeserializationTime();
- totalCpuProcessTime -= threadState.getSerializationTime();
- }
- }
- }
+ long totalCpuProcessTime = 0;
+ synchronized( mux )
+ {
+ Set<Long> set = threadStateMap.keySet();
+ Iterator<Long> it = set.iterator();
+ // Iterate over all processing threads
+ while( it.hasNext())
+ {
+ long threadId = it.next();
+ // Fetch the next thread's stats
+ AnalysisThreadState threadState = threadStateMap.get(threadId);
+ // If an Aggregate service, sum up the CPU times of all collocated
+ // delegates.
+ if ( this instanceof AggregateAnalysisEngineController_impl )
+ {
+ // Get a list of all colocated delegate controllers from the Aggregate
+ List<AnalysisEngineController> delegateControllerList =
+ ((AggregateAnalysisEngineController_impl)this).childControllerList;
+ // Iterate over all colocated delegates
+ for( int i=0; i < delegateControllerList.size(); i++)
+ {
+ // Get the next delegate's controller
+ AnalysisEngineController delegateController =
+ (AnalysisEngineController)delegateControllerList.get(i);
+ if ( delegateController != null && !delegateController.isStopped())
+ {
+ // get the CPU time for all processing threads in the current controller
+ totalCpuProcessTime += delegateController.getAnalysisTime();
+ }
+ }
+ }
+ else // Primitive Controller
+ {
+ // Get the CPU time of a thread with a given ID
+ totalCpuProcessTime += getCpuTime(threadId);
+ }
+ // Subtract serialization and deserialization times from the total CPU used
+ if ( totalCpuProcessTime > 0 )
+ {
+ totalCpuProcessTime -= threadState.getDeserializationTime();
+ totalCpuProcessTime -= threadState.getSerializationTime();
+ }
+ }
+ }
return totalCpuProcessTime;
}
/**
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/PrimitiveAnalysisEngineController_impl.java Mon Nov 24 11:18:20 2008
@@ -324,6 +324,7 @@
}
// Create a new entry in the local cache for the input CAS
CasStateEntry parentCasStateEntry = getLocalCache().createCasStateEntry(aCasReferenceId);
+ long totalProcessTime = 0; // stored total time spent producing ALL CASes
boolean inputCASReturned = false;
boolean processingFailed = false;
@@ -338,7 +339,6 @@
// Get input CAS entry from the InProcess cache
CacheEntry inputCASEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
long time = super.getCpuTime();
- long totalProcessTime = 0; // stored total time spent producing ALL CASes
CasIterator casIterator = ae.processAndOutputNewCASes(aCAS);
@@ -457,7 +457,6 @@
message.addLongProperty(AsynchAEMessage.TimeInProcessCAS, casStats.getRawAnalysisTime());
long iT = getIdleTimeBetweenProcessCalls(AsynchAEMessage.Process);
message.addLongProperty(AsynchAEMessage.IdleTime, iT );
-
getTransport(getName()).getUimaMessageDispatcher().dispatch(message);
}
else
@@ -465,14 +464,19 @@
// Send generated CAS to the client
getOutputChannel().sendReply(newEntry, anEndpoint);
}
- // Remove the new CAS state entry from the local cache
- localCache.remove(newEntry.getCasReferenceId());
+ // Remove the new CAS state entry from the local cache if this is a top level primitive.
+ // If not top level, the client (an Aggregate) will remove this entry when this new
+ // generated CAS reaches Final State.
+ if ( isTopLevelComponent() ) {
+ localCache.remove(newEntry.getCasReferenceId());
+ }
// Remove Stats from the global Map associated with the new CAS
// These stats for this CAS were added to the response message
// and are no longer needed
dropCasStatistics(newEntry.getCasReferenceId());
- }
+ } // while
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, getClass().getName(), "process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_completed_analysis__FINEST", new Object[] { Thread.currentThread().getName(), getComponentName(), aCasReferenceId, (double) (super.getCpuTime() - time) / (double) 1000000 });
}
@@ -522,6 +526,10 @@
getOutputChannel().sendReply(aCasReferenceId, anEndpoint);
}
}
+ // Remove input CAS state entry from the local cache
+ if ( !isTopLevelComponent() ) {
+ localCache.remove(aCasReferenceId);
+ }
}
catch ( Throwable e)
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/error/handler/ProcessCasErrorHandler.java Mon Nov 24 11:18:20 2008
@@ -316,7 +316,7 @@
}
// Dont increment errors for destinations that are clients of this service.
- if ( !aController.isStopped() && (isRequest || !isEndpointTheClient ) )
+ if ( key != null && !aController.isStopped() && (isRequest || !isEndpointTheClient ) )
{
synchronized( monitor )
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Mon Nov 24 11:18:20 2008
@@ -143,7 +143,7 @@
}
- private synchronized void handleProcessResponseFromRemoteDelegate(MessageContext aMessageContext, String aDelegateKey)
+ private void handleProcessResponseFromRemoteDelegate(MessageContext aMessageContext, String aDelegateKey)
{
CAS cas = null;
String casReferenceId = null;
@@ -265,7 +265,6 @@
}
}
}
-
long timeToDeserializeCAS = getController().getCpuTime() - t1;
getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
@@ -338,7 +337,7 @@
}
- private synchronized void handleProcessResponseWithCASReference(MessageContext aMessageContext )
+ private void handleProcessResponseWithCASReference(MessageContext aMessageContext )
{
String casReferenceId = null;
CacheEntry cacheEntry = null;
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/UimaVmMessageListener.java Mon Nov 24 11:18:20 2008
@@ -73,7 +73,6 @@
int requestType = 0;
try {
latch.await();
-
if (UimaMessageValidator.isValidMessage(aMessage, controller)) {
MessageContext msgContext = aMessage.toMessageContext(controller.getName());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -83,7 +82,7 @@
}
if (!concurrentThreads.containsKey(Thread.currentThread().getId())) {
Thread.currentThread().setName(
- Thread.currentThread().getName() + "::" + controller.getComponentName());
+ Thread.currentThread().getName() + "::" + controller.getComponentName()+ "::"+Thread.currentThread().getId());
// Store the thread identifier in the map. The value stored is not important. All
// we want is to save the fact that the thread name has been changed. And we only
// want to change it once
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/spi/transport/vm/VmTransport.java Mon Nov 24 11:18:20 2008
@@ -124,10 +124,9 @@
int concurrentConsumerCount = context.getConcurrentConsumerCount();
// Create a ThreadPoolExecutor with as many threads as needed. The pool has
// a fixed number of threads that never expire and are never passivated.
- executor = new ThreadPoolExecutor(1, concurrentConsumerCount, Long.MAX_VALUE,
+ executor = new ThreadPoolExecutor(concurrentConsumerCount, concurrentConsumerCount, Long.MAX_VALUE,
TimeUnit.NANOSECONDS, workQueue);
- // executor.prestartAllCoreThreads();
-
+ executor.prestartAllCoreThreads();
}
return executor;
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=720263&r1=720262&r2=720263&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Mon Nov 24 11:18:20 2008
@@ -174,3 +174,6 @@
UIMAEE_cas_is_invalid_remove_from_cache_failed__FINE = >>> Cas Id: {0} Not in Cache. Must Have Been Already Removed
UIMAEE_disable_endpoint__INFO = Controller: {0} Disabling Delegate: {1} Due To Excessive Errors
UIMAEE_process_exception__INFO = Controller: {0} Handling Exception. Delegate: {1} Cas Id: {2}
+UIMAEE_client_endpoint_not_found__INFO = Controller: {0} Unable to Send CAS: {1} to Client. Client Endpoint Not Found.
+UIMAEE_local_cache_increment_child_count__FINEST = Controller: {0} Incremented CAS: {1} Child Count. Current Count: {2}
+UIMAEE_local_cache_decrement_child_count__FINEST = Controller: {0} Decremented CAS: {1} Child Count. Current Count: {2}