You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/02/16 19:06:24 UTC
svn commit: r1071337 -
/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Author: cwiklik
Date: Wed Feb 16 18:06:24 2011
New Revision: 1071337
URL: http://svn.apache.org/viewvc?rev=1071337&view=rev
Log:
UIMA-2055 Modified to work with new/refactored JmsOutputChannel
Modified:
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java?rev=1071337&r1=1071336&r2=1071337&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AggregateAnalysisEngineController_impl.java Wed Feb 16 18:06:24 2011
@@ -481,7 +481,7 @@ public class AggregateAnalysisEngineCont
// Send reply back to the client. Use internal (non-jms) transport
transport.getUimaMessageDispatcher(aClientEndpoint.getEndpoint()).dispatch(message);
} else {
- getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint);
+ getOutputChannel().sendReply(AsynchAEMessage.CollectionProcessComplete, aClientEndpoint, null, false);
}
clearStats();
@@ -610,7 +610,7 @@ public class AggregateAnalysisEngineCont
}
}
} else {
- getOutputChannel().sendRequest(AsynchAEMessage.CollectionProcessComplete, endpoint);
+ getOutputChannel().sendRequest(AsynchAEMessage.CollectionProcessComplete, null, endpoint);
endpoint.startCollectionProcessCompleteTimer();
}
}
@@ -1008,8 +1008,12 @@ public class AggregateAnalysisEngineCont
// If the delegate CM is a remote, send a Free CAS notification
if (delegateCM.getEndpoint().isRemote()) {
parentCasStateEntry.getFreeCasNotificationEndpoint().setCommand(AsynchAEMessage.Stop);
+ Endpoint fcEndpoint = parentCasStateEntry.getFreeCasNotificationEndpoint();
+ fcEndpoint.setReplyEndpoint(true);
+ fcEndpoint.setIsCasMultiplier(true);
+ fcEndpoint.setFreeCasEndpoint(true);
getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, entry.getCasReferenceId(),
- parentCasStateEntry.getFreeCasNotificationEndpoint());
+ fcEndpoint);
}
// Check if a request to stop generation of new CASes from the parent of
// this CAS has been sent to the CM. The Delegate object keeps track of
@@ -1136,7 +1140,7 @@ public class AggregateAnalysisEngineCont
// Remove a delegate endpoint from the single step list cached in the CAS entry
Endpoint endpoint = (Endpoint_impl) entry.getDelayedSingleStepList().remove(0);
// send the CAS to a collocated delegate from the delayed single step list.
- dispatchProcessRequest(aCasReferenceId, endpoint, true);
+ dispatchProcessRequest(entry, endpoint, true);
}
} catch (Exception e) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -1244,13 +1248,13 @@ public class AggregateAnalysisEngineCont
String analysisEngineKey = aStep.getAnalysisEngineKey();
// Find the endpoint for the delegate
endpoint = lookUpEndpoint(analysisEngineKey, true);
+ CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
if (endpoint != null) {
endpoint.setController(this);
CasStateEntry casStateEntry = getLocalCache().lookupEntry(aCasReferenceId);
casStateEntry.resetReplyReceived();
if (enableCasLogMap!=null && enableCasLogMap.containsKey(analysisEngineKey)) {
// Get a CAS
- CacheEntry cacheEntry = getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
CAS cas = cacheEntry.getCas();
logCasForEndpoint(analysisEngineKey, cas);
}
@@ -1287,7 +1291,7 @@ public class AggregateAnalysisEngineCont
"UIMAEE_next_step_dispatch__FINEST",
new Object[] { getComponentName(), analysisEngineKey, aCasReferenceId });
}
- dispatchProcessRequest(aCasReferenceId, endpoint, true);
+ dispatchProcessRequest(cacheEntry, endpoint, true);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"simpleStep", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -1431,7 +1435,7 @@ public class AggregateAnalysisEngineCont
casStateEntry.setNumberOfParallelDelegates(endpoints.length);
}
// Dispatch CAS to remote parallel delegates
- dispatchProcessRequest(aCasReferenceId, endpoints, true);
+ dispatchProcessRequest(cacheEntry, endpoints, true);
} else {
// All delegates in a parallel step are co-located. Send the CAS
// to the first delegate in the single step list.
@@ -1836,8 +1840,10 @@ public class AggregateAnalysisEngineCont
}
// freeCasEndpoint is a special endpoint for sending Free CAS Notification.
if (casDropped && freeCasEndpoint != null) {
- freeCasEndpoint.setReplyEndpoint(true);
try {
+ freeCasEndpoint.setReplyEndpoint(true);
+ freeCasEndpoint.setIsCasMultiplier(true);
+ freeCasEndpoint.setFreeCasEndpoint(true);
// send Free CAS Notification to a Cas Multiplier
getOutputChannel().sendRequest(AsynchAEMessage.ReleaseCAS, aCasReferenceId,
freeCasEndpoint);
@@ -2343,13 +2349,13 @@ public class AggregateAnalysisEngineCont
}
}
- private void dispatch(String aCasReferenceId, Endpoint anEndpoint) throws AsynchAEException {
+ private void dispatch(CacheEntry entry, Endpoint anEndpoint) throws AsynchAEException {
if (!anEndpoint.isRemote()) {
try {
UimaTransport transport = getTransport(anEndpoint.getEndpoint());
UimaMessage message = transport.produceMessage(AsynchAEMessage.Process,
AsynchAEMessage.Request, getName());
- message.addStringProperty(AsynchAEMessage.CasReference, aCasReferenceId);
+ message.addStringProperty(AsynchAEMessage.CasReference, entry.getCasReferenceId());
transport.getUimaMessageDispatcher(anEndpoint.getEndpoint()).dispatch(message);
} catch (Exception e) {
@@ -2370,9 +2376,9 @@ public class AggregateAnalysisEngineCont
// is in this state, delay CASes by placing them on a list of
// CASes pending dispatch. Once the ping reply is received all
// delayed CASes will be dispatched to the delegate.
- if (!delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpoint.getDelegateKey())) {
+ if (!delayCasIfDelegateInTimedOutState(entry.getCasReferenceId(), anEndpoint.getDelegateKey())) {
// The delegate is in the normal state so send it this CAS
- getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
+ getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), anEndpoint);
}
}
}
@@ -2406,13 +2412,13 @@ public class AggregateAnalysisEngineCont
return false; // Cas Not Delayed
}
- private void dispatchProcessRequest(String aCasReferenceId, Endpoint anEndpoint,
+ private void dispatchProcessRequest(CacheEntry entry, Endpoint anEndpoint,
boolean addEndpointToCache) throws AsynchAEException {
if (addEndpointToCache) {
- getInProcessCache().addEndpoint(anEndpoint, aCasReferenceId);
+ getInProcessCache().addEndpoint(anEndpoint, entry.getCasReferenceId());
}
anEndpoint.setController(this);
- dispatch(aCasReferenceId, anEndpoint);
+ dispatch(entry, anEndpoint);
}
@@ -2439,29 +2445,38 @@ public class AggregateAnalysisEngineCont
endpoint = lookUpEndpoint(key, true);
getInProcessCache().addEndpoint(endpoint, aCasReferenceId);
}
- dispatchProcessRequest(aCasReferenceId, endpoint, addEndpointToCache);
+ if ( getInProcessCache().entryExists(aCasReferenceId) ) {
+ dispatchProcessRequest(getInProcessCache().getCacheEntryForCAS(aCasReferenceId), endpoint, addEndpointToCache);
+ }
}
- private void dispatchProcessRequest(String aCasReferenceId, Endpoint[] anEndpointList,
+ private void dispatchProcessRequest(CacheEntry entry, Endpoint[] anEndpointList,
boolean addEndpointToCache) throws AsynchAEException {
List<Endpoint> endpointList = new ArrayList<Endpoint>();
for (int i = 0; i < anEndpointList.length; i++) {
// Check if the delegate previously timed out. If so, add the CAS
// Id to the list pending dispatch. This list holds CASes that are
// delayed until the service responds to a Ping.
- if (delayCasIfDelegateInTimedOutState(aCasReferenceId, anEndpointList[i].getEndpoint())) {
+ if (delayCasIfDelegateInTimedOutState(entry.getCasReferenceId(), anEndpointList[i].getEndpoint())) {
// The CAS was delayed until the delegate responds to a Ping
continue;
} else {
endpointList.add(anEndpointList[i]);
}
if (addEndpointToCache) {
- getInProcessCache().addEndpoint(anEndpointList[i], aCasReferenceId);
+ getInProcessCache().addEndpoint(anEndpointList[i], entry.getCasReferenceId());
}
}
Endpoint[] endpoints = new Endpoint[endpointList.size()];
endpointList.toArray(endpoints);
- getOutputChannel().sendRequest(aCasReferenceId, endpoints);
+ for (Endpoint endpoint : endpoints) {
+ if (endpoint.isRemote()) {
+ // In Parallel Step serialization must be xmi. Binary serialization doesn't support merge.
+ endpoint.setSerializer("xmi");
+ getOutputChannel().sendRequest(AsynchAEMessage.Process, entry.getCasReferenceId(), endpoint);
+ }
+ }
+
}
public boolean isDelegateKeyValid(String aDelegateKey) {
@@ -2813,7 +2828,6 @@ public class AggregateAnalysisEngineCont
throw new AsynchAEException("Controller:" + getComponentName()
+ " Unable To Dispatch GetMeta Request. Provided Endpoint is Invalid (NULL)");
}
- anEndpoint.startMetadataRequestTimer();
anEndpoint.setController(this);
anEndpoint.setWaitingForResponse(true);
String key = lookUpDelegateKey(anEndpoint.getEndpoint());
@@ -2839,7 +2853,7 @@ public class AggregateAnalysisEngineCont
}
}
- getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, anEndpoint);
+ getOutputChannel().sendRequest(AsynchAEMessage.GetMeta, null, anEndpoint);
}
public void retryMetadataRequest(Endpoint anEndpoint) throws AsynchAEException {
@@ -2915,11 +2929,7 @@ public class AggregateAnalysisEngineCont
public void retryLastCommand(int aCommand, Endpoint anEndpoint, String aCasReferenceId) {
try {
- if (AsynchAEMessage.Process == aCommand) {
- getOutputChannel().sendRequest(aCasReferenceId, anEndpoint);
- } else {
- getOutputChannel().sendRequest(aCommand, anEndpoint);
- }
+ getOutputChannel().sendRequest(aCommand, aCasReferenceId, anEndpoint);
} catch (AsynchAEException e) {
}