You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/09/09 15:35:52 UTC
svn commit: r995424 -
/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Author: cwiklik
Date: Thu Sep 9 13:35:52 2010
New Revision: 995424
URL: http://svn.apache.org/viewvc?rev=995424&view=rev
Log:
UIMA-1867 Modified to release a CAS when the client's temp queue is known to have been deleted
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=995424&r1=995423&r2=995424&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Thu Sep 9 13:35:52 2010
@@ -1992,19 +1992,57 @@ public class AggregateAnalysisEngineCont
private void sendReplyToRemoteClient(CacheEntry cacheEntry, CasStateEntry casStateEntry,
Endpoint replyEndpoint) throws Exception {
+ // Check if the client endpoint is still available. It may have previously been
+ // marked as dead and now exists in the Dead Client Map.
+ if ( replyEndpoint.getDestination() != null &&
+ isClientDead(replyEndpoint.getDestination().toString() )) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "sendReplyToRemoteClient",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_client_dead__FINE",
+ new Object[] { getComponentName(), replyEndpoint.getDestination().toString(), casStateEntry.getCasReferenceId()});
+ }
+
+
+ dropCAS(casStateEntry.getCasReferenceId(), true);
+ // If the cache is empty change the state of the Aggregate to idle
+ if (getInProcessCache().isEmpty()) {
+ endProcess(AsynchAEMessage.Process);
+ }
+ return;
+ }
+
+
if (sendExceptionToClient(cacheEntry, casStateEntry, replyEndpoint)) {
sendReplyWithException(cacheEntry, casStateEntry, replyEndpoint);
} else {
// Send response to a given endpoint
getOutputChannel().sendReply(cacheEntry, replyEndpoint);
- // Drop the CAS only if the client is remote and the CAS is an input CAS.
+ // Drop the CAS only if the client is remote and the CAS is an input CAS OR
+ // the CAS is a child but there was a failure delivering it to a client. The client
+ // may have terminated while its input CAS was being processed, for example.
// If this CAS has a parent the client will send Release CAS notification to release the CAS.
- if (!casStateEntry.isSubordinate()) {
- dropCAS(casStateEntry.getCasReferenceId(), true);
- // If the cache is empty change the state of the Aggregate to idle
- if (getInProcessCache().isEmpty()) {
- endProcess(AsynchAEMessage.Process);
- }
+ if (!casStateEntry.isSubordinate() || (casStateEntry.isSubordinate() && isCasMultiplier() && casStateEntry.deliveryToClientFailed() )) {
+ if ( cmOutstandingCASes.containsKey(casStateEntry.getCasReferenceId())) {
+ cmOutstandingCASes.remove(casStateEntry.getCasReferenceId());
+ }
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "sendReplyToRemoteClient",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_client_dead__FINE",
+ new Object[] { getComponentName(), replyEndpoint.getDestination().toString(), casStateEntry.getCasReferenceId()});
+ }
+ dropCAS(casStateEntry.getCasReferenceId(), true);
+ // If the cache is empty change the state of the Aggregate to idle
+ if (getInProcessCache().isEmpty()) {
+ endProcess(AsynchAEMessage.Process);
+ }
}
}
}
@@ -2136,7 +2174,15 @@ public class AggregateAnalysisEngineCont
}
return endpoint;
}
-
+ /**
+ Check if a given destination exists in the DeadClient Map
+ **/
+ private boolean isClientDead(String destination) {
+ if ( super.deadClientDestinationMap.containsKey(destination)) {
+ return true;
+ }
+ return false;
+ }
private void sendVMMessage(int messageType, Endpoint endpoint, CacheEntry cacheEntry)
throws Exception {
// If the CAS was produced by this aggregate send the request message to the client