You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/06/24 20:53:24 UTC
svn commit: r788121 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Author: cwiklik
Date: Wed Jun 24 18:53:23 2009
New Revision: 788121
URL: http://svn.apache.org/viewvc?rev=788121&view=rev
Log:
UIMA-1358 Returns parent CAS to the client on exception
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=788121&r1=788120&r2=788121&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Wed Jun 24 18:53:23 2009
@@ -931,7 +931,66 @@
}
}
+ private boolean abortProcessingCas(CasStateEntry casStateEntry, CacheEntry entry ) {
+ CasStateEntry parentCasStateEntry = null;
+ try {
+ // Check if this CAS has a parent
+ if ( casStateEntry.isSubordinate() ) {
+ // Fetch parent's cache entry
+ parentCasStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId());
+ // Check the state of the parent CAS. If it is marked as failed, it means that
+ // one of its child CASes failed and error handling was configured to fail the
+ // CAS. Such failure of a child CAS causes a failure of the parent CAS. All child
+ // CASes will be dropped in finalStep() as they come back from delegates. When all are
+ // accounted for and dropped, the parent CAS will be returned back to the client
+ // with an exception.
+ if ( parentCasStateEntry.isFailed()) {
+ // Fetch Delegate object for the CM that produced the CAS. The producer key
+ // is associated with a cache entry in the ProcessRequestHandler. Each new CAS
+ // must have a key of a CM that produced it.
+ Delegate delegateCM = lookupDelegate(entry.getCasProducerKey());
+ if ( delegateCM != null && delegateCM.getEndpoint().isCasMultiplier() )
+ {
+ // If the delegate CM is remote, send a Free CAS notification
+ if ( delegateCM.getEndpoint().isRemote()) {
+ getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(), delegateCM.getNotificationEndpoint());
+ }
+ // Check if a request to stop generation of new CASes from the parent of
+ // this CAS has been sent to the CM. The Delegate object keeps track of
+ // requests to STOP that are sent to the CM. Only one STOP is needed.
+ if ( delegateCM.isGeneratingChildrenFrom(parentCasStateEntry.getCasReferenceId())){
+ // Issue a request to the CM to stop producing new CASes from a given input
+ // CAS
+ stopCasMultiplier(delegateCM, parentCasStateEntry.getCasReferenceId());
+ }
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_forcing_cas_to_finalstep__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), casStateEntry.getSubordinateCasInPlayCount() });
+ }
+ casStateEntry.setReplyReceived();
+ // Force the CAS to go to the Final Step where it will be dropped
+ finalStep( new FinalStep(), casStateEntry.getCasReferenceId());
+ return true; // Done here
+ }
+ } else if ( casStateEntry.isFailed() ) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_forcing_cas_to_finalstep__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(), casStateEntry.getSubordinateCasInPlayCount() });
+ }
+ casStateEntry.setReplyReceived();
+ // move this CAS to the final step
+ finalStep( new FinalStep(), casStateEntry.getCasReferenceId());
+ return true;
+ }
+
+ } catch ( Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "abortProcessingCas", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", new Object[] { e });
+ }
+ e.printStackTrace();
+ }
+ return false;
+ }
/**
* This is a process method that is executed for CASes not created by a Multiplier in this aggregate.
*
@@ -950,6 +1009,14 @@
try {
CacheEntry entry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
+ // Check if this CAS should be aborted due to previous error on this CAS or its
+ // parent. If this is the case the method will move the CAS to the final state
+ // where it will be dropped. If the CAS is an input CAS, it will be returned to
+ // the client with an exception
+ if ( abortProcessingCas( casStateEntry, entry )) {
+ // This CAS was aborted, we are done here
+ return;
+ }
// Check if this is an input CAS from the client. If not, check if last
// delegate handling this CAS was a Cas Multiplier configured to process
// parent CAS last
@@ -1076,7 +1143,6 @@
"process", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_executing_step_input_cas__FINEST",
new Object[] {getComponentName(), aCasReferenceId });
}
-
// Execute a step in the flow. false means that this CAS has not
// been produced by CAS Multiplier
executeFlowStep(flow, aCasReferenceId, false);
@@ -1110,13 +1176,17 @@
if ( endpoint != null )
{
endpoint.setController(this);
- CacheEntry entry =
- getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
- if ( endpoint.isCasMultiplier() && endpoint.isRemote() )
+ if ( endpoint.isCasMultiplier() )
{
- entry.setCasMultiplierKey(analysisEngineKey);
+ Delegate delegateCM = lookupDelegate(analysisEngineKey);
+ delegateCM.setGeneratingChildrenFrom(aCasReferenceId, true);
+ // Record the outgoing CAS. CASes destined for a remote CM are recorded
+ // in JmsOutputChannel.
+ if ( !endpoint.isRemote() ) {
+ delegateCM.addNewCasToOutstandingList(aCasReferenceId, true);
+ }
}
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1397,6 +1467,7 @@
boolean casDropped = false;
boolean doDecrementChildCount = false;
localCache.dumpContents();
+
// First locate entries in the global and local cache for a given CAS
// If not found, log a message and return
try
@@ -1410,8 +1481,11 @@
// may still have children and will not be returned to the client until
// all of them are fully processed. This state info will aid in the
// internal bookkeeping when the final child is processed.
- casStateEntry.setState(CacheEntry.FINAL_STATE);
casStateEntry.setFinalStep(aStep);
+ casStateEntry.setState(CacheEntry.FINAL_STATE);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_in_finalstep__FINE", new Object[] { getComponentName(),casStateEntry.getCasReferenceId(),casStateEntry.getSubordinateCasInPlayCount() });
+ }
}
}
catch(Exception e)
@@ -1436,10 +1510,11 @@
}
// Check if this CAS has children that are still being processed in this aggregate
if ( casHasChildrenInPlay(casStateEntry)) {
- // save this Step object in the local cache. It will be needed when this CAS is
- // resumed when all its children are fully processed
-// casStateEntry.setFinalStep(aStep);
-
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_has_children__FINE", new Object[] { getComponentName(),casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId(),casStateEntry.getSubordinateCasInPlayCount() });
+ }
+
+ replySentToClient = false;
return;
}
if (UIMAFramework.getLogger().isLoggable(Level.FINEST) )
@@ -1460,7 +1535,7 @@
}
// If the CAS was generated by this component but the Flow Controller wants to drop it OR this component
// is not a Cas Multiplier
- if (forceToDropTheCas(cacheEntry, aStep)) {
+ if ( forceToDropTheCas(parentCasStateEntry, cacheEntry, aStep)) {
if (casStateEntry.isReplyReceived()) {
if (isSubordinate) {
// drop the flow since we no longer need it
@@ -1480,7 +1555,7 @@
} else {
doDecrementChildCount = false;
}
- } else {
+ } else if ( !casStateEntry.isDropped() ) {
casStateEntry.setWaitingForRelease(true);
// Send a reply to the Client. If the CAS is an input CAS it will be dropped
cEndpoint = replyToClient(cacheEntry, casStateEntry);
@@ -1496,24 +1571,37 @@
} else {
// Remove entry from the local cache for this CAS. If the client
// is remote the entry was removed in replyToClient()
+ try {
+ localCache.lookupEntry(aCasReferenceId).setDropped(true);
+ } catch( Exception e) {}
localCache.remove(aCasReferenceId);
}
// If debug level=FINEST dump the entire cache
localCache.dumpContents();
}
- if ( doDecrementChildCount ) {
- if ( parentCasStateEntry == null ) {
- parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
- }
+ if ( parentCasStateEntry == null && isSubordinate ) {
+ parentCasStateEntry = localCache.lookupEntry(casStateEntry.getInputCasReferenceId());
+ }
+ if ( doDecrementChildCount ) {
// Child CAS has been fully processed, decrement its parent count of active child CASes
if ( parentCasStateEntry != null ) {
parentCasStateEntry.decrementSubordinateCasInPlayCount();
// If debug level=FINEST dump the entire cache
localCache.dumpContents();
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "finalStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_cas_decremented_child_count__FINE", new Object[] { getComponentName(),casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId(),casStateEntry.getSubordinateCasInPlayCount() });
+ }
}
}
+
boolean clientIsCollocated = ( cEndpoint == null || !cEndpoint.isRemote());
+
+ if ( parentCasStateEntry != null &&
+ parentCasStateEntry.getSubordinateCasInPlayCount() == 0 &&
+ parentCasStateEntry.isFailed() ) {
+ parentCasStateEntry.setReplyReceived();
+ }
// For subordinate CAS, check if its parent needs to be put in play. This should happen if
// this CAS was the last of the children in play
if ( isSubordinate && releaseParentCas(casDropped, clientIsCollocated, parentCasStateEntry) )
@@ -1541,6 +1629,7 @@
}
catch( Exception e)
{
+ e.printStackTrace();
HashMap map = new HashMap();
map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
map.put(AsynchAEMessage.CasReference, aCasReferenceId);
@@ -1608,12 +1697,15 @@
}
return retValue;
}
- private boolean forceToDropTheCas( CacheEntry cacheEntry, FinalStep aStep)
+ private boolean forceToDropTheCas( CasStateEntry entry, CacheEntry cacheEntry, FinalStep aStep)
{
// Get the key of the Cas Producer
String casProducer = cacheEntry.getCasProducerAggregateName();
// CAS is considered new from the point of view of this service IF it was produced by it
boolean isNewCas = (cacheEntry.isNewCas() && casProducer != null && getComponentName().equals(casProducer));
+ if ( entry != null && entry.isFailed() && isNewCas ) {
+ return true; // no point to continue if the CAS was produced in this aggregate and its parent failed here
+ }
// If the CAS was generated by this component but the Flow Controller wants to drop the CAS OR this component
// is not a Cas Multiplier
if ( isNewCas && ( aStep.getForceCasToBeDropped() || !isCasMultiplier()) )
@@ -1622,117 +1714,197 @@
}
return false;
}
+ private void sendReplyWithException( CacheEntry acacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint) throws Exception {
+ //boolean casProducedInThisAggregate = getComponentName().equals(cacheEntry.getCasProducerAggregateName());
+ if ( casStateEntry.isSubordinate()) {
+ // We must reply with the input CAS
+ //casStateEntry = getLocalCache().lookupEntry(casStateEntry.getInputCasReferenceId());
+ casStateEntry = getLocalCache().getTopCasAncestor(casStateEntry.getCasReferenceId());
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendReplyWithException", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_returning_exception_to_client__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),replyEndpoint.getEndpoint()});
+ }
+ if ( replyEndpoint.isRemote()) {
+ // this is an input CAS that has been marked as failed. Return the input CAS
+ // and an exception to the client.
+ getOutputChannel().sendReply(casStateEntry.getErrors().get(0),
+ casStateEntry.getCasReferenceId(),
+ null,
+ replyEndpoint,
+ AsynchAEMessage.Process);
+ } else {
+ replyEndpoint.setReplyEndpoint(true);
+ UimaTransport vmTransport = getTransport(replyEndpoint.getEndpoint()) ;
+ UimaMessage message =
+ vmTransport.produceMessage(AsynchAEMessage.Process,AsynchAEMessage.Response, this.getName());
+ message.addIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.Exception);
+ message.addStringProperty(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
+
+ Throwable wrapper = null;
+ Throwable cause = casStateEntry.getErrors().get(0);
+ if ( !(cause instanceof UimaEEServiceException) )
+ {
+ // Strip off AsynchAEException and replace with UimaEEServiceException
+ if ( cause instanceof AsynchAEException && cause.getCause() != null )
+ {
+ wrapper = new UimaEEServiceException(cause.getCause());
+ }
+ else
+ {
+ wrapper = new UimaEEServiceException(cause);
+ }
+ }
+ if ( wrapper == null )
+ {
+ message.addObjectProperty(AsynchAEMessage.Cargo, cause);
+ }
+ else
+ {
+ message.addObjectProperty(AsynchAEMessage.Cargo, wrapper);
+ }
+ vmTransport.getUimaMessageDispatcher(replyEndpoint.getEndpoint()).dispatch( message );
+ }
+ }
+
- private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry ) throws Exception
- {
- Endpoint endpoint = null;
-
- // Get the endpoint that represents a client that send the request
- // to this service. If the first arg to getEndpoint() is null, the method
- // should return the origin.
- if (isTopLevelComponent())
- {
- if ( casStateEntry.isSubordinate())
- {
- endpoint = getInProcessCache().getTopAncestorEndpoint(cacheEntry);
- }
- else
- {
- endpoint = getInProcessCache().getEndpoint(null, casStateEntry.getCasReferenceId());
- }
- }
- else
- {
- endpoint = getReplyEndpoint( cacheEntry );
- dropFlow(casStateEntry.getCasReferenceId(), false);
- }
- if ( endpoint != null )
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { casStateEntry.getCasReferenceId(), (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
+ private boolean sendExceptionToClient( CacheEntry cacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint ) throws Exception {
+ // Don't send CASes to the client if the input CAS is in a failed state. One
+ // of the descendant CASes may have failed in one of the delegates. Any
+ // exception on descendant CAS causes the input CAS to be returned to the
+ // client with an exception but only when all its descendant CASes are
+ // accounted for and released.
+ if ( casStateEntry.isSubordinate() ) {
+
+ // Fetch the top ancestor CAS of this CAS.
+ CasStateEntry topAncestorCasStateEntry = getLocalCache().
+ getTopCasAncestor(casStateEntry.getInputCasReferenceId());
+ // check the state
+ if ( topAncestorCasStateEntry.isFailed() && topAncestorCasStateEntry.getSubordinateCasInPlayCount() == 0) {
+
+ return true;
+ } else {
+ // Add the id of the generated CAS to the map holding outstanding CASes. This
+ // map will be referenced when a client sends Free CAS Notification. The map
+ // stores the id of the CAS both as a key and a value. Map is used to facilitate
+ // quick lookup
+ cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
}
- if (endpoint.getEndpoint() == null)
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { casStateEntry.getCasReferenceId() });
+ } else if ( casStateEntry.isFailed()) {
+ return true;
+ }
+ return false;
+ }
+ private void sendReplyToRemoteClient( CacheEntry cacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint) throws Exception {
+ if ( sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint)) {
+ sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
+ } else {
+ // Send response to a given endpoint
+ getOutputChannel().sendReply(cacheEntry, replyEndpoint);
+ // Drop the CAS only if the client is remote and the CAS is an input CAS.
+ // If this CAS has a parent the client will send Release CAS notification to release the CAS.
+ if ( !casStateEntry.isSubordinate() )
+ {
+ dropCAS(casStateEntry.getCasReferenceId(), true);
+ // If the cache is empty change the state of the Aggregate to idle
+ if ( getInProcessCache().isEmpty() )
+ {
+ endProcess(AsynchAEMessage.Process);
}
- HashMap map = new HashMap();
- map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
- map.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
- handleError(map, new UnknownDestinationException());
+ }
+ }
+ }
- }
- // Dont send a reply to the client if the client is a CAS multiplier
- else if ( !endpoint.isCasMultiplier() )
- {
- endpoint.setFinal(true);
-
- if ( !isStopped() )
- {
- if ( endpoint.isRemote())
- {
- // Check if this CAS is new, meaning it has a parent and this component is a Cas Multiplier
- if ( casStateEntry.isSubordinate() && isCasMultiplier()) {
- // Add the generated CAS to the outstanding CAS Map. Client notification will release
- // this CAS back to its pool
- synchronized(syncObject)
- {
- cmOutstandingCASes.put(casStateEntry.getCasReferenceId(),casStateEntry.getCasReferenceId());
- }
- }
- if ( casStateEntry.isSubordinate()) {
- CasStateEntry topAncestorCasStateEntry = getLocalCache().
- getTopCasAncestor(casStateEntry.getInputCasReferenceId());
- }
- // Send response to a given endpoint
- getOutputChannel().sendReply(cacheEntry, endpoint);
- } else {
- int mType = AsynchAEMessage.Response;
- // Check if the CAS was produced in this aggregate by any of its delegates
- // If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
- // needs to return as reply.
- if ( casStateEntry.isSubordinate() &&
- isCasMultiplier() &&
- getComponentName().equals(cacheEntry.getCasProducerAggregateName()) ) {
- // this is a Cas Multiplier, send this CAS to the client in a request message.
- mType = AsynchAEMessage.Request;
- // Return the CAS to the colocated client. First make sure that this CAS
- // is associated with the input CAS. This CAS may have been produced from
- // an intermediate CAS (which was produced from the input CAS). From the
- // client perspective, this Cas Multiplier Aggregate is a black box,
- // all CASes produced here must be linked with the input CAS.
- // Find the top ancestor of this CAS. It is the input CAS sent by the client
- String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
- // Modify the parent of this CAS.
- if ( inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
- casStateEntry.setInputCasReferenceId(inputCasId);
- cacheEntry.setInputCasReferenceId(inputCasId);
- }
- }
- sendVMMessage(mType, endpoint, cacheEntry);
- }
- }
- }
- // Drop the CAS only if the client is remote and the CAS is an input CAS.
- // If this CAS has a parent the client will send Release CAS notification to release the CAS.
- if ( endpoint.isRemote() && !casStateEntry.isSubordinate())
- {
- dropCAS(casStateEntry.getCasReferenceId(), true);
- // If the cache is empty change the state of the Aggregate to idle
- if ( getInProcessCache().isEmpty() )
- {
- endProcess(AsynchAEMessage.Process);
- }
- }
- }
- else
- {
+ private void sendReplyToCollocatedClient( CacheEntry cacheEntry, CasStateEntry casStateEntry, Endpoint replyEndpoint) throws Exception {
+ boolean casProducedInThisAggregate = getComponentName().equals(cacheEntry.getCasProducerAggregateName());
+ String componentName = getComponentName();
+
+ boolean isSubordinate = casStateEntry.isSubordinate();
+ boolean serviceIsCM = isCasMultiplier();
+ if ( sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint) ) {
+ try {
+ sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
+ } catch (Exception e) {
+ } finally {
+ if ( casProducedInThisAggregate ) {
+ // Drop the CAS generated in this Aggregate
+ dropCAS(casStateEntry.getCasReferenceId(), true);
+ }
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "sendReplyToCollocatedClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_sending_reply_to_client__FINE", new Object[] { getComponentName(), casStateEntry.getCasReferenceId(),replyEndpoint.getEndpoint()});
+ }
+ int mType = AsynchAEMessage.Response;
+ // Check if the CAS was produced in this aggregate by any of its delegates
+ // If so, send the CAS as a request. Otherwise, the CAS is an input CAS and
+ // needs to return as reply.
+ if ( isSubordinate && serviceIsCM && casProducedInThisAggregate ) {
+ // this is a Cas Multiplier, send this CAS to the client in a request message.
+ mType = AsynchAEMessage.Request;
+ // Return the CAS to the colocated client. First make sure that this CAS
+ // is associated with the input CAS. This CAS may have been produced from
+ // an intermediate CAS (which was produced from the input CAS). From the
+ // client perspective, this Cas Multiplier Aggregate is a black box,
+ // all CASes produced here must be linked with the input CAS.
+ // Find the top ancestor of this CAS. It is the input CAS sent by the client
+ String inputCasId = getLocalCache().lookupInputCasReferenceId(casStateEntry);
+ // Modify the parent of this CAS.
+ if ( inputCasId != null && !inputCasId.equals(casStateEntry.getInputCasReferenceId())) {
+ casStateEntry.setInputCasReferenceId(inputCasId);
+ cacheEntry.setInputCasReferenceId(inputCasId);
+ }
+ }
+ // Send CAS to a given reply endpoint
+ sendVMMessage(mType, replyEndpoint, cacheEntry);
+ }
+ }
+
+
+ private boolean validEndpoint( Endpoint endpoint, CasStateEntry casStateEntry) {
+ if ( endpoint == null ) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_client_endpoint_not_found__INFO", new Object[] { getComponentName(), casStateEntry.getCasReferenceId() });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_client_endpoint_not_found__INFO", new Object[] { getComponentName(), casStateEntry.getCasReferenceId() });
+ }
+ return false;
+ }
+ if (endpoint.getEndpoint() == null)
+ {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "validEndpoint", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_reply_destination__INFO", new Object[] { casStateEntry.getCasReferenceId() });
+ }
+ HashMap map = new HashMap();
+ map.put(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ map.put(AsynchAEMessage.CasReference, casStateEntry.getCasReferenceId());
+ handleError(map, new UnknownDestinationException());
+ return false;
+ }
+ // Don't send a reply to the client if the client is a CAS multiplier
+ if ( endpoint.isCasMultiplier() ) {
+ return false;
+ }
+
+ return true;
+ }
+ private Endpoint replyToClient(CacheEntry cacheEntry, CasStateEntry casStateEntry ) throws Exception
+ {
+ Endpoint endpoint = getReplyEndpoint(cacheEntry, casStateEntry);
+ if ( !validEndpoint(endpoint, casStateEntry)) {
+ return null; // the reason has already been logged
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
+ "replyToClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_final_step__FINEST", new Object[] { casStateEntry.getCasReferenceId(), (double) (System.nanoTime() - endpoint.getEntryTime()) / (double) 1000000 });
+ }
+ endpoint.setFinal(true);
+ if ( !isStopped() ) {
+ if ( endpoint.isRemote()) {
+ sendReplyToRemoteClient(cacheEntry, casStateEntry, endpoint);
+ } else {
+ sendReplyToCollocatedClient(cacheEntry, casStateEntry, endpoint);
}
-
}
return endpoint;
}
@@ -1760,6 +1932,32 @@
// Send reply back to the client. Use internal (non-jms) transport
transport.getUimaMessageDispatcher(endpoint.getEndpoint()).dispatch(message);
}
+
+
+ private Endpoint getReplyEndpoint(CacheEntry cacheEntry, CasStateEntry casStateEntry ) throws Exception {
+ Endpoint endpoint = null;
+ // Get the endpoint that represents the client that sent the request
+ // to this service. If the first arg to getEndpoint() is null, the method
+ // should return the origin.
+ if (isTopLevelComponent())
+ {
+ if ( casStateEntry.isSubordinate())
+ {
+ endpoint = getInProcessCache().getTopAncestorEndpoint(cacheEntry);
+ }
+ else
+ {
+ endpoint = getInProcessCache().getEndpoint(null, casStateEntry.getCasReferenceId());
+ }
+ }
+ else
+ {
+ endpoint = getReplyEndpoint( cacheEntry );
+ dropFlow(casStateEntry.getCasReferenceId(), false);
+ }
+ return endpoint;
+ }
+
private Endpoint getReplyEndpoint(CacheEntry cacheEntry) throws Exception
{
if ( cacheEntry == null )