You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by ea...@apache.org on 2008/08/22 20:53:06 UTC
svn commit: r688174 [3/3] - in
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main:
java/org/apache/uima/aae/ java/org/apache/uima/aae/client/
java/org/apache/uima/aae/controller/
java/org/apache/uima/aae/error/handler/ java/org/apache/uima/aa...
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Fri Aug 22 11:53:05 2008
@@ -25,6 +25,7 @@
import org.apache.uima.aae.InProcessCache.CacheEntry;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.controller.Endpoint_impl;
import org.apache.uima.aae.controller.PrimitiveAnalysisEngineController;
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.error.ErrorContext;
@@ -45,7 +46,8 @@
public class ProcessRequestHandler_impl extends HandlerBase
{
private static final Class CLASS_NAME = ProcessRequestHandler_impl.class;
-
+ private Object mux = new Object();
+
public ProcessRequestHandler_impl(String aName)
{
super(aName);
@@ -58,89 +60,287 @@
entry.incrementTimeWaitingForCAS( aTimeWaitingForCAS);
entry.incrementTimeToDeserializeCAS(aTimeToDeserializeCAS);
}
-
- private void handleProcessRequestWithXMI(MessageContext aMessageContext) throws AsynchAEException
+ private boolean messageContainsXMI(MessageContext aMessageContext, String casReferenceId) throws Exception
+ {
+ // Fetch serialized CAS from the message
+ String xmi = aMessageContext.getStringMessage();
+ // *****************************************************************
+ // ***** NO XMI In Message. Kick this back to sender with exception
+ // *****************************************************************
+ if ( xmi == null )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_no_cargo__INFO",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ getController().
+ getOutputChannel().
+ sendReply(new InvalidMessageException("No XMI data in message"), casReferenceId, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
+ // Dont process this empty message
+ return false;
+ }
+ return true;
+ }
+ private synchronized CAS getCAS( boolean fetchCASFromShadowCasPool, String shadowCasPoolKey, String casReceivedFrom )
{
CAS cas = null;
- String casReferenceId = null;
+ // If this is a new CAS (generated by a CM), fetch a CAS from a Shadow Cas Pool associated with a CM that
+ // produced the CAS. Each CM will have its own Shadow Cas Pool
+ if ( fetchCASFromShadowCasPool )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_cm__FINE",
+ new Object[] { shadowCasPoolKey });
+ // Aggregate time spent waiting for a CAS in the shadow cas pool
+ ((AggregateAnalysisEngineController)getController()).getDelegateServicePerformance(shadowCasPoolKey).beginWaitOnShadowCASPool();
+ cas = getController().getCasManagerWrapper().getNewCas(shadowCasPoolKey);
+ ((AggregateAnalysisEngineController)getController()).getDelegateServicePerformance(shadowCasPoolKey).endWaitOnShadowCASPool();
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE",
+ new Object[] { shadowCasPoolKey });
+ }
+ else
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINE",
+ new Object[] { casReceivedFrom });
+
+ // Aggregate time spent waiting for a CAS in the service cas pool
+ getController().getServicePerformance().beginWaitOnCASPool();
+
+ cas = getController().getCasManagerWrapper().getNewCas();
+ getController().getServicePerformance().endWaitOnCASPool();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
+ new Object[] { casReceivedFrom });
+ }
+ return cas;
+ }
+ /**
+ *
+ * @param casReferenceId
+ * @param freeCasEndpoint
+ * @param shadowCasPoolKey
+ * @param aMessageContext
+ * @return
+ * @throws Exception
+ */
+ private CacheEntry deserializeCASandRegisterWithCache( String casReferenceId, Endpoint freeCasEndpoint, String shadowCasPoolKey, MessageContext aMessageContext)
+ throws Exception
+ {
long inTime = System.nanoTime();
boolean casRegistered = false;
- boolean requestToFreeCasSent = false;
+
+ // Fetch serialized CAS from the message
+ String xmi = aMessageContext.getStringMessage();
+
+ // Time how long we wait on Cas Pool to fetch a new CAS
+ long t1 = getController().getCpuTime();
+ // *************************************************************************
+ // Fetch CAS from a Cas Pool. If the CAS came from a Cas Multiplier
+ // fetch the CAS from a shadow CAS pool. Otherwise, fetch the CAS
+ // from the service CAS Pool.
+ // *************************************************************************
+
+ CAS cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey, aMessageContext.getEndpoint().getEndpoint());
+ long timeWaitingForCAS = getController().getCpuTime() - t1;
+ // Check if we are still running
+ if ( getController().isStopped() )
+ {
+ // The Controller is in shutdown state.
+ getController().dropCAS(cas);
+ return null;
+ }
+ // *************************************************************************
+ // Deserialize the CAS from the message
+ // *************************************************************************
+ t1 = getController().getCpuTime();
+ XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+ UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+ long timeToDeserializeCAS = getController().getCpuTime() - t1;
+ getController().incrementDeserializationTime(timeToDeserializeCAS);
+ LongNumericStatistic statistic;
+ if ( (statistic = getController().getMonitor().getLongNumericStatistic("",Monitor.TotalDeserializeTime)) != null )
+ {
+ statistic.increment(timeToDeserializeCAS);
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialize_cas_time_FINE",
+ new Object[] { timeToDeserializeCAS / 1000 });
+ // *************************************************************************
+ // Register the CAS with a local cache
+ // *************************************************************************
+ CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId);
+
+ // Update Stats
+ ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
+ casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
+ if ( getController().isTopLevelComponent() )
+ {
+ synchronized( mux )
+ {
+ getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
+ }
+ }
+ getController().saveTime(inTime, casReferenceId, getController().getName());
+
+ if ( getController() instanceof AggregateAnalysisEngineController )
+ {
+ // If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS
+ if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
+ {
+ // Fetch parent CAS id
+ String inputCasReferenceId =
+ aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
+ if ( shadowCasPoolKey != null )
+ {
+ // Save the key of the Cas Multiplier in the cache. It will be now known which Cas Multiplier produced this CAS
+ entry.setCasMultiplierKey(shadowCasPoolKey);
+ }
+ // associate this subordinate CAS with the parent CAS
+ entry.setInputCasReferenceId(inputCasReferenceId);
+ // Save a Cas Multiplier endpoint where a Free CAS notification will be sent
+ entry.setFreeCasEndpoint(freeCasEndpoint);
+ cacheStats( inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+ }
+ else
+ {
+ cacheStats( casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+ }
+ DelegateStats stats = new DelegateStats();
+ if ( entry.getStat() == null )
+ {
+ entry.setStat(stats);
+ // Add entry for self (this aggregate). MessageContext.getEndpointName()
+ // returns the name of the queue receiving the message.
+ stats.put(getController().getServiceEndpointName(), new TimerStats());
+ }
+ else
+ {
+ if (!stats.containsKey(getController().getServiceEndpointName()))
+ {
+ stats.put(getController().getServiceEndpointName(), new DelegateStats());
+ }
+ }
+ }
+ else
+ {
+ cacheStats( casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ cacheProcessCommandInClientEndpoint();
+ return entry;
+ }
+ private String getCasReferenceId( MessageContext aMessageContext ) throws Exception
+ {
+ if ( !aMessageContext.propertyExists(AsynchAEMessage.CasReference) )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_cas_refid__INFO",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+
+ getController().
+ getOutputChannel().
+ sendReply(new InvalidMessageException("No Cas Reference Id Received From Delegate In message"), null, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
+ return null;
+ }
+ return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ }
+ /**
+ * Handles process request from a remote client
+ *
+ * @param aMessageContext - contains a message from UIMA-AS Client
+ * @throws AsynchAEException
+ */
+ private void handleProcessRequestWithXMI(MessageContext aMessageContext) throws AsynchAEException
+ {
+ CacheEntry entry = null;
+ String casReferenceId = null;
+ // Check if there is a cargo in the message
+ if ( aMessageContext.getStringMessage() == null )
+ {
+ return; // No XMI just return
+ }
+
try
{
- boolean isNewCAS = false;
+
String newCASProducedBy = null;
- String remoteCasReferenceId = null;
- // This is only used when handling CASes produced by CAS Multiplier
-
// Get the CAS Reference Id of the input CAS
- casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ // Fetch id of the CAS from the message. If it doesnt exist the method will create an entry in the log file and return null
+ casReferenceId = getCasReferenceId(aMessageContext);
+ if ( casReferenceId == null )
+ {
+ return; // Invalid message. Nothing to do
+ }
// Initially make both equal
String inputCasReferenceId = casReferenceId;
- // CASes generated in a Cas Multiplier will have a CasSequence property set. If such property exists
- // it means that the CAS has been generated by a CM.
+ // Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
+ Endpoint freeCasEndpoint = null;
+ // CASes generated by a Cas Multiplier will have a CasSequence property set.
if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
{
- // Remote CM?
- if ( aMessageContext.getEndpoint().isRemote())
- {
- remoteCasReferenceId = casReferenceId;
- }
- // Set the flag to indicate that the CAS been generated by CM
- isNewCAS = true;
- // Fetch the actual input CAS Reference Id from which the CAS being processed was generated from
+ // Fetch an ID of the parent CAS
inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
- // Fetch input CAS Cache entry
- CacheEntry inputCasCacheEntry = getController().
- getInProcessCache().
- getCacheEntryForCAS(inputCasReferenceId);
- // This CAS came in from the CAS Multiplier. Treat it differently than the
- // input CAS. First, in case the Aggregate needs to send this CAS to the
+ // Fetch Cache entry for the parent CAS
+ CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
+
+ computeStats(aMessageContext, inputCasReferenceId);
+
+
+ // Fetch an endpoint where Free CAS Notification must be sent.
+ // This endpoint is unique per CM instance. Meaning, each
+ // instance of CM will have an endpoint where it expects Free CAS
+ // notifications.
+ freeCasEndpoint = aMessageContext.getEndpoint();
+ // Clone an endpoint where Free Cas Request will be sent
+ freeCasEndpoint = (Endpoint)((Endpoint_impl)freeCasEndpoint).clone();
+ // Reset the destination
+ aMessageContext.getEndpoint().setDestination(null);
+ // This CAS came in from a CAS Multiplier. Treat it differently than the
+ // input CAS. In case the Aggregate needs to send this CAS to the
// client, retrieve the client destination by looking up the client endpoint
// using input CAS reference id. CASes generated by the CAS multiplier will have
// the same Cas Reference id.
Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin();
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
- // increment number of subordinate CASes that are currently being processed
- // The input CAS (parent) will be held by the aggregate until all of its
- // subordinate CASes are fully processed. Only then, the aggregate can return
- // it back to the client
- synchronized( inputCasCacheEntry )
- {
- inputCasCacheEntry.incrementSubordinateCasInPlayCount();
- }
- if ( ((AggregateAnalysisEngineController)getController()).sendRequestToReleaseCas() )
- {
- try
- {
- // Change the name of the queue where the request to free a CAS will be sent.
- aMessageContext.getEndpoint().setEndpoint(aMessageContext.getEndpoint().getEndpoint()+"CasSync");
-
- getController().getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, casReferenceId, aMessageContext.getEndpoint());
- requestToFreeCasSent = true;
- }
- catch( Exception e){}
- }
- }
- // MessageContext contains endpoint set by the CAS Multiplier service. Overwrite
- // this with the endpoint of the client who sent the input CAS. In case this
- // aggregate is configured to send new CASes to the client we know where to send them.
+ // The message context contains a Cas Multiplier endpoint. Since
+ // we dont want to send a generated CAS back to the CM, override
+ // with an endpoint provided by the client of
+ // this service. Client endpoint is attached to an input Cas cache entry.
aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
- // Set this to null so that the new CAS gets its own Cas Reference Id below
- casReferenceId = null;
- }
- else if ( getController().isTopLevelComponent())
- {
- Endpoint replyToEndpoint = aMessageContext.getEndpoint();
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- ((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, replyToEndpoint);
+ // Before sending a CAS to Cas Multiplier, the aggregate has
+ // saved the CM key in the CAS cache entry. Fetch the key
+ // of the CM so that we can ask the right Shadow Cas Pool for
+ // a new CAS. Every Shadow Cas Pool has a unique id which
+ // corresponds to a Cas Multiplier key.
+ newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
+ if ( getController() instanceof AggregateAnalysisEngineController )
+ {
+ Endpoint casMultiplierEndpoint =
+ ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(newCASProducedBy, false);
+ if ( casMultiplierEndpoint != null )
+ {
+ // Save the URL of the broker managing the Free Cas Notification queue.
+ // This is needed when we try to establish a connection to the broker.
+ freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
+ }
+ }
+ // increment number of CASes produced from an input CAS
+ // The input CAS (parent) will be held by
+ // the aggregate until all of its subordinate CASes are
+ // fully processed. Only then, the aggregate can return
+ // it back to the client
+ synchronized (inputCasCacheEntry) {
+ inputCasCacheEntry.incrementSubordinateCasInPlayCount();
}
-
+ }
+ else if ( getController().isTopLevelComponent() && getController() instanceof AggregateAnalysisEngineController )
+ {
+ ((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
}
// To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
// to see if the message with a given CasReferenceId is already being processed. It is, the
@@ -151,169 +351,23 @@
// CasReferenceId. When the service finally comes back up, it will have multiple messages in
// its queue possibly from the same client. Only the first message for any given CasReferenceId
// should be processed.
- if ( casReferenceId == null || !getController().getInProcessCache().entryExists(casReferenceId) )
+ if ( !getController().getInProcessCache().entryExists(casReferenceId) )
{
- String xmi = aMessageContext.getStringMessage();
-
- // *****************************************************************
- // ***** NO XMI In Message. Kick this back to sender with exception
- // *****************************************************************
- if ( xmi == null )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_no_cargo__INFO",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
- getController().
- getOutputChannel().
- sendReply(new InvalidMessageException("No XMI data in message"), casReferenceId, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
- // Dont process this empty message
- return;
- }
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINEST",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
- long t1 = System.nanoTime();
- if ( isNewCAS )
- {
- cas = getController().getCasManagerWrapper().getNewCas(newCASProducedBy);
- }
- else
- {
- cas = getController().getCasManagerWrapper().getNewCas();
- }
- long timeWaitingForCAS = System.nanoTime() - t1;
-
- if ( getController().isStopped() )
- {
- // The Controller is in shutdown state.
- getController().dropCAS(cas);
- return;
- }
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
- t1 = System.nanoTime();
- XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
- UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
-
- long timeToDeserializeCAS = System.nanoTime() - t1;
- LongNumericStatistic statistic;
- if ( (statistic = getController().getMonitor().getLongNumericStatistic("",Monitor.TotalDeserializeTime)) != null )
- {
- statistic.increment(timeToDeserializeCAS);
- }
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialize_cas_time_FINE",
- new Object[] { timeToDeserializeCAS / 1000 });
- ServicePerformance casStats = null;
-
-
- if (casReferenceId == null)
- {
- if (getController() instanceof PrimitiveAnalysisEngineController)
- {
- inputCasReferenceId = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
- }
- else
- {
- casReferenceId = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
- if ( inputCasReferenceId == null )
- {
- inputCasReferenceId = casReferenceId;
- }
- }
- casStats = getController().getCasStatistics(inputCasReferenceId);
-
- }
- else
- {
- getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId);
-/*
- if ( aMessageContext.propertyExists(AsynchAEMessage.InputCasReference))
- {
- CacheEntry cacheEntry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
- String parentCasId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
- cacheEntry.setInputCasReferenceId(parentCasId);
- }
-*/
- casStats = getController().getCasStatistics(casReferenceId);
- }
- casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
- if ( getController().isTopLevelComponent() )
- {
- getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
- }
- // Set a local flag to indicate that the CAS has been added to the cache. This will be usefull when handling an exception
- // If an exception happens before the CAS is added to the cache, the CAS needs to be dropped immediately.
- casRegistered = true;
- if ( casReferenceId == null )
- {
- getController().saveTime(inTime, inputCasReferenceId, getController().getName());
- }
- else
- {
- getController().saveTime(inTime, casReferenceId, getController().getName());
- }
-
- CacheEntry entry = null;
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- entry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
- if ( isNewCAS )
- {
- if ( newCASProducedBy != null )
- {
- entry.setCasMultiplierKey(newCASProducedBy);
- }
- if ( remoteCasReferenceId != null )
- {
- entry.setRemoteCMCasReferenceId(remoteCasReferenceId);
- }
- if ( requestToFreeCasSent )
- {
- entry.setSendRequestToFreeCas(false);
- }
- // associate this subordinate CAS with the parent CAS
- entry.setInputCasReferenceId(inputCasReferenceId);
- entry.setReplyReceived();
- }
- DelegateStats stats = new DelegateStats();
- if ( entry.getStat() == null )
- {
- entry.setStat(stats);
- // Add entry for self (this aggregate). MessageContext.getEndpointName()
- // returns the name of the queue receiving the message.
- stats.put(getController().getServiceEndpointName(), new TimerStats());
- }
- else
- {
- if (!stats.containsKey(getController().getServiceEndpointName()))
- {
- stats.put(getController().getServiceEndpointName(), new DelegateStats());
- }
- }
- }
-
- cacheStats( inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
-
-
- cacheProcessCommandInClientEndpoint();
-
- if ( getController().isStopped() )
+ entry = deserializeCASandRegisterWithCache( casReferenceId, freeCasEndpoint, newCASProducedBy, aMessageContext);
+ if ( getController().isStopped() || entry == null || entry.getCas() == null)
{
if ( entry != null )
{
- // The Controller is in shutdown state.
+ // The Controller is in shutdown state, release the CAS
getController().dropCAS( entry.getCasReferenceId(), true);
- return;
+ entry = null;
}
+ return;
}
- invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
+ // *****************************************************************
+ // Process the CAS
+ // *****************************************************************
+ invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
}
else
{
@@ -321,7 +375,6 @@
"handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_duplicate_request__INFO",
new Object[] { casReferenceId});
}
-
}
catch ( Exception e)
{
@@ -333,31 +386,122 @@
errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
errorContext.add(AsynchAEMessage.CasReference, casReferenceId );
- if ( casRegistered == false )
+ if ( entry != null )
{
- getController().dropCAS(cas);
+ getController().dropCAS(entry.getCas());
}
getController().getErrorHandlerChain().handle(e, errorContext, getController());
}
}
- private void handleProcessRequestWithXCAS(MessageContext aMessageContext) throws AsynchAEException
+ private void handleProcessRequestWithCASReference(MessageContext aMessageContext) throws AsynchAEException
{
-
+ boolean isNewCAS = false;
+ String newCASProducedBy = null;
+
+
try
{
- boolean isNewCAS = false;
- String newCASProducedBy = null;
-
// This is only used when handling CASes produced by CAS Multiplier
+ String inputCasReferenceId = null;
+ CAS cas = null;
+ String casReferenceId = getCasReferenceId(aMessageContext);
+ if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
+ {
+ isNewCAS = true;
+ Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
+
+ if ( casMultiplierEndpoint == null )
+ {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
+ new Object[] { casReferenceId });
+ return;
+ }
+ //
+ if ( getController() instanceof AggregateAnalysisEngineController )
+ {
+ getController().getInProcessCache().setCasProducer(casReferenceId, casMultiplierEndpoint.getEndpoint());
+ newCASProducedBy = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(casMultiplierEndpoint.getEndpoint());
+ casMultiplierEndpoint.setIsCasMultiplier(true);
+// ((AggregateAnalysisEngineController)getController()).getServicePerformance(newCASProducedBy).incrementNumberOfCASesProcessed();
+ CacheEntry subordinateCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
+ CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(subordinateCasCacheEntry.getInputCasReferenceId());
+ if ( inputCasCacheEntry != null )
+ {
+ synchronized( inputCasCacheEntry )
+ {
+ inputCasCacheEntry.incrementSubordinateCasInPlayCount();
+ }
+ }
+ }
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_cas__FINE",
+ new Object[] { casReferenceId, newCASProducedBy });
+
+ aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
+ aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
+ inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
+ }
+ else
+ {
+ if ( getController() instanceof AggregateAnalysisEngineController )
+ {
+ ((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
+ }
+
+ }
+ cas = getController().getInProcessCache().getCasByReference(casReferenceId);
+ long arrivalTime = System.nanoTime();
+ getController().saveTime(arrivalTime, casReferenceId, getController().getName());//aMessageContext.getEndpointName());
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
+ new Object[] { casReferenceId});
+
+ // Save Process command in the client endpoint.
+ cacheProcessCommandInClientEndpoint();
+
+ if ( getController().isStopped() )
+ {
+ return;
+ }
+
+ if ( isNewCAS )
+ {
+ invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
+ }
+ else
+ {
+ invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy);
+ }
+ }
+ catch ( AsynchAEException e)
+ {
+ throw e;
+ }
+ catch ( Exception e)
+ {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+
+ private void handleProcessRequestWithXCAS(MessageContext aMessageContext) throws AsynchAEException
+ {
+
+ try
+ {
// Get the CAS Reference Id of the input CAS
- String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ String casReferenceId = getCasReferenceId(aMessageContext);
String inputCasReferenceId = casReferenceId;
+ // This is only used when handling CASes produced by CAS Multiplier
+ String newCASProducedBy = null;
if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
{
- isNewCAS = true;
// This CAS came in from the CAS Multiplier. Treat it differently than the
// input CAS. First, in case the Aggregate needs to send this CAS to the
// client, retrieve the client destination by looking up the client endpoint
@@ -430,7 +574,8 @@
if (casReferenceId == null)
{
- casReferenceId = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
+ CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
+ casReferenceId = entry.getCasReferenceId();
}
else
{
@@ -466,138 +611,6 @@
}
- private void handleProcessRequestWithCASReference(MessageContext aMessageContext) throws AsynchAEException
- {
- boolean isNewCAS = false;
- String newCASProducedBy = null;
-
- if ( getController().isStopped() )
- {
- return;
- }
-
- try
- {
- String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
-
- if ( casReferenceId == null )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_cas_refid__INFO",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
-
- getController().
- getOutputChannel().
- sendReply(new InvalidMessageException("No Cas Reference Id Received From Delegate In message"), null, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
- // Dont process this empty message
- return;
-
- }
-// Endpoint replyToEndpoint = aMessageContext.getEndpoint();
-
- // This is only used when handling CASes produced by CAS Multiplier
- String inputCasReferenceId = null;
- CAS cas = null;
-
- if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
- {
- isNewCAS = true;
- Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
-
- if ( casMultiplierEndpoint == null )
- {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
- new Object[] { casReferenceId });
- return;
- }
- //
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- getController().getInProcessCache().setCasProducer(casReferenceId, casMultiplierEndpoint.getEndpoint());
- newCASProducedBy =
- ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(casMultiplierEndpoint.getEndpoint());
- casMultiplierEndpoint.setIsCasMultiplier(true);
- ((AggregateAnalysisEngineController)getController()).
- getServicePerformance(newCASProducedBy).
- incrementNumberOfCASesProcessed();
- CacheEntry subordinateCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
- subordinateCasCacheEntry.setReplyReceived();
-
- CacheEntry inputCasCacheEntry = getController().
- getInProcessCache().
- getCacheEntryForCAS(subordinateCasCacheEntry.getInputCasReferenceId());
- if ( inputCasCacheEntry != null )
- {
- synchronized( inputCasCacheEntry )
- {
-// System.out.println("++++++++ Incrementing Delegate:"+casReferenceId+" Count For Input Cas::"+subordinateCasCacheEntry.getInputCasReferenceId());
- inputCasCacheEntry.incrementSubordinateCasInPlayCount();
- }
- }
- }
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_cas__FINE",
- new Object[] { casReferenceId, newCASProducedBy });
- // MessageContext contains endpoint set by the CAS Multiplier service. Overwrite
- // this with the endpoint of the client who sent the input CAS. In case this
- // aggregate is configured to send new CASes to the client we know where to send them.
-// if ( aMessageContext.getEndpoint() != null )
-// {
-// aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
-// aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
-// }
- aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
- aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
-
- inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
- }
- else
- {
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- ((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
- }
-
- }
- cas = getController().getInProcessCache().getCasByReference(casReferenceId);
-
- long arrivalTime = System.nanoTime();
- getController().saveTime(arrivalTime, casReferenceId, getController().getName());//aMessageContext.getEndpointName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
- new Object[] { casReferenceId});
-
- // Save Process command in the client endpoint.
- cacheProcessCommandInClientEndpoint();
-
- if ( getController().isStopped() )
- {
- return;
- }
-
- if ( isNewCAS )
- {
- invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
- }
- else
- {
- invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy);
- }
- }
- catch ( AsynchAEException e)
- {
- throw e;
- }
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
- }
-
- }
-
-
private void cacheProcessCommandInClientEndpoint()
{
@@ -615,32 +628,13 @@
getController().collectionProcessComplete(replyToEndpoint);
}
- private void handleReleaseCASRequest(MessageContext aMessageContext)
+ private void handleReleaseCASRequest(MessageContext aMessageContext) throws AsynchAEException
{
-
- if ( getController() instanceof PrimitiveAnalysisEngineController )
- {
- ( (PrimitiveAnalysisEngineController)getController()).releaseNextCas();
- }
-/*
- try
- {
- String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
- new Object[] { getController().getName(), casReferenceId });
-System.out.println(getController().getName()+" ::::::: Processing Release CAS Request:"+casReferenceId);
-
- if ( casReferenceId != null && getController().getInProcessCache().entryExists(casReferenceId))
- {
- getController().dropCAS(casReferenceId, true);
- }
- }
- catch( Exception e)
- {
- getController().getErrorHandlerChain().handle(e, HandlerBase.populateErrorContext( (MessageContext)aMessageContext ), getController());
- }
-*/
+ String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
+ new Object[] { getController().getName(), casReferenceId });
+ getController().releaseNextCas(casReferenceId);
}
private void handleStopRequest(MessageContext aMessageContext)
@@ -679,23 +673,43 @@
getController().getControllerLatch().waitUntilInitialized();
- // If a Process Request, increment number of docs processed
- if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
- && command == AsynchAEMessage.Process) {
- // Increment number of CASes processed by this service
- getController().getServicePerformance().incrementNumberOfCASesProcessed();
- }
+ // If a Process Request, increment number of CASes processed
+ if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
+ && command == AsynchAEMessage.Process &&!messageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ // Increment number of CASes processed by this service
+ getController().getServicePerformance().incrementNumberOfCASesProcessed();
+ }
+ if ( getController().isStopped() )
+ {
+ return;
+ }
if (AsynchAEMessage.CASRefID == payload)
{
+ // Fetch id of the CAS from the message.
+ if ( getCasReferenceId(messageContext) == null )
+ {
+ return; // Invalid message. Nothing to do
+ }
+
handleProcessRequestWithCASReference(messageContext);
}
else if (AsynchAEMessage.XMIPayload == payload)
{
+ // Fetch id of the CAS from the message.
+ if ( getCasReferenceId(messageContext) == null )
+ {
+ return; // Invalid message. Nothing to do
+ }
handleProcessRequestWithXMI(messageContext);
}
else if (AsynchAEMessage.XCASPayload == payload)
{
+ // Fetch id of the CAS from the message.
+ if ( getCasReferenceId(messageContext) == null )
+ {
+ return; // Invalid message. Nothing to do
+ }
handleProcessRequestWithXCAS(messageContext);
}
else if ( AsynchAEMessage.None == payload && AsynchAEMessage.CollectionProcessComplete == command)
@@ -718,6 +732,7 @@
}
catch( Exception e)
{
+ e.printStackTrace();
getController().getErrorHandlerChain().handle(e, HandlerBase.populateErrorContext( (MessageContext)anObjectToHandle ), getController());
}
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Fri Aug 22 11:53:05 2008
@@ -40,6 +40,7 @@
import org.apache.uima.aae.jmx.ServicePerformance;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UIMAMessage;
import org.apache.uima.aae.monitor.Monitor;
import org.apache.uima.aae.monitor.statistics.DelegateStats;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
@@ -59,162 +60,6 @@
super(aName);
}
- private void aggregateDelegateStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
- {
- String delegateKey = "";
- try
- {
-
- delegateKey = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
- CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- if ( entry == null )
- {
- throw new AsynchAEException("CasReferenceId:"+aCasReferenceId+" Not Found in the Cache.");
- }
- CacheEntry inputCasEntry = null;
- String inputCasReferenceId = entry.getInputCasReferenceId();
- ServicePerformance casStats =
- ((AggregateAnalysisEngineController)getController()).getCasStatistics(aCasReferenceId);
- if ( inputCasReferenceId != null &&
- getController().getInProcessCache().entryExists(inputCasReferenceId) )
- {
- String casProducerKey = entry.getCasProducerKey();
- if ( casProducerKey != null &&
- ((AggregateAnalysisEngineController)getController()).
- isDelegateKeyValid(casProducerKey) )
- {
- // Get entry for the input CAS
- inputCasEntry = getController().
- getInProcessCache().
- getCacheEntryForCAS(inputCasReferenceId);
- }
-
- }
- ServicePerformance delegateServicePerformance =
- ((AggregateAnalysisEngineController)getController()).getServicePerformance(delegateKey);
-
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToSerializeCAS))
- {
- long timeToSerializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToSerializeCAS)).longValue();
- if ( timeToSerializeCAS > 0)
- {
- casStats.incrementCasSerializationTime(timeToSerializeCAS);
- if ( delegateServicePerformance != null )
- {
- delegateServicePerformance.
- incrementCasSerializationTime(timeToSerializeCAS);
- }
- getController().getServicePerformance().
- incrementCasSerializationTime(timeToSerializeCAS);
- }
- }
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeToDeserializeCAS))
- {
- long timeToDeserializeCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeToDeserializeCAS)).longValue();
- if ( timeToDeserializeCAS > 0 )
- {
- casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
-
- if ( delegateServicePerformance != null )
- {
- delegateServicePerformance.
- incrementCasDeserializationTime(timeToDeserializeCAS);
- }
- getController().getServicePerformance().
- incrementCasDeserializationTime(timeToDeserializeCAS);
- }
- }
-
- if (aMessageContext.propertyExists(AsynchAEMessage.IdleTime))
- {
- long idleTime = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.IdleTime)).longValue();
- if ( idleTime > 0 )
- {
- casStats.incrementIdleTime(idleTime);
- if ( delegateServicePerformance != null )
- {
- delegateServicePerformance.
- incrementIdleTime(idleTime);
- }
- }
- }
-
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeWaitingForCAS))
- {
- long timeWaitingForCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeWaitingForCAS)).longValue();
- if ( aMessageContext.getEndpoint().isRemote())
- {
- entry.incrementTimeWaitingForCAS(timeWaitingForCAS);
- if ( inputCasEntry != null )
- {
- inputCasEntry.incrementTimeWaitingForCAS(timeWaitingForCAS);
- }
- }
- }
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInProcessCAS))
- {
- long timeInProcessCAS = ((Long) aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInProcessCAS)).longValue();
- casStats.incrementAnalysisTime(timeInProcessCAS);
- if ( delegateServicePerformance != null )
- {
- delegateServicePerformance.
- incrementAnalysisTime(timeInProcessCAS);
- }
- // Accumulate processing time
- getController().getServicePerformance().
- incrementAnalysisTime(timeInProcessCAS);
- if ( inputCasReferenceId != null )
- {
- ServicePerformance inputCasStats =
- ((AggregateAnalysisEngineController)getController()).
- getCasStatistics(inputCasReferenceId);
- // Update processing time for this CAS
- if ( inputCasStats != null )
- {
- inputCasStats.incrementAnalysisTime(timeInProcessCAS);
- }
- }
- }
- }
- catch( AsynchAEException e)
- {
- throw e;
- }
- catch( Exception e)
- {
- throw new AsynchAEException(e);
- }
- }
- private void computeStats(MessageContext aMessageContext, String aCasReferenceId) throws AsynchAEException
- {
- if (aMessageContext.propertyExists(AsynchAEMessage.TimeInService))
- {
- long departureTime = getController().getTime(aCasReferenceId, aMessageContext.getEndpoint().getEndpoint());
- long currentTime = System.nanoTime();
- long roundTrip = currentTime - departureTime;
- long timeInService = aMessageContext.getMessageLongProperty(AsynchAEMessage.TimeInService);
- long totalTimeInComms = currentTime - (departureTime - timeInService);
-
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_roundtrip_time__FINE",
- new Object[] { aCasReferenceId, aMessageContext.getEndpoint(),(double) roundTrip / (double) 1000000 });
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_delegate__FINE",
- new Object[] { aCasReferenceId, (double) timeInService / (double) 1000000, aMessageContext.getEndpoint() });
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "computeStats", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_show_time_spent_in_comms__FINE",
- new Object[] { aCasReferenceId, (double) totalTimeInComms / (double) 1000000, aMessageContext.getEndpoint() });
- }
-
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- aggregateDelegateStats( aMessageContext, aCasReferenceId );
- }
- }
-
private Endpoint lookupEndpoint(String anEndpointName, String aCasReferenceId)
{
return getController().getInProcessCache().getEndpoint(anEndpointName, aCasReferenceId);
@@ -367,39 +212,45 @@
"handleProcessResponseWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_rcvd_reply_FINEST",
new Object[] { aMessageContext.getEndpoint().getEndpoint(), casReferenceId, xmi });
}
-
- long t1 = System.nanoTime();
-
- synchronized (monitor)
- {
- XmiSerializationSharedData deserSharedData;
- if (totalNumberOfParallelDelegatesProcessingCas > 1 && cacheEntry.howManyDelegatesResponded() > 0)
- {
- // process secondary reply from a parallel step
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ long t1 = getController().getCpuTime();
+ /* --------------------- */
+ /** DESERIALIZE THE CAS. */
+ /* --------------------- */
+
+ // check if the CAS is part of the Parallel Step
+ if (totalNumberOfParallelDelegatesProcessingCas > 1 )
+ {
+ // Synchronized because replies are merged into the same CAS.
+ synchronized (monitor)
+ {
+ // Check if this is a secondary reply in a parallel step. If it is the first reply, deserialize the CAS
+ // using a standard approach. There is no merge to be done yet. Otherwise, we need to
+ // merge the CAS with previous results.
+ if ( cacheEntry.howManyDelegatesResponded() > 0 )
+ {
+ // process secondary reply from a parallel step
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"handleProcessResponseWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_delegate_responded_count_FINEST",
new Object[] { cacheEntry.howManyDelegatesResponded(), casReferenceId});
-
- int highWaterMark = cacheEntry.getHighWaterMark();
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+
+ int highWaterMark = cacheEntry.getHighWaterMark();
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"handleProcessResponseWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_high_water_mark_FINEST",
new Object[] { highWaterMark, casReferenceId });
-
- deserSharedData = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).getDeserSharedData();
- UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, highWaterMark);
- }
- else // general case, or first reply from a parallel step
- {
- // Processing the reply from a standard, non-parallel delegate
- deserSharedData = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).getDeserSharedData();
- if (deserSharedData == null) {
- deserSharedData = new XmiSerializationSharedData();
- getController().getInProcessCache().getCacheEntryForCAS(casReferenceId).setXmiSerializationData(deserSharedData);
- }
- UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+ deserialize( xmi, cas, casReferenceId, highWaterMark);
+ }
+ else
+ {
+ // first reply from a parallel step
+ deserialize(xmi, cas, casReferenceId);
+ }
}
}
-
+ else // general case
+ {
+ // Processing a reply from a standard, non-parallel delegate
+ deserialize(xmi, cas, casReferenceId);
+ }
if ( cacheEntry != null && totalNumberOfParallelDelegatesProcessingCas > 1 )
{
@@ -409,7 +260,7 @@
}
}
- long timeToDeserializeCAS = System.nanoTime() - t1;
+ long timeToDeserializeCAS = getController().getCpuTime() - t1;
getController().
getServicePerformance().
@@ -455,6 +306,23 @@
}
}
+ /**
+  * Deserializes an XMI reply into the given CAS using the shared serialization data
+  * stored in the cache entry of that CAS. The caller in this class uses this overload
+  * for a secondary reply of a parallel step.
+  *
+  * @param xmi - serialized CAS content received in the reply
+  * @param cas - CAS to deserialize into
+  * @param casReferenceId - id used to look up the CAS entry in the in-process cache
+  * @param highWaterMark - value handed through unchanged to the deserializer
+  * @throws Exception - when no cache entry exists for the id or deserialization fails
+  */
+ private void deserialize( String xmi, CAS cas, String casReferenceId, int highWaterMark ) throws Exception
+ {
+   CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
+   if ( entry == null )
+   {
+     // Fail with a descriptive error instead of a bare NullPointerException
+     throw new AsynchAEException("CasReferenceId:"+casReferenceId+" Not Found in the Cache.");
+   }
+   XmiSerializationSharedData deserSharedData = entry.getDeserSharedData();
+   UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, highWaterMark);
+ }
+ /**
+  * Deserializes an XMI reply into the given CAS without a merge point (-1 is passed
+  * to the deserializer). The callers in this class use this overload for a reply from
+  * a non-parallel delegate and for the first reply of a parallel step. If the cache
+  * entry has no shared serialization data yet, a new instance is created and stored
+  * in the entry before deserializing.
+  *
+  * @param xmi - serialized CAS content received in the reply
+  * @param cas - CAS to deserialize into
+  * @param casReferenceId - id used to look up the CAS entry in the in-process cache
+  * @throws Exception - when no cache entry exists for the id or deserialization fails
+  */
+ private void deserialize( String xmi, CAS cas, String casReferenceId ) throws Exception
+ {
+   // Look the entry up once; it is needed both to read and to store the shared data
+   CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(casReferenceId);
+   if ( entry == null )
+   {
+     // Fail with a descriptive error instead of a bare NullPointerException
+     throw new AsynchAEException("CasReferenceId:"+casReferenceId+" Not Found in the Cache.");
+   }
+   XmiSerializationSharedData deserSharedData = entry.getDeserSharedData();
+   if (deserSharedData == null) {
+     deserSharedData = new XmiSerializationSharedData();
+     entry.setXmiSerializationData(deserSharedData);
+   }
+   UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+ }
private void handleProcessResponseWithCASReference(MessageContext aMessageContext )
{
String casReferenceId = null;
@@ -472,11 +340,12 @@
ServicePerformance delegateServicePerformance =
((AggregateAnalysisEngineController)getController()).
getServicePerformance(delegateKey);
+ /*
if ( delegateServicePerformance != null )
{
delegateServicePerformance.incrementNumberOfCASesProcessed();
}
-
+*/
//CAS cas = getController().getInProcessCache().getCasByReference(casReferenceId);
if (cas != null)
{
@@ -576,7 +445,7 @@
}
return true;
}
- private void handleProcessResponseWithException(MessageContext aMessageContext)
+ private void handleProcessResponseWithException(MessageContext aMessageContext, String delegateKey)
{
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"handleProcessResponseWithException", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_handling_exception_from_delegate_FINE",
@@ -617,7 +486,7 @@
{
isCpCError = true;
((AggregateAnalysisEngineController)getController()).
- processCollectionCompleteReplyFromDelegate(aMessageContext.getEndpoint().getEndpoint(), false);
+ processCollectionCompleteReplyFromDelegate(delegateKey, false);
}
else
{
@@ -653,11 +522,10 @@
}
- private void handleCollectionProcessCompleteReply(MessageContext aMessageContext)
+ private void handleCollectionProcessCompleteReply(MessageContext aMessageContext, String delegateKey)
{
try
{
- String delegateKey = ((Endpoint)aMessageContext.getEndpoint()).getEndpoint();
if ( getController() instanceof AggregateAnalysisEngineController )
{
((AggregateAnalysisEngineController) getController())
@@ -692,9 +560,24 @@
int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
String delegate = ((Endpoint)messageContext.getEndpoint()).getEndpoint();
String key = null;
+ String fromServer = null;
if ( getController() instanceof AggregateAnalysisEngineController )
{
- key = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(delegate);
+ if ( ((Endpoint)messageContext.getEndpoint()).isRemote() )
+ {
+ if ( ((MessageContext)anObjectToHandle).propertyExists(AsynchAEMessage.EndpointServer))
+ {
+
+ fromServer =((MessageContext)anObjectToHandle).getMessageStringProperty(AsynchAEMessage.EndpointServer);
+
+ }
+ else if ( ((MessageContext)anObjectToHandle).propertyExists(UIMAMessage.ServerURI))
+ {
+ fromServer = ((MessageContext)anObjectToHandle).getMessageStringProperty(UIMAMessage.ServerURI);
+ }
+ }
+
+ key = ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(delegate, fromServer);
}
if (AsynchAEMessage.CASRefID == payload)
{
@@ -715,11 +598,19 @@
}
else if (AsynchAEMessage.Exception == payload)
{
- handleProcessResponseWithException(messageContext);
+ if ( key == null )
+ {
+ key = ((Endpoint)messageContext.getEndpoint()).getEndpoint();
+ }
+ handleProcessResponseWithException(messageContext, key);
}
else if (AsynchAEMessage.None == payload && AsynchAEMessage.CollectionProcessComplete == command)
{
- handleCollectionProcessCompleteReply(messageContext);
+ if ( key == null )
+ {
+ key = ((Endpoint)messageContext.getEndpoint()).getEndpoint();
+ }
+ handleCollectionProcessCompleteReply(messageContext, key);
}
else if (AsynchAEMessage.None == payload && AsynchAEMessage.ACK == command)
{
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/AggregateServiceInfo.java Fri Aug 22 11:53:05 2008
@@ -26,4 +26,14 @@
/**
*
*/
+
+ /**
+  * Default constructor. Passes false as the CAS multiplier flag to the superclass.
+  */
+ public AggregateServiceInfo() {
+   super(false);
+ }
+
+ /**
+  * Constructor forwarding the given CAS multiplier flag to the superclass.
+  *
+  * @param isaCasMultiplier - flag handed to the superclass constructor
+  */
+ public AggregateServiceInfo( boolean isaCasMultiplier ) {
+   super(isaCasMultiplier);
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/PrimitiveServiceInfo.java Fri Aug 22 11:53:05 2008
@@ -30,7 +30,16 @@
/**
*
*/
+ /**
+  * Default constructor. Passes false as the CAS multiplier flag to the superclass.
+  */
+ public PrimitiveServiceInfo() {
+   super(false);
+ }
+ /**
+  * Constructor forwarding the given CAS multiplier flag to the superclass.
+  *
+  * @param isaCasMultiplier - flag handed to the superclass constructor
+  */
+ public PrimitiveServiceInfo( boolean isaCasMultiplier ) {
+   super(isaCasMultiplier);
+ }
+
private int instanceCount = 0;
public int getAnalysisEngineInstanceCount()
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfo.java Fri Aug 22 11:53:05 2008
@@ -28,9 +28,23 @@
private static final String label="Service Info";
private String brokerURL="";
private String inputQueueName="";
+ private String replyQueueName="";
private String state="";
private String[] deploymentDescriptor= new String[] {""};
+ private boolean casMultiplier;
+ private boolean topLevel;
+ private String serviceKey;
+ private boolean aggregate;
+ /**
+  * Default constructor. Delegates to {@link #ServiceInfo(boolean)} with false.
+  */
+ public ServiceInfo() {
+   this(false);
+ }
+
+ /**
+  * Creates an instance with the CAS multiplier flag initialized from the argument.
+  *
+  * @param isaCasMultiplier - initial value of the CAS multiplier flag
+  */
+ public ServiceInfo( boolean isaCasMultiplier ) {
+   this.casMultiplier = isaCasMultiplier;
+ }
public String getLabel()
{
return label;
@@ -68,6 +82,47 @@
{
state = aState;
}
+ /**
+  * @return current value of the CAS multiplier flag
+  */
+ public boolean isCASMultiplier() {
+   return this.casMultiplier;
+ }
+ /**
+  * Sets the CAS multiplier flag to true. There is no way to clear it through this method.
+  */
+ public void setCASMultiplier() {
+   this.casMultiplier = true;
+ }
+ /**
+  * Sets the top-level flag to true. There is no way to clear it through this method.
+  */
+ public void setTopLevel() {
+   this.topLevel = true;
+ }
+ /**
+  * @return current value of the top-level flag
+  */
+ public boolean isTopLevel() {
+   return this.topLevel;
+ }
+ /**
+  * @return the service key last stored via setServiceKey, or null if none was set
+  */
+ public String getServiceKey()
+ {
+   return this.serviceKey;
+ }
+
+ /**
+  * Stores the given service key.
+  *
+  * @param serviceKey - value to store; returned later by getServiceKey
+  */
+ public void setServiceKey(String serviceKey)
+ {
+   this.serviceKey = serviceKey;
+ }
+
+ /**
+  * @return the stored reply queue name (an empty string until one is set)
+  */
+ public String getReplyQueueName() {
+   return this.replyQueueName;
+ }
+
+ /**
+  * Stores the given reply queue name.
+  *
+  * @param aReplyQueueName - value to store; returned later by getReplyQueueName
+  */
+ public void setReplyQueueName(String aReplyQueueName) {
+   this.replyQueueName = aReplyQueueName;
+ }
+
+ /**
+  * @return current value of the aggregate flag
+  */
+ public boolean isAggregate()
+ {
+   return this.aggregate;
+ }
+
+ /**
+  * Stores the given aggregate flag.
+  *
+  * @param aggregate - new value of the flag
+  */
+ public void setAggregate(boolean aggregate)
+ {
+   this.aggregate = aggregate;
+ }
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServiceInfoMBean.java Fri Aug 22 11:53:05 2008
@@ -25,6 +25,11 @@
{
public String getState();
public String getInputQueueName();
+ public String getReplyQueueName();
public String getBrokerURL();
public String[] getDeploymentDescriptor();
+ public boolean isCASMultiplier();
+ public boolean isTopLevel();
+ public String getServiceKey();
+ public boolean isAggregate();
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformance.java Fri Aug 22 11:53:05 2008
@@ -23,12 +23,14 @@
import java.text.FieldPosition;
import java.util.Formatter;
+import org.apache.uima.aae.controller.AnalysisEngineController;
+
public class ServicePerformance implements ServicePerformanceMBean
{
private static final long serialVersionUID = 1L;
private static final String label="Service Performance";
- private long accumulatedIdleTime=0;
+ private long idleTime=0;
private long numberOfCASesProcessed=0;
private long casDeserializationTime=0;
private long casSerializationTime=0;
@@ -36,9 +38,46 @@
private long maxSerializationTime=0;
private long maxDeserializationTime=0;
private long maxAnalysisTime=0;
-
+ private long casPoolWaitTime=0;
+ private long shadowCasPoolWaitTime=0;
+ private long timeSpentInCMGetNext = 0;
private Object sem = new Object();
+ private AnalysisEngineController controller;
+ private boolean isRemoteDelegate = false;
+ private long uptime = System.nanoTime();
+ long lastUpdate=System.nanoTime();
+ private int processThreadCount=1;
+
+ private Object waitmux = new Object();
+
+ private boolean waitingForCAS = false;
+
+ private long totalWaitTimeForCAS = 0;
+
+ private long lastCASWaitTimeUpdate = 0;
+ private Object shadowPoolMux = new Object();
+ private boolean waitingForSPCAS = false;
+ private long lastSPCASWaitTimeUpdate = 0;
+
+ private Object getNextMux = new Object();
+ private boolean waitingInGetNext = false;
+ private long lastGetNextWaitTimeUpdate = 0;
+ private long totalGetNextWaitTime = 0;
+
+ /**
+  * Default constructor. Leaves the controller reference unset.
+  */
+ public ServicePerformance() {
+   // nothing to initialize beyond the field initializers
+ }
+
+ /**
+  * Creates an instance bound to the given controller.
+  *
+  * @param aController - controller reference stored in this instance
+  */
+ public ServicePerformance(AnalysisEngineController aController) {
+   this.controller = aController;
+ }
+
+ /**
+  * Sets the remote-delegate flag to true. There is no way to clear it through this method.
+  */
+ public void setRemoteDelegate() {
+   this.isRemoteDelegate = true;
+ }
public String getLabel()
{
return label;
@@ -46,32 +85,56 @@
public synchronized void reset()
{
- accumulatedIdleTime = 0;
+ idleTime = 0;
numberOfCASesProcessed=0;
casDeserializationTime=0;
casSerializationTime=0;
+ casPoolWaitTime = 0;
+ shadowCasPoolWaitTime=0;
analysisTime=0;
+ maxSerializationTime=0;
+ maxDeserializationTime=0;
+ maxAnalysisTime=0;
+ timeSpentInCMGetNext = 0;
+ uptime = System.nanoTime();
+ }
+
+
+ /**
+  * Overwrites the stored idle time with the given value while holding the sem lock.
+  *
+  * @param anIdleTime - new idle time; stored as-is
+  */
+ public void setIdleTime( long anIdleTime )
+ {
+   synchronized( sem ) {
+     this.idleTime = anIdleTime;
+   }
 }
public double getIdleTime()
{
- if ( accumulatedIdleTime != 0)
+
+ if ( controller != null )
+ {
+ // Force update of the idle time
+ return ((double)controller.getIdleTime()/(double) 1000000);
+ }
+ else
+ {
synchronized( sem )
{
- return((double)accumulatedIdleTime/(double) 1000000);
+ return ((double)idleTime/(double) 1000000);
}
- else
- return 0;
+
+ }
}
public long getRawIdleTime()
{
- return accumulatedIdleTime;
+ return idleTime;
}
public void incrementIdleTime(long anIdleTime)
{
synchronized( sem )
{
- accumulatedIdleTime += anIdleTime;
+ idleTime += anIdleTime;
+ lastUpdate = System.nanoTime();
}
}
@@ -89,7 +152,22 @@
public double getAnalysisTime()
{
- return (double)analysisTime/(double)1000000;
+// return (double)analysisTime/(double)1000000;
+
+ if ( controller != null )
+ {
+ return ((double)controller.getAnalysisTime()/(double) 1000000);
+ }
+ else
+ {
+ synchronized( sem )
+ {
+ return (double)analysisTime/(double)1000000;
+ }
+
+ }
+
+
}
public long getRawAnalysisTime()
@@ -157,4 +235,158 @@
{
return (double)maxAnalysisTime / (double)1000000;
}
+ /**
+  * Adds the given amount to the casPoolWaitTime counter while holding <code>sem</code>.
+  *
+  * @param aCasPoolsWaitTime the amount to add to the counter
+  */
+ public void incrementCasPoolWaitTime(long aCasPoolsWaitTime )
+ {
+   synchronized ( sem )
+   {
+     casPoolWaitTime = casPoolWaitTime + aCasPoolsWaitTime;
+   }
+ }
+ /**
+  * Returns the time spent waiting for a CAS from the CAS pool, in
+  * milliseconds (nanosecond total from getTimeWaitingForCAS() divided by
+  * 1,000,000).
+  *
+  * getTimeWaitingForCAS() already serializes access on its own monitor, so no
+  * additional lock is taken here. This matches getShadowCasPoolWaitTime() and
+  * getTimeSpentInCMGetNext() and avoids acquiring two monitors in a nested
+  * order.
+  * NOTE(review): the value reported is the begin/end wait total, not the
+  * casPoolWaitTime counter fed by incrementCasPoolWaitTime() - confirm that
+  * is the intended source.
+  *
+  * @return the CAS pool wait time in milliseconds
+  */
+ public double getCasPoolWaitTime()
+ {
+   return (double)getTimeWaitingForCAS()/(double)1000000;
+ }
+ /**
+  * Returns the time spent waiting on the shadow CAS pool, in milliseconds
+  * (nanosecond total from getTimeWaitingForShadowPoolCAS() divided by 1,000,000).
+  *
+  * @return the shadow CAS pool wait time in milliseconds
+  */
+ public double getShadowCasPoolWaitTime()
+ {
+   final long waitNanos = getTimeWaitingForShadowPoolCAS();
+   return (double) waitNanos / (double) 1000000;
+ }
+
+ /**
+  * Returns the time spent waiting in getNext, in milliseconds (nanosecond
+  * total from getTimeWaitingInGetNext() divided by 1,000,000).
+  *
+  * @return the getNext wait time in milliseconds
+  */
+ public double getTimeSpentInCMGetNext()
+ {
+   // Calling the getter also folds any wait still in progress into the total.
+   final long waitNanos = getTimeWaitingInGetNext();
+   return (double) waitNanos / (double) 1000000;
+ }
+
+ /**
+  * Marks the start of a wait on the CAS pool. If a wait is already being
+  * timed the call is a no-op, so the earlier start timestamp is preserved.
+  */
+ public void beginWaitOnCASPool()
+ {
+   synchronized( waitmux )
+   {
+     if ( waitingForCAS )
+     {
+       return;
+     }
+     lastCASWaitTimeUpdate = System.nanoTime();
+     waitingForCAS = true;
+   }
+ }
+ /**
+  * Marks the end of a wait on the CAS pool and adds the elapsed time since
+  * the last update to the running total.
+  *
+  * The delta is only accumulated while a wait is actually open. An end call
+  * with no matching begin (or a second end for the same wait) would otherwise
+  * add the distance from a stale or zero timestamp, which is meaningless
+  * because System.nanoTime() has an arbitrary origin.
+  */
+ public void endWaitOnCASPool()
+ {
+   synchronized( waitmux )
+   {
+     if ( waitingForCAS )
+     {
+       totalWaitTimeForCAS += (System.nanoTime() - lastCASWaitTimeUpdate);
+       waitingForCAS = false;
+     }
+   }
+ }
+
+
+
+ /**
+  * Returns the total time, in nanoseconds, spent waiting for a CAS from the
+  * CAS pool. If a wait is in progress, the time elapsed since the last update
+  * is folded into the total first, so the value is current at the time of
+  * the call.
+  *
+  * @return accumulated CAS pool wait time in nanoseconds
+  */
+ public long getTimeWaitingForCAS()
+ {
+   synchronized( waitmux )
+   {
+     if ( waitingForCAS )
+     {
+       // Use a single clock reading for both the delta and the new baseline;
+       // two separate readings would count the gap between them twice.
+       long now = System.nanoTime();
+       totalWaitTimeForCAS += (now - lastCASWaitTimeUpdate);
+       lastCASWaitTimeUpdate = now;
+     }
+     return totalWaitTimeForCAS;
+   }
+ }
+
+
+
+ /**
+  * Marks the start of a wait on the shadow CAS pool. If a wait is already
+  * being timed the call is a no-op, so the earlier start timestamp is preserved.
+  */
+ public void beginWaitOnShadowCASPool()
+ {
+   synchronized( shadowPoolMux )
+   {
+     if ( waitingForSPCAS )
+     {
+       return;
+     }
+     lastSPCASWaitTimeUpdate = System.nanoTime();
+     waitingForSPCAS = true;
+   }
+ }
+ /**
+  * Marks the end of a wait on the shadow CAS pool and adds the elapsed time
+  * since the last update to the running total.
+  *
+  * The delta is only accumulated while a wait is actually open. An end call
+  * with no matching begin (or a second end for the same wait) would otherwise
+  * add the distance from a stale or zero timestamp, which is meaningless
+  * because System.nanoTime() has an arbitrary origin.
+  */
+ public void endWaitOnShadowCASPool()
+ {
+   synchronized( shadowPoolMux )
+   {
+     if ( waitingForSPCAS )
+     {
+       shadowCasPoolWaitTime += (System.nanoTime() - lastSPCASWaitTimeUpdate);
+       waitingForSPCAS = false;
+     }
+   }
+ }
+
+ /**
+  * Returns the total time, in nanoseconds, spent waiting on the shadow CAS
+  * pool. If a wait is in progress, the time elapsed since the last update is
+  * folded into the total first, so the value is current at the time of the
+  * call.
+  *
+  * @return accumulated shadow CAS pool wait time in nanoseconds
+  */
+ public long getTimeWaitingForShadowPoolCAS()
+ {
+   synchronized( shadowPoolMux )
+   {
+     if ( waitingForSPCAS )
+     {
+       // Use a single clock reading for both the delta and the new baseline;
+       // two separate readings would count the gap between them twice.
+       long now = System.nanoTime();
+       shadowCasPoolWaitTime += (now - lastSPCASWaitTimeUpdate);
+       lastSPCASWaitTimeUpdate = now;
+     }
+     return shadowCasPoolWaitTime;
+   }
+ }
+
+
+ /**
+  * Marks the start of a wait in getNext. If a wait is already being timed
+  * the call is a no-op, so the earlier start timestamp is preserved.
+  */
+ public void beginGetNextWait()
+ {
+   synchronized( getNextMux )
+   {
+     if ( !waitingInGetNext )
+     {
+       waitingInGetNext = true;
+       lastGetNextWaitTimeUpdate = System.nanoTime();
+     }
+   }
+ }
+ /**
+  * Marks the end of a wait in getNext and adds the elapsed time since the
+  * last update to the running total.
+  *
+  * The delta is only accumulated while a wait is actually open. An end call
+  * with no matching begin (or a second end for the same wait) would otherwise
+  * add the distance from a stale or zero timestamp, which is meaningless
+  * because System.nanoTime() has an arbitrary origin.
+  */
+ public void endGetNextWait()
+ {
+   synchronized( getNextMux )
+   {
+     if ( waitingInGetNext )
+     {
+       totalGetNextWaitTime += (System.nanoTime() - lastGetNextWaitTimeUpdate);
+       waitingInGetNext = false;
+     }
+   }
+ }
+
+ /**
+  * Returns the total time, in nanoseconds, spent waiting in getNext. If a
+  * wait is in progress, the time elapsed since the last update is folded
+  * into the total first, so the value is current at the time of the call.
+  *
+  * @return accumulated getNext wait time in nanoseconds
+  */
+ public long getTimeWaitingInGetNext()
+ {
+   synchronized( getNextMux )
+   {
+     if ( waitingInGetNext )
+     {
+       // Use a single clock reading for both the delta and the new baseline;
+       // two separate readings would count the gap between them twice.
+       long now = System.nanoTime();
+       totalGetNextWaitTime += (now - lastGetNextWaitTimeUpdate);
+       lastGetNextWaitTimeUpdate = now;
+     }
+     return totalGetNextWaitTime;
+   }
+ }
+
+ /**
+  * Returns the current value of the processThreadCount field.
+  *
+  * @return the stored process thread count
+  */
+ public int getProcessThreadCount()
+ {
+   return this.processThreadCount;
+ }
+
+ /**
+  * Stores the given value in the processThreadCount field.
+  *
+  * @param processThreadCount the process thread count to store
+  */
+ public void setProcessThreadCount(int processThreadCount)
+ {
+   this.processThreadCount = processThreadCount;
+ }
+
+
+
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/jmx/ServicePerformanceMBean.java Fri Aug 22 11:53:05 2008
@@ -32,5 +32,8 @@
public double getCasDeserializationTime();
public double getCasSerializationTime();
public double getAnalysisTime();
-
+ public double getCasPoolWaitTime();
+ public double getShadowCasPoolWaitTime();
+ public double getTimeSpentInCMGetNext();
+ public int getProcessThreadCount();
}
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/message/AsynchAEMessage.java Fri Aug 22 11:53:05 2008
@@ -23,7 +23,6 @@
public interface AsynchAEMessage
{
public static final String TotalTimeSpentInAnalytic = "TimeInAnalytic";
- public static final String SendTime = "SendTime";
public static final String TimeInService = "TimeInService";
public static final String Endpoint = "Endpoint";
public static final String DelegateStats = "DelegateStats";
@@ -31,14 +30,13 @@
public static final String CasReference = "CasReference";
public static final String InputCasReference = "InputCasReference";
-// public static final String CasReferenceId = "CasReferenceId";
public static final String MessageFrom = "MessageFrom";
public static final String XCASREFId = "XCASRefId";
public static final String XCas = "XCas";
public static final String AEMetadata = "Metadata";
public static final String CasSequence = "CasSequence";
public static final String ReplyToEndpoint = "ReplyToEndpoint";
-
+ public static final String EndpointServer = "EndpointServer";
public static final String ServerIP = "ServerIP";
public static final String RemoveEndpoint = "RemoveEndpoint";
public static final String Aborted = "Aborted";
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=688174&r1=688173&r2=688174&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/resources/uimaee_messages.properties Fri Aug 22 11:53:05 2008
@@ -61,6 +61,9 @@
UIMAEE_worker_thread_terminated__INFO = Worker Thread: {0} Terminated
UIMAEE_request_cas__FINE = Requesting CAS from CasPool For CAS Deserialization. Cas Sent From: {0}
UIMAEE_request_cas_granted__FINE = Obtained CAS from CasPool For CAS Deserialization. Cas Sent From: {0}
+UIMAEE_request_cas_cm__FINE = Requesting CAS from Cas Multipliers Shadow Cas Pool Identified By Key: {0} For CAS Deserialization.
+UIMAEE_request_cas_granted_cm__FINE = Obtained CAS from Cas Multipliers Shadow Cas Pool Identified By Key: {0} For CAS Deserialization.
+
UIMAEE_deserialized_cas_ready_to_process_FINE = Deserialized CAS from XMI. CAS Sent From: {0}
UIMAEE_got_cas__FINEST = Got Cas From: {0} Cas Received Contains: {1}
UIMAEE_duplicate_request__INFO = Duplicate Request With Cas Reference Id: {0} Received. Ignoring Duplicate.
@@ -121,7 +124,7 @@
UIMAEE_cas_retries_exceeded__FINE = Controller: {0} Process CAS Retries Exceeded Or Not Configured. Delegate: {1} Cas Reference Id: {2}
UIMAEE_process_cas_exceeded_threshold__INFO = Controller: {0} Process CAS Threshold Exceeded Configured Maximum.Delegate: {1} Cas Reference Id: {2} Threshold: {3} Action To Take: {4}
UIMAEE_ignore_error__INFO = Controller: {0} Ignoring Error Of Class: {1}
-UIMAEE_show_cache_entry_key__FINEST = InProcessCache Contains: {0} Entries: {1}
+UIMAEE_show_cache_entry_key__FINEST = Controller: {0} InProcessCache Contains: {1} Entries: {2}
UIMAEE_remove_cache_entry_for_cas__FINEST = Removing Cache Entry For Cas Reference Id: {0}
UIMAEE_cas_is_null_remove_from_cache_failed__FINEST = Unable to Remove Cache Entry. Cas Reference Id is Null
UIMAEE_cas_is_invalid_remove_from_cache_failed__FINEST = Unable to Remove Cache Entry. Provided Cas Reference Id: {0} is not a valid key.
@@ -148,3 +151,10 @@
UIMAEE_final_step_parent_cas_child_count__FINEST = Controller: {0} Final Step - Parent Cas Reference Id: {1} Has In-Play Subordinate CASes. Current Subordinate CAS Count: {2}
UIMAEE_final_step_parent_cas_no_children__FINEST = Controller: {0} Final Step - Parent Cas Reference Id: {1} Has No Subordinate CASes Being Processed.
UIMAEE_unable_to_check_ae_back_to_pool__WARNING = Controller: {0} Unable to Check In an Instance Of AE While Processing CPC. Exception {1}
+UIMAEE_sending_fcq_req__FINE = Controller: {0} Sending Request To Release CAS: {1} To Cas Multiplier:{2} Queue: {3}
+UIMAEE_remove_cache_entry__INFO = Controller: {0} Releasing CASes Produced From Input CAS: {1}
+UIMAEE_dump_msg_origin__FINE = Controller: {0} Origin Map Dump {1}
+UIMAEE_show_abbrev_cache_stats___FINE = Controller: {0} Number of CASes In the Cache: {1} Number of CASes in Final State: {2}
+UIMAEE_service_idle_time_cas_pool_INFO = \tTimestamp:\t{0}\t[ {1} ]\tCM:\t{2}\tRemote:\t{3}\tIdle\t{4}\tCASes\t{5}\tInQDepth\t{6}\tRQDepth\t{7}\tCP Wait\t{8}\tAnalysis:\t{9}\tThreadCnt:\t{10}\tCMFreeCasCount:\t{11}
+UIMAEE_service_idle_time_shadow_cas_pool_INFO = \tTimestamp:\t{0}\t[ {1} ]\tCM:\t{2}\tRemote:\t{3}\tIdle\t{4}\tCASes\t{5}\tInQDepth\t{6}\tRQDepth\t{7}\tSCP Wait\t{8}\tAnalysis:\t{9}\tThreadCnt:\t{10}\tCMFreeCasCount:\t{11}
+UIMAEE_marker_INFO = ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++