You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/02 17:24:00 UTC
svn commit: r810560 [2/4] - in
/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae:
handler/ handler/input/ jmx/
Modified: incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=810560&r1=810559&r2=810560&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Wed Sep 2 15:23:59 2009
@@ -47,161 +47,162 @@
import org.apache.uima.cas.impl.XmiSerializationSharedData;
import org.apache.uima.util.Level;
-public class ProcessRequestHandler_impl extends HandlerBase
-{
- private static final Class CLASS_NAME = ProcessRequestHandler_impl.class;
- private Object mux = new Object();
+public class ProcessRequestHandler_impl extends HandlerBase {
+ private static final Class CLASS_NAME = ProcessRequestHandler_impl.class;
+
+ private Object mux = new Object();
+
private UimaSerializer uimaSerializer = new UimaSerializer();
- public ProcessRequestHandler_impl(String aName)
- {
- super(aName);
- }
-
- private void cacheStats(String aCasReferenceId, long aTimeWaitingForCAS, long aTimeToDeserializeCAS )
- throws Exception
- {
- CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
- entry.incrementTimeWaitingForCAS( aTimeWaitingForCAS);
- entry.incrementTimeToDeserializeCAS(aTimeToDeserializeCAS);
- }
- private boolean messageContainsXMI(MessageContext aMessageContext, String casReferenceId) throws Exception
- {
- // Fetch serialized CAS from the message
- String xmi = aMessageContext.getStringMessage();
- // *****************************************************************
- // ***** NO XMI In Message. Kick this back to sender with exception
- // *****************************************************************
- if ( xmi == null )
- {
+ public ProcessRequestHandler_impl(String aName) { // aName is only forwarded to HandlerBase
+ super(aName);
+ }
+
+ private void cacheStats(String aCasReferenceId, long aTimeWaitingForCAS, // adds both timings to the
+ long aTimeToDeserializeCAS) throws Exception { // in-process cache entry of the given CAS
+ CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId); // NOTE(review): no null check; presumably the lookup throws for an unknown id - confirm
+ entry.incrementTimeWaitingForCAS(aTimeWaitingForCAS);
+ entry.incrementTimeToDeserializeCAS(aTimeToDeserializeCAS);
+ }
+
+ private boolean messageContainsXMI(MessageContext aMessageContext, String casReferenceId)
+ throws Exception { // returns true when the message has a string body; otherwise replies with an error and returns false
+ // Fetch the serialized CAS (the message's string body) from the message
+ String xmi = aMessageContext.getStringMessage();
+ // *****************************************************************
+ // ***** No XMI in message. Send an InvalidMessageException back to the sender
+ // *****************************************************************
+ if (xmi == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_no_cargo__INFO",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, // NOTE(review): logged source method name differs from this method's name
+ "UIMAEE_message_has_no_cargo__INFO",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
CasStateEntry stateEntry = null;
String parentCasReferenceId = null;
- try {
- stateEntry = getController().getLocalCache().lookupEntry(casReferenceId);
- if ( stateEntry != null && stateEntry.isSubordinate()) {
- CasStateEntry topParentEntry = getController().getLocalCache().getTopCasAncestor(casReferenceId);
- parentCasReferenceId = topParentEntry.getCasReferenceId();
- }
- } catch ( Exception e){}
-
- getController().
- getOutputChannel().
- sendReply(new InvalidMessageException("No XMI data in message"), casReferenceId,parentCasReferenceId, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
- // Dont process this empty message
- return false;
- }
- return true;
- }
- private synchronized CAS getCAS( boolean fetchCASFromShadowCasPool, String shadowCasPoolKey, String casReceivedFrom )
- {
- CAS cas = null;
- // If this is a new CAS (generated by a CM), fetch a CAS from a Shadow Cas Pool associated with a CM that
- // produced the CAS. Each CM will have its own Shadow Cas Pool
- if ( fetchCASFromShadowCasPool )
- {
+ try {
+ stateEntry = getController().getLocalCache().lookupEntry(casReferenceId);
+ if (stateEntry != null && stateEntry.isSubordinate()) { // subordinate CAS: report the id of its top ancestor as the parent
+ CasStateEntry topParentEntry = getController().getLocalCache().getTopCasAncestor(
+ casReferenceId);
+ parentCasReferenceId = topParentEntry.getCasReferenceId();
+ }
+ } catch (Exception e) { // NOTE(review): any lookup failure is silently ignored; the reply below then carries a null parent id
+ }
+
+ getController().getOutputChannel().sendReply(
+ new InvalidMessageException("No XMI data in message"), casReferenceId,
+ parentCasReferenceId, aMessageContext.getEndpoint(), AsynchAEMessage.Process);
+ // Don't process this empty message
+ return false;
+ }
+ return true;
+ }
+
+ private synchronized CAS getCAS(boolean fetchCASFromShadowCasPool, String shadowCasPoolKey, // NOTE(review): synchronized, so pool requests through this handler are serialized
+ String casReceivedFrom) { // casReceivedFrom is only used in the log messages below
+ CAS cas = null;
+ // If this is a new CAS (generated by a Cas Multiplier, CM), fetch a CAS from the Shadow Cas Pool
+ // associated with the CM that produced it, rather than from the service CAS pool.
+ // Each CM will have its own Shadow Cas Pool
+ if (fetchCASFromShadowCasPool) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_cm__FINE",
- new Object[] { shadowCasPoolKey });
- }
- // Aggregate time spent waiting for a CAS in the shadow cas pool
- ((AggregateAnalysisEngineController)getController()).getDelegateServicePerformance(shadowCasPoolKey).beginWaitOnShadowCASPool();
- cas = getController().getCasManagerWrapper().getNewCas(shadowCasPoolKey);
- ((AggregateAnalysisEngineController)getController()).getDelegateServicePerformance(shadowCasPoolKey).endWaitOnShadowCASPool();
-
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_cm__FINE",
+ new Object[] { shadowCasPoolKey });
+ }
+ // Aggregate time spent waiting for a CAS in the shadow cas pool
+ ((AggregateAnalysisEngineController) getController()).getDelegateServicePerformance(
+ shadowCasPoolKey).beginWaitOnShadowCASPool(); // NOTE(review): unchecked cast; assumes this branch only runs in an aggregate controller - confirm
+ cas = getController().getCasManagerWrapper().getNewCas(shadowCasPoolKey);
+ ((AggregateAnalysisEngineController) getController()).getDelegateServicePerformance(
+ shadowCasPoolKey).endWaitOnShadowCASPool();
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE",
- new Object[] { shadowCasPoolKey });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "getCAS",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted_cm__FINE",
+ new Object[] { shadowCasPoolKey });
+ }
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "getCAS",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINE",
+ new Object[] { casReceivedFrom });
}
- }
- else
- {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINE",
- new Object[] { casReceivedFrom });
- }
- // Aggregate time spent waiting for a CAS in the service cas pool
- getController().getServicePerformance().beginWaitOnCASPool();
-
- cas = getController().getCasManagerWrapper().getNewCas();
- getController().getServicePerformance().endWaitOnCASPool();
- ServicePerformance sp = getController().getServicePerformance();
- sp.incrementCasPoolWaitTime(sp.getTimeWaitingForCAS());
-
+ // Aggregate time spent waiting for a CAS in the service cas pool
+ getController().getServicePerformance().beginWaitOnCASPool();
+
+ cas = getController().getCasManagerWrapper().getNewCas();
+ getController().getServicePerformance().endWaitOnCASPool();
+ ServicePerformance sp = getController().getServicePerformance();
+ sp.incrementCasPoolWaitTime(sp.getTimeWaitingForCAS());
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "getCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
- new Object[] { casReceivedFrom });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "getCAS",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
+ new Object[] { casReceivedFrom });
}
- }
- return cas;
- }
- /**
- *
- * @param casReferenceId
- * @param freeCasEndpoint
- * @param shadowCasPoolKey
- * @param aMessageContext
- * @return
- * @throws Exception
- */
- private CacheEntry deserializeCASandRegisterWithCache( String casReferenceId, Endpoint freeCasEndpoint, String shadowCasPoolKey, MessageContext aMessageContext)
- throws Exception
- {
- long inTime = System.nanoTime();
- boolean casRegistered = false;
-
-
- // Time how long we wait on Cas Pool to fetch a new CAS
- long t1 = getController().getCpuTime();
- // *************************************************************************
- // Fetch CAS from a Cas Pool. If the CAS came from a Cas Multiplier
- // fetch the CAS from a shadow CAS pool. Otherwise, fetch the CAS
- // from the service CAS Pool.
- // *************************************************************************
+ }
+ return cas;
+ }
+
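+ /**
+ * Fetches a CAS from a CAS pool, deserializes the message payload into it and registers it in the in-process cache.
+ * @param casReferenceId id under which the deserialized CAS is registered in the in-process cache
+ * @param freeCasEndpoint endpoint saved on the cache entry for Free CAS notifications (CAS Multiplier output only)
+ * @param shadowCasPoolKey key of the shadow CAS pool to draw from; when non-null also saved as the CAS producer key
+ * @param aMessageContext message carrying the serialized CAS ("xmi" string body or "binary" byte body)
+ * @return the cache entry created for the CAS, or null if the controller is stopped
+ * @throws Exception if fetching, deserializing or registering the CAS fails
+ */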
+ private CacheEntry deserializeCASandRegisterWithCache(String casReferenceId,
+ Endpoint freeCasEndpoint, String shadowCasPoolKey, MessageContext aMessageContext)
+ throws Exception {
+ long inTime = System.nanoTime();
+ boolean casRegistered = false;
+
+ // Time how long we wait on Cas Pool to fetch a new CAS
+ long t1 = getController().getCpuTime();
+ // *************************************************************************
+ // Fetch CAS from a Cas Pool. If the CAS came from a Cas Multiplier
+ // fetch the CAS from a shadow CAS pool. Otherwise, fetch the CAS
+ // from the service CAS Pool.
+ // *************************************************************************
Endpoint endpoint = aMessageContext.getEndpoint();
- CAS cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey,endpoint.getEndpoint());
- long timeWaitingForCAS = getController().getCpuTime() - t1;
- // Check if we are still running
- if ( getController().isStopped() )
- {
- // The Controller is in shutdown state.
- getController().dropCAS(cas);
- return null;
- }
- // *************************************************************************
- // Deserialize CAS from the message
- // *************************************************************************
- t1 = getController().getCpuTime();
- String serializationStrategy = endpoint.getSerializer();
- XmiSerializationSharedData deserSharedData = null;
- CacheEntry entry = null;
- if ( serializationStrategy.equals("xmi"))
- {
- // Fetch serialized CAS from the message
- String xmi = aMessageContext.getStringMessage();
- deserSharedData = new XmiSerializationSharedData();
-// UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+ CAS cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey,
+ endpoint.getEndpoint());
+ long timeWaitingForCAS = getController().getCpuTime() - t1;
+ // Check if we are still running
+ if (getController().isStopped()) {
+ // The Controller is in shutdown state.
+ getController().dropCAS(cas);
+ return null;
+ }
+ // *************************************************************************
+ // Deserialize CAS from the message
+ // *************************************************************************
+ t1 = getController().getCpuTime();
+ String serializationStrategy = endpoint.getSerializer();
+ XmiSerializationSharedData deserSharedData = null;
+ CacheEntry entry = null;
+ if (serializationStrategy.equals("xmi")) {
+ // Fetch serialized CAS from the message
+ String xmi = aMessageContext.getStringMessage();
+ deserSharedData = new XmiSerializationSharedData();
+ // UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
- }
- else if ( serializationStrategy.equals("binary"))
- {
- // *************************************************************************
- // Register the CAS with a local cache
- // *************************************************************************
- //CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId);
- byte[] binarySource = aMessageContext.getByteMessage();
-// UimaSerializer.deserializeCasFromBinary(binarySource, cas);
+ } else if (serializationStrategy.equals("binary")) {
+ // *************************************************************************
+ // Register the CAS with a local cache
+ // *************************************************************************
+ // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
+ // deserSharedData, casReferenceId);
+ byte[] binarySource = aMessageContext.getByteMessage();
+ // UimaSerializer.deserializeCasFromBinary(binarySource, cas);
uimaSerializer.deserializeCasFromBinary(binarySource, cas);
- }
+ }
// *************************************************************************
// Check and set up for Delta CAS reply
@@ -209,780 +210,750 @@
boolean acceptsDeltaCas = false;
Marker marker = null;
if (aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
- acceptsDeltaCas = aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
- if (acceptsDeltaCas ) {
- marker = cas.createMarker();
- }
+ acceptsDeltaCas = aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+ if (acceptsDeltaCas) {
+ marker = cas.createMarker();
+ }
}
- // *************************************************************************
- // Register the CAS with a local cache
- // *************************************************************************
- //CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId);
- entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId, marker, acceptsDeltaCas);
+ // *************************************************************************
+ // Register the CAS with a local cache
+ // *************************************************************************
+ // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
+ // deserSharedData, casReferenceId);
+ entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData,
+ casReferenceId, marker, acceptsDeltaCas);
long timeToDeserializeCAS = getController().getCpuTime() - t1;
- getController().incrementDeserializationTime(timeToDeserializeCAS);
- LongNumericStatistic statistic;
- if ( (statistic = getController().getMonitor().getLongNumericStatistic("",Monitor.TotalDeserializeTime)) != null )
- {
- statistic.increment(timeToDeserializeCAS);
- }
+ getController().incrementDeserializationTime(timeToDeserializeCAS);
+ LongNumericStatistic statistic;
+ if ((statistic = getController().getMonitor().getLongNumericStatistic("",
+ Monitor.TotalDeserializeTime)) != null) {
+ statistic.increment(timeToDeserializeCAS);
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialize_cas_time_FINE",
- new Object[] { (double)timeToDeserializeCAS / 1000000.0});
+ "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_deserialize_cas_time_FINE",
+ new Object[] { (double) timeToDeserializeCAS / 1000000.0 });
+ }
+
+ // Update Stats
+ ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
+ casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
+ if (getController().isTopLevelComponent()) {
+ synchronized (mux) {
+ getController().getServicePerformance().incrementCasDeserializationTime(
+ timeToDeserializeCAS);
+ }
}
+ getController().saveTime(inTime, casReferenceId, getController().getName());
- // Update Stats
- ServicePerformance casStats = getController().getCasStatistics(casReferenceId);
- casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
- if ( getController().isTopLevelComponent() )
- {
- synchronized( mux )
- {
- getController().getServicePerformance().incrementCasDeserializationTime(timeToDeserializeCAS);
- }
- }
- getController().saveTime(inTime, casReferenceId, getController().getName());
-
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- // If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS
- if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
- {
- // Fetch parent CAS id
- String inputCasReferenceId =
- aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
- if ( shadowCasPoolKey != null )
- {
- // Save the key of the Cas Multiplier in the cache. It will be now known which Cas Multiplier produced this CAS
- entry.setCasProducerKey(shadowCasPoolKey);
- }
- // associate this subordinate CAS with the parent CAS
- entry.setInputCasReferenceId(inputCasReferenceId);
- // Save a Cas Multiplier endpoint where a Free CAS notification will be sent
- entry.setFreeCasEndpoint(freeCasEndpoint);
- cacheStats( inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
- }
- else
- {
- cacheStats( casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
- }
- DelegateStats stats = new DelegateStats();
- if ( entry.getStat() == null )
- {
- entry.setStat(stats);
- // Add entry for self (this aggregate). MessageContext.getEndpointName()
- // returns the name of the queue receiving the message.
- stats.put(getController().getServiceEndpointName(), new TimerStats());
- }
- else
- {
- if (!stats.containsKey(getController().getServiceEndpointName()))
- {
- stats.put(getController().getServiceEndpointName(), new DelegateStats());
- }
- }
- }
- else
- {
- cacheStats( casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
- }
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ // If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS
+ if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ // Fetch parent CAS id
+ String inputCasReferenceId = aMessageContext
+ .getMessageStringProperty(AsynchAEMessage.InputCasReference);
+ if (shadowCasPoolKey != null) {
+ // Save the key of the Cas Multiplier in the cache. It will be now known which Cas
+ // Multiplier produced this CAS
+ entry.setCasProducerKey(shadowCasPoolKey);
+ }
+ // associate this subordinate CAS with the parent CAS
+ entry.setInputCasReferenceId(inputCasReferenceId);
+ // Save a Cas Multiplier endpoint where a Free CAS notification will be sent
+ entry.setFreeCasEndpoint(freeCasEndpoint);
+ cacheStats(inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+ } else {
+ cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+ }
+ DelegateStats stats = new DelegateStats();
+ if (entry.getStat() == null) {
+ entry.setStat(stats);
+ // Add entry for self (this aggregate). MessageContext.getEndpointName()
+ // returns the name of the queue receiving the message.
+ stats.put(getController().getServiceEndpointName(), new TimerStats());
+ } else {
+ if (!stats.containsKey(getController().getServiceEndpointName())) {
+ stats.put(getController().getServiceEndpointName(), new DelegateStats());
+ }
+ }
+ } else {
+ cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_deserialized_cas_ready_to_process_FINE",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
- cacheProcessCommandInClientEndpoint();
- return entry;
- }
- private String getCasReferenceId( MessageContext aMessageContext ) throws Exception
- {
- if ( !aMessageContext.propertyExists(AsynchAEMessage.CasReference) )
- {
+ cacheProcessCommandInClientEndpoint();
+ return entry;
+ }
+
+ private String getCasReferenceId(MessageContext aMessageContext) throws Exception { // returns the CasReference message property, or null when it is absent
+ if (!aMessageContext.propertyExists(AsynchAEMessage.CasReference)) { // missing id: log, reply with an error, give up
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_cas_refid__INFO",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, // NOTE(review): logged source method name differs from this method's name
+ "UIMAEE_message_has_cas_refid__INFO",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ }
+ getController().getOutputChannel().sendReply(
+ new InvalidMessageException("No Cas Reference Id Received From Delegate In message"),
+ null, null, aMessageContext.getEndpoint(), AsynchAEMessage.Process); // both CAS id arguments are null: no id is known here
+ return null; // the caller checks for null and drops the message
+ }
+ return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ }
+
+ /**
+ * Handles process request from a remote client
+ *
+ * @param aMessageContext
+ * - contains a message from UIMA-AS Client
+ * @throws AsynchAEException
+ */
+ private void handleProcessRequestFromRemoteClient(MessageContext aMessageContext)
+ throws AsynchAEException {
+ CacheEntry entry = null;
+ String casReferenceId = null;
+ // Check if there is a cargo in the message
+ if (aMessageContext.getMessageIntProperty(AsynchAEMessage.Payload) == AsynchAEMessage.XMIPayload
+ && aMessageContext.getStringMessage() == null) {
+ return; // No XMI just return
+ }
+
+ try {
+
+ String newCASProducedBy = null;
+ // Get the CAS Reference Id of the input CAS
+ // Fetch id of the CAS from the message. If it doesn't exist the method will log the problem,
+ // send an error reply to the sender and return null
+ casReferenceId = getCasReferenceId(aMessageContext);
+ if (casReferenceId == null) {
+ return; // Invalid message. Nothing to do
}
- getController().
- getOutputChannel().
- sendReply(new InvalidMessageException("No Cas Reference Id Received From Delegate In message"), null,null, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
- return null;
- }
- return aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
- }
- /**
- * Handles process request from a remote client
- *
- * @param aMessageContext - contains a message from UIMA-AS Client
- * @throws AsynchAEException
- */
- private void handleProcessRequestFromRemoteClient(MessageContext aMessageContext) throws AsynchAEException
- {
- CacheEntry entry = null;
- String casReferenceId = null;
- // Check if there is a cargo in the message
- if ( aMessageContext.getMessageIntProperty(AsynchAEMessage.Payload)== AsynchAEMessage.XMIPayload && aMessageContext.getStringMessage() == null )
- {
- return; // No XMI just return
- }
-
- try
- {
-
- String newCASProducedBy = null;
- // Get the CAS Reference Id of the input CAS
- // Fetch id of the CAS from the message. If it doesnt exist the method will create an entry in the log file and return null
- casReferenceId = getCasReferenceId(aMessageContext);
- if ( casReferenceId == null )
- {
- return; // Invalid message. Nothing to do
- }
- // Initially make both equal
- String inputCasReferenceId = casReferenceId;
- // Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
- Endpoint freeCasEndpoint = null;
+ // Initially make both equal
+ String inputCasReferenceId = casReferenceId;
+ // Destination where Free Cas Notification will be sent if the CAS came from a Cas Multiplier
+ Endpoint freeCasEndpoint = null;
CasStateEntry inputCasStateEntry = null;
-
- // CASes generated by a Cas Multiplier will have a CasSequence property set.
- if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
- {
- // Fetch the name of the Cas Multiplier's input queue
- //String cmEndpointName = aMessageContext.getEndpoint().getEndpoint();
- String cmEndpointName = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
- newCASProducedBy =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(cmEndpointName);
- // Fetch an ID of the parent CAS
- inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
- // Fetch Cache entry for the parent CAS
- CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(inputCasReferenceId);
+
+ // CASes generated by a Cas Multiplier will have a CasSequence property set.
+ if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ // Fetch the name of the Cas Multiplier's input queue
+ // String cmEndpointName = aMessageContext.getEndpoint().getEndpoint();
+ String cmEndpointName = aMessageContext
+ .getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ newCASProducedBy = ((AggregateAnalysisEngineController) getController())
+ .lookUpDelegateKey(cmEndpointName);
+ // Fetch an ID of the parent CAS
+ inputCasReferenceId = aMessageContext
+ .getMessageStringProperty(AsynchAEMessage.InputCasReference);
+ // Fetch Cache entry for the parent CAS
+ CacheEntry inputCasCacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
+ inputCasReferenceId);
// Fetch an endpoint where Free CAS Notification must be sent.
- // This endpoint is unique per CM instance. Meaning, each
- // instance of CM will have an endpoint where it expects Free CAS
+ // This endpoint is unique per CM instance. Meaning, each
+ // instance of CM will have an endpoint where it expects Free CAS
// notifications.
freeCasEndpoint = aMessageContext.getEndpoint();
- // Clone an endpoint where Free Cas Request will be sent
- freeCasEndpoint = (Endpoint)((Endpoint_impl)freeCasEndpoint).clone();
+ // Clone an endpoint where Free Cas Request will be sent
+ freeCasEndpoint = (Endpoint) ((Endpoint_impl) freeCasEndpoint).clone();
- if ( getController() instanceof AggregateAnalysisEngineController ) {
- inputCasStateEntry = ((AggregateAnalysisEngineController)getController()).
- getLocalCache().lookupEntry(inputCasReferenceId);
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ inputCasStateEntry = ((AggregateAnalysisEngineController) getController())
+ .getLocalCache().lookupEntry(inputCasReferenceId);
- // Associate Free Cas Notification Endpoint with an input Cas
+ // Associate Free Cas Notification Endpoint with an input Cas
inputCasStateEntry.setFreeCasNotificationEndpoint(freeCasEndpoint);
}
- computeStats(aMessageContext, inputCasReferenceId);
- // Reset the destination
- aMessageContext.getEndpoint().setDestination(null);
- // This CAS came in from a CAS Multiplier. Treat it differently than the
- // input CAS. In case the Aggregate needs to send this CAS to the
- // client, retrieve the client destination by looking up the client endpoint
- // using input CAS reference id. CASes generated by the CAS multiplier will have
- // the same Cas Reference id.
- Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin();
- // The message context contains a Cas Multiplier endpoint. Since
- // we dont want to send a generated CAS back to the CM, override
- // with an endpoint provided by the client of
- // this service. Client endpoint is attached to an input Cas cache entry.
- aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
- aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
-
- // Before sending a CAS to Cas Multiplier, the aggregate has
- // saved the CM key in the CAS cache entry. Fetch the key
- // of the CM so that we can ask the right Shadow Cas Pool for
- // a new CAS. Every Shadow Cas Pool has a unique id which
- // corresponds to a Cas Multiplier key.
- //newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- Endpoint casMultiplierEndpoint =
- ((AggregateAnalysisEngineController)getController()).lookUpEndpoint(newCASProducedBy, false);
- if ( casMultiplierEndpoint != null )
- {
- // Save the URL of the broker managing the Free Cas Notification queue.
- // This is needed when we try to establish a connection to the broker.
- freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
- }
- }
- }
- else if ( getController().isTopLevelComponent() )
- {
- if ( getController() instanceof AggregateAnalysisEngineController ) {
- ((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
- }
- if ( getController().isCasMultiplier() ) {
- // Send an ack to the client. The ack message will include a FreeCasQueue
- // to enable the client to send messages to the service processing a CAS.
- try {
- getController().getOutputChannel().sendReply(AsynchAEMessage.ServiceInfo, aMessageContext.getEndpoint(), casReferenceId);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
- // to see if the message with a given CasReferenceId is already being processed. It is, the
- // message contains the same request possibly issued by the caller due to timeout. Also this
- // mechanism helps with dealing with scenario when this service is not up when the client sends
- // request. The client can keep re-sending the same request until its timeout thresholds are
- // exceeded. By that time, there may be multiple messages in this service queue with the same
- // CasReferenceId. When the service finally comes back up, it will have multiple messages in
- // its queue possibly from the same client. Only the first message for any given CasReferenceId
- // should be processed.
- if ( !getController().getInProcessCache().entryExists(casReferenceId) )
- {
- CasStateEntry cse = null;
- if ( getController().getLocalCache().lookupEntry(casReferenceId) == null ) {
- // Create a new entry in the local cache for the CAS received from the remote
- cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
- // Check if this CAS is a child
- if( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) ) {
- cse.setInputCasReferenceId(inputCasReferenceId);
- }
- } else {
- cse = getController().getLocalCache().lookupEntry(casReferenceId);
- }
-
- if ( getController() instanceof AggregateAnalysisEngineController && aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
- String delegateInputQueueName = aMessageContext.getMessageStringProperty(AsynchAEMessage.MessageFrom);
- String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(delegateInputQueueName); //aMessageContext.getEndpoint().getEndpoint());
- if ( delegateKey != null ) {
- Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
- // Save the last delegate handling this CAS
- cse.setLastDelegate(delegate);
- // If there is one thread receiving messages from Cas Multiplier increment number of child Cases
- // of the parent CAS. If there are more threads (consumers) a special object ConcurrentMessageListener
- // has already incremented the count. This special object enforces order of processing for CASes
- // coming in from the Cas Multiplier.
- if ( !delegate.hasConcurrentConsumersOnReplyQueue() ) {
- inputCasStateEntry.incrementSubordinateCasInPlayCount();
- }
- }
- }
-
- entry = deserializeCASandRegisterWithCache( casReferenceId, freeCasEndpoint, newCASProducedBy, aMessageContext);
- if ( getController().isStopped() || entry == null || entry.getCas() == null)
- {
- if ( entry != null )
- {
- // The Controller is in shutdown state, release the CAS
- getController().dropCAS( entry.getCasReferenceId(), true);
- entry = null;
- }
- return;
- }
- // *****************************************************************
- // Process the CAS
- // *****************************************************************
- invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
- }
- else
- {
+ computeStats(aMessageContext, inputCasReferenceId);
+ // Reset the destination
+ aMessageContext.getEndpoint().setDestination(null);
+ // This CAS came in from a CAS Multiplier. Treat it differently than the
+ // input CAS. In case the Aggregate needs to send this CAS to the
+ // client, retrieve the client destination by looking up the client endpoint
+ // using input CAS reference id. CASes generated by the CAS multiplier will have
+ // the same Cas Reference id.
+ Endpoint replyToEndpoint = inputCasCacheEntry.getMessageOrigin();
+ // The message context contains a Cas Multiplier endpoint. Since
+ // we don't want to send a generated CAS back to the CM, override
+ // with an endpoint provided by the client of
+ // this service. Client endpoint is attached to an input Cas cache entry.
+ aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
+ aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
+
+ // Before sending a CAS to Cas Multiplier, the aggregate has
+ // saved the CM key in the CAS cache entry. Fetch the key
+ // of the CM so that we can ask the right Shadow Cas Pool for
+ // a new CAS. Every Shadow Cas Pool has a unique id which
+ // corresponds to a Cas Multiplier key.
+ // newCASProducedBy = inputCasCacheEntry.getCasMultiplierKey();
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ Endpoint casMultiplierEndpoint = ((AggregateAnalysisEngineController) getController())
+ .lookUpEndpoint(newCASProducedBy, false);
+ if (casMultiplierEndpoint != null) {
+ // Save the URL of the broker managing the Free Cas Notification queue.
+ // This is needed when we try to establish a connection to the broker.
+ freeCasEndpoint.setServerURI(casMultiplierEndpoint.getServerURI());
+ }
+ }
+ } else if (getController().isTopLevelComponent()) {
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
+ aMessageContext.getEndpoint());
+ }
+ if (getController().isCasMultiplier()) {
+ // Send an ack to the client. The ack message will include a FreeCasQueue
+ // to enable the client to send messages to the service processing a CAS.
+ try {
+ getController().getOutputChannel().sendReply(AsynchAEMessage.ServiceInfo,
+ aMessageContext.getEndpoint(), casReferenceId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
+ // to see if the message with a given CasReferenceId is already being processed. If it is, the
+ // message contains the same request possibly issued by the caller due to timeout. Also this
+ // mechanism helps with dealing with scenario when this service is not up when the client
+ // sends
+ // request. The client can keep re-sending the same request until its timeout thresholds are
+ // exceeded. By that time, there may be multiple messages in this service queue with the same
+ // CasReferenceId. When the service finally comes back up, it will have multiple messages in
+ // its queue possibly from the same client. Only the first message for any given
+ // CasReferenceId
+ // should be processed.
+ if (!getController().getInProcessCache().entryExists(casReferenceId)) {
+ CasStateEntry cse = null;
+ if (getController().getLocalCache().lookupEntry(casReferenceId) == null) {
+ // Create a new entry in the local cache for the CAS received from the remote
+ cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
+ // Check if this CAS is a child
+ if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ cse.setInputCasReferenceId(inputCasReferenceId);
+ }
+ } else {
+ cse = getController().getLocalCache().lookupEntry(casReferenceId);
+ }
+
+ if (getController() instanceof AggregateAnalysisEngineController
+ && aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ String delegateInputQueueName = aMessageContext
+ .getMessageStringProperty(AsynchAEMessage.MessageFrom);
+ String delegateKey = ((AggregateAnalysisEngineController) getController())
+ .lookUpDelegateKey(delegateInputQueueName); // aMessageContext.getEndpoint().getEndpoint());
+ if (delegateKey != null) {
+ Delegate delegate = ((AggregateAnalysisEngineController) getController())
+ .lookupDelegate(delegateKey);
+ // Save the last delegate handling this CAS
+ cse.setLastDelegate(delegate);
+ // If there is one thread receiving messages from Cas Multiplier increment number of
+ // child Cases
+ // of the parent CAS. If there are more threads (consumers) a special object
+ // ConcurrentMessageListener
+ // has already incremented the count. This special object enforces order of processing
+ // for CASes
+ // coming in from the Cas Multiplier.
+ if (!delegate.hasConcurrentConsumersOnReplyQueue()) {
+ inputCasStateEntry.incrementSubordinateCasInPlayCount();
+ }
+ }
+ }
+
+ entry = deserializeCASandRegisterWithCache(casReferenceId, freeCasEndpoint,
+ newCASProducedBy, aMessageContext);
+ if (getController().isStopped() || entry == null || entry.getCas() == null) {
+ if (entry != null) {
+ // The Controller is in shutdown state, release the CAS
+ getController().dropCAS(entry.getCasReferenceId(), true);
+ entry = null;
+ }
+ return;
+ }
+ // *****************************************************************
+ // Process the CAS
+ // *****************************************************************
+ invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext,
+ newCASProducedBy);
+ } else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_duplicate_request__INFO",
- new Object[] { casReferenceId});
+ "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId });
}
- }
- }
- catch ( Exception e)
- {
- e.printStackTrace();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "handleProcessRequestFromRemoteClient",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "handleProcessRequestFromRemoteClient", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_exception__WARNING", e);
}
- ErrorContext errorContext = new ErrorContext();
- errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
- errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
- errorContext.add(AsynchAEMessage.CasReference, casReferenceId );
- if ( entry != null )
- {
- getController().dropCAS(entry.getCas());
- }
- getController().getErrorHandlerChain().handle(e, errorContext, getController());
- }
-
- }
- private void handleProcessRequestWithCASReference(MessageContext aMessageContext) throws AsynchAEException
- {
- boolean isNewCAS = false;
- String newCASProducedBy = null;
-
-
- try
- {
- // This is only used when handling CASes produced by CAS Multiplier
- String inputCasReferenceId = null;
- CAS cas = null;
- CasStateEntry cse = null;
- String casReferenceId = getCasReferenceId(aMessageContext);
- if ( (cse = getController().getLocalCache().lookupEntry(casReferenceId)) == null ) {
- // Create a new entry in the local cache for the CAS received from the remote
+ ErrorContext errorContext = new ErrorContext();
+ errorContext.add(AsynchAEMessage.Endpoint, aMessageContext.getEndpoint());
+ errorContext.add(AsynchAEMessage.Command, AsynchAEMessage.Process);
+ errorContext.add(AsynchAEMessage.CasReference, casReferenceId);
+ if (entry != null) {
+ getController().dropCAS(entry.getCas());
+ }
+ getController().getErrorHandlerChain().handle(e, errorContext, getController());
+ }
+
+ }
+
+ private void handleProcessRequestWithCASReference(MessageContext aMessageContext)
+ throws AsynchAEException {
+ boolean isNewCAS = false;
+ String newCASProducedBy = null;
+
+ try {
+ // This is only used when handling CASes produced by CAS Multiplier
+ String inputCasReferenceId = null;
+ CAS cas = null;
+ CasStateEntry cse = null;
+ String casReferenceId = getCasReferenceId(aMessageContext);
+ if ((cse = getController().getLocalCache().lookupEntry(casReferenceId)) == null) {
+ // Create a new entry in the local cache for the CAS received from the remote
cse = getController().getLocalCache().createCasStateEntry(casReferenceId);
}
- // Check if this Cas has been sent from a Cas Multiplier. If so, its sequence will be > 0
- if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
- {
- isNewCAS = true;
-
- Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
+ // Check if this Cas has been sent from a Cas Multiplier. If so, its sequence will be > 0
+ if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ isNewCAS = true;
- if ( casMultiplierEndpoint == null )
- {
+ Endpoint casMultiplierEndpoint = aMessageContext.getEndpoint();
+
+ if (casMultiplierEndpoint == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
- new Object[] { casReferenceId });
+ "handleProcessRequestWithCASReference",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_no_endpoint_for_reply__INFO",
+ new Object[] { casReferenceId });
}
- return;
- }
- // Get the id of the parent Cas
- inputCasReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.InputCasReference);
- if ( cse.getInputCasReferenceId() == null ) {
+ return;
+ }
+ // Get the id of the parent Cas
+ inputCasReferenceId = aMessageContext
+ .getMessageStringProperty(AsynchAEMessage.InputCasReference);
+ if (cse.getInputCasReferenceId() == null) {
cse.setInputCasReferenceId(inputCasReferenceId);
}
-
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- CasStateEntry parentCasEntry = getController().getLocalCache().lookupEntry(inputCasReferenceId);
- // Check if the parent CAS is in a failed state first
- if ( parentCasEntry != null && parentCasEntry.isFailed()) {
- // handle CAS release
- getController().process(null, casReferenceId);
- return;
- }
-
- String delegateKey =((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
- Delegate delegate = ((AggregateAnalysisEngineController)getController()).lookupDelegate(delegateKey);
- cse.setLastDelegate(delegate);
+
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ CasStateEntry parentCasEntry = getController().getLocalCache().lookupEntry(
+ inputCasReferenceId);
+ // Check if the parent CAS is in a failed state first
+ if (parentCasEntry != null && parentCasEntry.isFailed()) {
+ // handle CAS release
+ getController().process(null, casReferenceId);
+ return;
+ }
+
+ String delegateKey = ((AggregateAnalysisEngineController) getController())
+ .lookUpDelegateKey(aMessageContext.getEndpoint().getEndpoint());
+ Delegate delegate = ((AggregateAnalysisEngineController) getController())
+ .lookupDelegate(delegateKey);
+ cse.setLastDelegate(delegate);
newCASProducedBy = delegate.getKey();
casMultiplierEndpoint.setIsCasMultiplier(true);
- try {
- // Save the endpoint of the CM which produced the Cas
- getController().getInProcessCache().setCasProducer(casReferenceId, newCASProducedBy);
- } catch( Exception e) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
- new Object[] { e });
- }
- e.printStackTrace();
- return;
- }
- // Safety check. The input Cas should not be null here
- if ( inputCasReferenceId != null )
- {
- try
- {
- Endpoint endp = null;
-
- // Located the origin of the parent Cas. The produced Cas will inherit the origin from its parent.
- // Once the origin is identified, save the origin using the produced Cas id as a key.
- if ( endp == null )
- {
- boolean gotTheEndpoint = false;
- String parentCasId = inputCasReferenceId;
- // Loop through the parent tree until an origin is found
- while( !gotTheEndpoint )
- {
- // Check if the current parent has an associated origin
- endp = ((AggregateAnalysisEngineController)getController()).getMessageOrigin(parentCasId);
- // Check if there is an origin. If so, we are done
- if ( endp != null )
- {
+ try {
+ // Save the endpoint of the CM which produced the Cas
+ getController().getInProcessCache().setCasProducer(casReferenceId, newCASProducedBy);
+ } catch (Exception e) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "handleProcessRequestWithCASReference",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING",
+ new Object[] { e });
+ }
+ e.printStackTrace();
+ return;
+ }
+ // Safety check. The input Cas should not be null here
+ if (inputCasReferenceId != null) {
+ try {
+ Endpoint endp = null;
+
+ // Locate the origin of the parent Cas. The produced Cas will inherit the origin from
+ // its parent.
+ // Once the origin is identified, save the origin using the produced Cas id as a key.
+ if (endp == null) {
+ boolean gotTheEndpoint = false;
+ String parentCasId = inputCasReferenceId;
+ // Loop through the parent tree until an origin is found
+ while (!gotTheEndpoint) {
+ // Check if the current parent has an associated origin
+ endp = ((AggregateAnalysisEngineController) getController())
+ .getMessageOrigin(parentCasId);
+ // Check if there is an origin. If so, we are done
+ if (endp != null) {
break;
- }
- // The current parent has no origin, get its parent and try again
- CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(parentCasId);
+ }
+ // The current parent has no origin, get its parent and try again
+ CacheEntry entry = getController().getInProcessCache().getCacheEntryForCAS(
+ parentCasId);
parentCasId = entry.getInputCasReferenceId();
- // Check if we reached the top of the hierarchy tree. If so, we have no origin. This should
- // never be the case. Every Cas must have an origin
- if ( parentCasId == null )
- {
+ // Check if we reached the top of the hierarchy tree. If so, we have no origin.
+ // This should
+ // never be the case. Every Cas must have an origin
+ if (parentCasId == null) {
break;
}
- }
- }
- // If origin not found log it as this indicates an error
- if ( endp == null )
- {
- System.out.println("Endpoint Not Found For Cas Id:"+inputCasReferenceId);
+ }
+ }
+ // If origin not found log it as this indicates an error
+ if (endp == null) {
+ System.out.println("Endpoint Not Found For Cas Id:" + inputCasReferenceId);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_origin_not_found__INFO",
- new Object[] { getController().getComponentName(), inputCasReferenceId });
+ "handleProcessRequestWithCASReference",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_msg_origin_not_found__INFO",
+ new Object[] { getController().getComponentName(), inputCasReferenceId });
}
- }
- else
- {
- ((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, endp);
+ } else {
+ ((AggregateAnalysisEngineController) getController()).addMessageOrigin(
+ casReferenceId, endp);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_msg_origin_added__FINEST",
- new Object[] { getController().getComponentName(), casReferenceId, newCASProducedBy });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINEST,
+ CLASS_NAME.getName(),
+ "handleProcessRequestWithCASReference",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_msg_origin_added__FINEST",
+ new Object[] { getController().getComponentName(), casReferenceId,
+ newCASProducedBy });
}
}
- }
- catch( Exception e)
- {
- e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "handleProcessRequestWithCASReference",
- UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+ "handleProcessRequestWithCASReference",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
}
- }
- }
- else
- {
+ }
+ } else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_input_cas_invalid__INFO",
- new Object[] { getController().getComponentName(), newCASProducedBy, casReferenceId });
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "handleProcessRequestWithCASReference",
+ UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_input_cas_invalid__INFO",
+ new Object[] { getController().getComponentName(), newCASProducedBy,
+ casReferenceId });
}
- }
- }
+ }
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_new_cas__FINE",
- new Object[] { casReferenceId, newCASProducedBy });
+ "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_new_cas__FINE", new Object[] { casReferenceId, newCASProducedBy });
}
- aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
- aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
- }
- else
- {
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- ((AggregateAnalysisEngineController)getController()).addMessageOrigin(casReferenceId, aMessageContext.getEndpoint());
- }
-
- }
- cas = getController().getInProcessCache().getCasByReference(casReferenceId);
-
- long arrivalTime = System.nanoTime();
- getController().saveTime(arrivalTime, casReferenceId, getController().getName());//aMessageContext.getEndpointName());
+ aMessageContext.getEndpoint().setEndpoint(casMultiplierEndpoint.getEndpoint());
+ aMessageContext.getEndpoint().setServerURI(casMultiplierEndpoint.getServerURI());
+ } else {
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ ((AggregateAnalysisEngineController) getController()).addMessageOrigin(casReferenceId,
+ aMessageContext.getEndpoint());
+ }
+
+ }
+ cas = getController().getInProcessCache().getCasByReference(casReferenceId);
+
+ long arrivalTime = System.nanoTime();
+ getController().saveTime(arrivalTime, casReferenceId, getController().getName());// aMessageContext.getEndpointName());
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_analyzing_cas__FINE",
- new Object[] { casReferenceId});
+ "handleProcessRequestWithCASReference", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_analyzing_cas__FINE", new Object[] { casReferenceId });
+ }
+ // Save Process command in the client endpoint.
+ cacheProcessCommandInClientEndpoint();
+
+ if (getController().isStopped()) {
+ return;
+ }
+
+ if (isNewCAS) {
+ invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
+ } else {
+ invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy);
+ }
+ } catch (AsynchAEException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+ private void handleProcessRequestWithXCAS(MessageContext aMessageContext)
+ throws AsynchAEException {
+
+ try {
+ // Get the CAS Reference Id of the input CAS
+ String casReferenceId = getCasReferenceId(aMessageContext);
+ String inputCasReferenceId = casReferenceId;
+ // This is only used when handling CASes produced by CAS Multiplier
+ String newCASProducedBy = null;
+
+ if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ // This CAS came in from the CAS Multiplier. Treat it differently than the
+ // input CAS. First, in case the Aggregate needs to send this CAS to the
+ // client, retrieve the client destination by looking up the client endpoint
+ // using input CAS reference id. CASes generated by the CAS multiplier will have
+ // the same Cas Reference id.
+ Endpoint replyToEndpoint = getController().getInProcessCache().getCacheEntryForCAS(
+ casReferenceId).getMessageOrigin();
+
+ //
+ if (getController() instanceof AggregateAnalysisEngineController) {
+ newCASProducedBy = ((AggregateAnalysisEngineController) getController())
+ .lookUpDelegateKey(replyToEndpoint.getEndpoint());
+ }
+ // MessageContext contains endpoint set by the CAS Multiplier service. Overwrite
+ // this with the endpoint of the client who sent the input CAS. In case this
+ // aggregate is configured to send new CASes to the client we know where to send them.
+ aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
+ aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
+ inputCasReferenceId = String.valueOf(casReferenceId);
+ // Set this to null so that the new CAS gets its own Cas Reference Id below
+ casReferenceId = null;
}
- // Save Process command in the client endpoint.
- cacheProcessCommandInClientEndpoint();
- if ( getController().isStopped() )
- {
- return;
- }
-
- if ( isNewCAS )
- {
- invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
- }
- else
- {
- invokeProcess(cas, casReferenceId, null, aMessageContext, newCASProducedBy);
- }
- }
- catch ( AsynchAEException e)
- {
- throw e;
- }
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
- }
-
- }
-
-
- private void handleProcessRequestWithXCAS(MessageContext aMessageContext) throws AsynchAEException
- {
-
- try
- {
- // Get the CAS Reference Id of the input CAS
- String casReferenceId = getCasReferenceId(aMessageContext);
- String inputCasReferenceId = casReferenceId;
- // This is only used when handling CASes produced by CAS Multiplier
- String newCASProducedBy = null;
-
- if ( aMessageContext.propertyExists(AsynchAEMessage.CasSequence) )
- {
- // This CAS came in from the CAS Multiplier. Treat it differently than the
- // input CAS. First, in case the Aggregate needs to send this CAS to the
- // client, retrieve the client destination by looking up the client endpoint
- // using input CAS reference id. CASes generated by the CAS multiplier will have
- // the same Cas Reference id.
- Endpoint replyToEndpoint =
- getController().
- getInProcessCache().
- getCacheEntryForCAS(casReferenceId).getMessageOrigin();
-
- //
- if ( getController() instanceof AggregateAnalysisEngineController )
- {
- newCASProducedBy =
- ((AggregateAnalysisEngineController)getController()).lookUpDelegateKey(replyToEndpoint.getEndpoint());
- }
- // MessageContext contains endpoint set by the CAS Multiplier service. Overwrite
- // this with the endpoint of the client who sent the input CAS. In case this
- // aggregate is configured to send new CASes to the client we know where to send them.
- aMessageContext.getEndpoint().setEndpoint(replyToEndpoint.getEndpoint());
- aMessageContext.getEndpoint().setServerURI(replyToEndpoint.getServerURI());
- inputCasReferenceId = String.valueOf(casReferenceId);
- // Set this to null so that the new CAS gets its own Cas Reference Id below
- casReferenceId = null;
- }
-
- long arrivalTime = System.nanoTime();
- getController().saveTime(arrivalTime, casReferenceId, getController().getName());//aMessageContext.getEndpointName());
-
- // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
- // to see if the message with a given CasReferenceId is already being processed. It is, the
- // message contains the same request possibly issued by the caller due to timeout. Also this
- // mechanism helps with dealing with scenario when this service is not up when the client sends
- // request. The client can keep re-sending the same request until its timeout thresholds are
- // exceeded. By that time, there may be multiple messages in this service queue with the same
- // CasReferenceId. When the service finally comes back up, it will have multiple messages in
- // its queue possibly from the same client. Only the first message for any given CasReferenceId
- // should be processed.
- if ( casReferenceId == null || !getController().getInProcessCache().entryExists(casReferenceId) )
- {
- String xmi = aMessageContext.getStringMessage();
-
- // *****************************************************************
- // ***** NO XMI In Message. Kick this back to sender with exception
- // *****************************************************************
- if ( xmi == null )
- {
+ long arrivalTime = System.nanoTime();
+ getController().saveTime(arrivalTime, casReferenceId, getController().getName());// aMessageContext.getEndpointName());
+
+ // To prevent processing multiple messages with the same CasReferenceId, check the CAS cache
+ // to see if the message with a given CasReferenceId is already being processed. If it is, the
+ // message contains the same request possibly issued by the caller due to timeout. Also this
+ // mechanism helps with dealing with scenario when this service is not up when the client
+ // sends
+ // request. The client can keep re-sending the same request until its timeout thresholds are
+ // exceeded. By that time, there may be multiple messages in this service queue with the same
+ // CasReferenceId. When the service finally comes back up, it will have multiple messages in
+ // its queue possibly from the same client. Only the first message for any given
+ // CasReferenceId
+ // should be processed.
+ if (casReferenceId == null
+ || !getController().getInProcessCache().entryExists(casReferenceId)) {
+ String xmi = aMessageContext.getStringMessage();
+
+ // *****************************************************************
+ // ***** NO XMI In Message. Kick this back to sender with exception
+ // *****************************************************************
+ if (xmi == null) {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_message_has_no_cargo__INFO",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
- }
- getController().
- getOutputChannel().
- sendReply(new InvalidMessageException("No XMI data in message"), casReferenceId,null, aMessageContext.getEndpoint(),AsynchAEMessage.Process);
- // Dont process this empty message
- return;
- }
-
+ "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_message_has_no_cargo__INFO",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ }
+ getController().getOutputChannel().sendReply(
+ new InvalidMessageException("No XMI data in message"), casReferenceId, null,
+ aMessageContext.getEndpoint(), AsynchAEMessage.Process);
+ // Don't process this empty message
+ return;
+ }
+
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas__FINE",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_request_cas__FINE",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
- CAS cas = getController().getCasManagerWrapper().getNewCas();
+ CAS cas = getController().getCasManagerWrapper().getNewCas();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_request_cas_granted__FINE",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_request_cas_granted__FINE",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
- XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
-// UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
+ XmiSerializationSharedData deserSharedData = new XmiSerializationSharedData();
+ // UimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1);
-
- if (casReferenceId == null)
- {
- CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData);
- casReferenceId = entry.getCasReferenceId();
- }
- else
- {
- if (getController() instanceof PrimitiveAnalysisEngineController)
- {
- getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId);
- }
- }
+
+ if (casReferenceId == null) {
+ CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext,
+ deserSharedData);
+ casReferenceId = entry.getCasReferenceId();
+ } else {
+ if (getController() instanceof PrimitiveAnalysisEngineController) {
+ getController().getInProcessCache().register(cas, aMessageContext, deserSharedData,
+ casReferenceId);
+ }
+ }
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_deserialized_cas_ready_to_process_FINE",
- new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+ "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_deserialized_cas_ready_to_process_FINE",
+ new Object[] { aMessageContext.getEndpoint().getEndpoint() });
}
- cacheProcessCommandInClientEndpoint();
+ cacheProcessCommandInClientEndpoint();
- invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
- }
- else
- {
+ invokeProcess(cas, inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy);
+ } else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
- "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_duplicate_request__INFO",
- new Object[] { casReferenceId});
+ "handleProcessRequestWithXCAS", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_duplicate_request__INFO", new Object[] { casReferenceId });
}
- }
-
- }
- catch ( AsynchAEException e)
- {
- throw e;
- }
- catch ( Exception e)
- {
- throw new AsynchAEException(e);
- }
-
- }
-
-
- private void cacheProcessCommandInClientEndpoint()
- {
- Endpoint clientEndpoint = getController().getClientEndpoint();
- if ( clientEndpoint != null )
- {
- clientEndpoint.setCommand(AsynchAEMessage.Process);
- }
- }
-
- private void handleCollectionProcessCompleteRequest(MessageContext aMessageContext)
- throws AsynchAEException
- {
- Endpoint replyToEndpoint = aMessageContext.getEndpoint();
- getController().collectionProcessComplete(replyToEndpoint);
- }
-
- private void handleReleaseCASRequest(MessageContext aMessageContext) throws AsynchAEException
- {
- String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
+ }
+
+ } catch (AsynchAEException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+
+ }
+
+ private void cacheProcessCommandInClientEndpoint() {
+ Endpoint clientEndpoint = getController().getClientEndpoint();
+ if (clientEndpoint != null) {
+ clientEndpoint.setCommand(AsynchAEMessage.Process);
+ }
+ }
+
+ private void handleCollectionProcessCompleteRequest(MessageContext aMessageContext)
+ throws AsynchAEException {
+ Endpoint replyToEndpoint = aMessageContext.getEndpoint();
+ getController().collectionProcessComplete(replyToEndpoint);
+ }
+
+ private void handleReleaseCASRequest(MessageContext aMessageContext) throws AsynchAEException {
+ String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_release_cas_req__FINE",
- new Object[] { getController().getName(), casReferenceId });
+ "handleReleaseCASRequest", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_release_cas_req__FINE",
+ new Object[] { getController().getName(), casReferenceId });
}
- getController().releaseNextCas(casReferenceId);
- }
- private void handlePingRequest(MessageContext aMessageContext) {
- try {
- getController().getOutputChannel().sendReply(AsynchAEMessage.Ping, aMessageContext.getEndpoint());
- } catch ( Exception e) {
- e.printStackTrace();
- }
-
- }
-
- private void handleStopRequest(MessageContext aMessageContext)
- {
- try {
- String casReferenceId = aMessageContext.getMessageStringProperty(AsynchAEMessage.CasReference);
- System.out.println("###################Controller::"+getController().getComponentName()+" Received <<<STOP>>> Request For CAS:"+casReferenceId);
- if ( getController() instanceof PrimitiveAnalysisEngineController ) {
- getController().addAbortedCasReferenceId(casReferenceId);
- } else if ( getController() instanceof AggregateAnalysisEngineController_impl ) {
- try {
- CasStateEntry casStateEntry =
- getController().getLocalCache().lookupEntry(casReferenceId);
- // Mark the CAS as if it have failed. In this case we dont associate any
- // exceptions with this CAS so its really not a failure of a CAS or any
- // of its children. We simply use the same logic here as if the CAS failed.
- // The Aggregate replyToClient() method will know that this CAS was stopped
- // as opposed to failed by the fact that the CAS has no exceptions associated
- // with it. In such case the replyToClient() method returns an input CAS as if
- // it has been fully processed.
- casStateEntry.setFailed();
- ((AggregateAnalysisEngineController_impl)getController()).stopCasMultipliers();
- } catch (Exception ex) {} // CAS may have already been deleted
-
- }
-
-
- }
- catch( Exception e){}
- }
- /**
- * Main method called by the predecessor handler.
- *
- *
- */
- public void handle(Object anObjectToHandle) //throws AsynchAEException
- {
- try
- {
- super.validate(anObjectToHandle);
-
- MessageContext messageContext = (MessageContext) anObjectToHandle;
- if ( isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Process) ||
- isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.CollectionProcessComplete) ||
- isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.ReleaseCAS ) ||
- isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Stop )
- )
- {
- int payload = messageContext.getMessageIntProperty(AsynchAEMessage.Payload);
- int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
-
- getController().getControllerLatch().waitUntilInitialized();
-
- // If a Process Request, increment number of CASes processed
- if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
- && command == AsynchAEMessage.Process &&!messageContext.propertyExists(AsynchAEMessage.CasSequence)) {
- // Increment number of CASes processed by this service
- getController().getServicePerformance().incrementNumberOfCASesProcessed();
- }
- if ( getController().isStopped() )
- {
- return;
- }
-
- if (AsynchAEMessage.CASRefID == payload)
- {
- // Fetch id of the CAS from the message.
- if ( getCasReferenceId(messageContext) == null )
- {
- return; // Invalid message. Nothing to do
- }
-
- handleProcessRequestWithCASReference(messageContext);
- }
- else if (AsynchAEMessage.XMIPayload == payload || AsynchAEMessage.BinaryPayload == payload)
- {
- // Fetch id of the CAS from the message.
- if ( getCasReferenceId(messageContext) == null )
- {
- return; // Invalid message. Nothing to do
- }
- handleProcessRequestFromRemoteClient(messageContext);
- }
- else if (AsynchAEMessage.XCASPayload == payload)
- {
- // Fetch id of the CAS from the message.
- if ( getCasReferenceId(messageContext) == null )
- {
- return; // Invalid message. Nothing to do
- }
- handleProcessRequestWithXCAS(messageContext);
- }
- else if ( AsynchAEMessage.None == payload && AsynchAEMessage.CollectionProcessComplete == command)
- {
- handleCollectionProcessCompleteRequest(messageContext);
- }
- else if ( AsynchAEMessage.None == payload && AsynchAEMessage.ReleaseCAS == command)
- {
- handleReleaseCASRequest(messageContext);
- }
- else if ( AsynchAEMessage.None == payload && AsynchAEMessage.Stop == command)
- {
+ getController().releaseNextCas(casReferenceId);
+ }
+
+ private void handlePingRequest(MessageContext aMessageContext) {
+ try {
+ getController().getOutputChannel().sendReply(AsynchAEMessage.Ping,
+ aMessageContext.getEndpoint());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ private void handleStopRequest(MessageContext aMessageContext) {
+ try {
+ String casReferenceId = aMessageContext
+ .getMessageStringProperty(AsynchAEMessage.CasReference);
+ System.out.println("###################Controller::" + getController().getComponentName()
+ + " Received <<<STOP>>> Request For CAS:" + casReferenceId);
+ if (getController() instanceof PrimitiveAnalysisEngineController) {
+ getController().addAbortedCasReferenceId(casReferenceId);
+ } else if (getController() instanceof AggregateAnalysisEngineController_impl) {
+ try {
+ CasStateEntry casStateEntry = getController().getLocalCache().lookupEntry(casReferenceId);
+ // Mark the CAS as if it had failed. In this case we don't associate any
+ // exceptions with this CAS, so it's really not a failure of a CAS or any
+ // of its children. We simply use the same logic here as if the CAS failed.
+ // The Aggregate replyToClient() method will know that this CAS was stopped,
+ // as opposed to failed, by the fact that the CAS has no exceptions associated
+ // with it. In such a case the replyToClient() method returns an input CAS as if
+ // it had been fully processed.
+ casStateEntry.setFailed();
+ ((AggregateAnalysisEngineController_impl) getController()).stopCasMultipliers();
+ } catch (Exception ex) {
+ } // CAS may have already been deleted
+
+ }
+
+ } catch (Exception e) {
+ }
+ }
+
+ /**
+ * Main method called by the predecessor handler.
+ *
+ *
+ */
+ public void handle(Object anObjectToHandle) // throws AsynchAEException
+ {
+ try {
+ super.validate(anObjectToHandle);
+
+ MessageContext messageContext = (MessageContext) anObjectToHandle;
+ if (isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Process)
+ || isHandlerForMessage(messageContext, AsynchAEMessage.Request,
+ AsynchAEMessage.CollectionProcessComplete)
+ || isHandlerForMessage(messageContext, AsynchAEMessage.Request,
+ AsynchAEMessage.ReleaseCAS)
+ || isHandlerForMessage(messageContext, AsynchAEMessage.Request, AsynchAEMessage.Stop)) {
+ int payload = messageContext.getMessageIntProperty(AsynchAEMessage.Payload);
+ int command = messageContext.getMessageIntProperty(AsynchAEMessage.Command);
+
+ getController().getControllerLatch().waitUntilInitialized();
+
+ // If a Process Request, increment number of CASes processed
+ if (messageContext.getMessageIntProperty(AsynchAEMessage.MessageType) == AsynchAEMessage.Request
+ && command == AsynchAEMessage.Process
+ && !messageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+ // Increment number of CASes processed by this service
+ getController().getServicePerformance().incrementNumberOfCASesProcessed();
+ }
+ if (getController().isStopped()) {
+ return;
+ }
+
+ if (AsynchAEMessage.CASRefID == payload) {
+ // Fetch id of the CAS from the message.
+ if (getCasReferenceId(messageContext) == null) {
+ return; // Invalid message. Nothing to do
+ }
+
+ handleProcessRequestWithCASReference(messageContext);
+ } else if (AsynchAEMessage.XMIPayload == payload
+ || AsynchAEMessage.BinaryPayload == payload) {
+ // Fetch id of the CAS from the message.
+ if (getCasReferenceId(messageContext) == null) {
+ return; // Invalid message. Nothing to do
+ }
+ handleProcessRequestFromRemoteClient(messageContext);
+ } else if (AsynchAEMessage.XCASPayload == payload) {
+ // Fetch id of the CAS from the message.
+ if (getCasReferenceId(messageContext) == null) {
+ return; // Invalid message. Nothing to do
+ }
+ handleProcessRequestWithXCAS(messageContext);
+ } else if (AsynchAEMessage.None == payload
+ && AsynchAEMessage.CollectionProcessComplete == command) {
+ handleCollectionProcessCompleteRequest(messageContext);
+ } else if (AsynchAEMessage.None == payload && AsynchAEMessage.ReleaseCAS == command) {
+ handleReleaseCASRequest(messageContext);
+ } else if (AsynchAEMessage.None == payload && AsynchAEMessage.Stop == command) {
handleStopRequest(messageContext);
- }
- else if ( AsynchAEMessage.None == payload && AsynchAEMessage.Ping == command)
- {
+ } else if (AsynchAEMessage.None == payload && AsynchAEMessage.Ping == command) {
handlePingRequest(messageContext);
}
- // Handled Request
- return;
- }
- // Not a Request nor Command. Delegate to the next handler in the chain
- super.delegate(messageContext);
- }
- catch( Exception e)
- {
- e.printStackTrace();
- getController().getErrorHandlerChain().handle(e, HandlerBase.populateErrorContext( (MessageContext)anObjectToHandle ), getController());
- }
- }
+ // Handled Request
+ return;
+ }
+ // Not a Request nor Command. Delegate to the next handler in the chain
+ super.delegate(messageContext);
+ } catch (Exception e) {
+ e.printStackTrace();
+ getController().getErrorHandlerChain().handle(e,
+ HandlerBase.populateErrorContext((MessageContext) anObjectToHandle), getController());
+ }
+ }
}