You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/12/13 17:23:20 UTC
svn commit: r1213768 -
/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Author: cwiklik
Date: Tue Dec 13 16:23:19 2011
New Revision: 1213768
URL: http://svn.apache.org/viewvc?rev=1213768&view=rev
Log:
UIMA-2180 refactor code handling per cas performance breakdown list
Modified:
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1213768&r1=1213767&r2=1213768&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Dec 13 16:23:19 2011
@@ -210,6 +210,8 @@ public abstract class BaseUIMAAsynchrono
private ExecutorService exec = Executors.newFixedThreadPool(1);
+ private volatile boolean casMultiplierDelegate;
+
abstract public String getEndPointName() throws Exception;
abstract protected TextMessage createTextMessage() throws Exception;
@@ -745,7 +747,7 @@ public abstract class BaseUIMAAsynchrono
try {
getMetaSemaphore.acquire();
} catch (InterruptedException e) {
-
+ e.printStackTrace();
} finally {
getMetaSemaphore.release();
}
@@ -1073,6 +1075,7 @@ public abstract class BaseUIMAAsynchrono
// Adam - store ResouceMetaData in field so we can return it from getMetaData().
resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
.parseResourceMetaData(in1);
+ casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
"handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
@@ -1097,13 +1100,16 @@ public abstract class BaseUIMAAsynchrono
if ( aCommand == AsynchAEMessage.Process) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
- XStream xstream = new XStream(new DomDriver());
statCL.entityProcessComplete(aCAS, aStatus,
- (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats));
+ deserializePerformanceMetrics(serializedComponentStats));
}
}
}
-
+ @SuppressWarnings("unchecked")
+ private List<AnalysisEnginePerformanceMetrics> deserializePerformanceMetrics(String serializedComponentStats) {
+ XStream xstream = new XStream(new DomDriver());
+ return (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats);
+ }
protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
@@ -1231,20 +1237,7 @@ public abstract class BaseUIMAAsynchrono
String.valueOf(casCachedRequest.getCAS().hashCode())
});
}
- } else {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.INFO,
- CLASS_NAME.getName(),
- "handleServiceInfo",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_skipping_onBeforeProcessCAS_INFO",
- new Object[] {
- casReferenceId,
- String.valueOf(casCachedRequest.getCAS().hashCode()),
- nodeIP, pid
- });
-
- }
+ }
}
} catch( Exception e) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
@@ -1315,6 +1308,14 @@ public abstract class BaseUIMAAsynchrono
handleException(message, cachedRequest, true);
return;
}
+ // cachedRequest is only null if we are receiving child CASes from a
+ // Cas Multiplier. Otherwise, we drop the message as it is out of band
+ if ( cachedRequest == null && !casMultiplierDelegate ) {
+ // most likely a reply came in after the thread was interrupted
+ return;
+ }
+
+
// If the Cas Reference id not in the message check if the message contains an
// exception and if so, handle the exception and return.
if (casReferenceId == null) {
@@ -1577,6 +1578,14 @@ public abstract class BaseUIMAAsynchrono
// Add CAS identifier to enable matching replies with requests
notifyListeners(cas, status, AsynchAEMessage.Process);
}
+ } else { // synchronous sendAndReceive() was used
+ if (casReferenceId != null && message.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
+ cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+ if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) {
+ cachedRequest.getComponentMetricsList().
+ addAll(deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
+ }
+ }
}
} finally {
// Dont release the CAS if the application uses synchronous API
@@ -1895,12 +1904,21 @@ public abstract class BaseUIMAAsynchrono
public ProcessingResourceMetaData getMetaData() throws ResourceInitializationException {
return resourceMetadata;
}
+ public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException {
+ return sendAndReceiveCAS(aCAS, null, null);
+ }
+ public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException {
+ return sendAndReceiveCAS(aCAS, pt, null);
+ }
+ public String sendAndReceiveCAS(CAS aCAS, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException {
+ return sendAndReceiveCAS(aCAS, null, componentMetricsList);
+ }
/**
* This is a synchronous method which sends a message to a destination and blocks waiting for a
* reply.
*/
- public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException {
+ public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException {
if (!running) {
throw new ResourceProcessException(new Exception("Uima EE Client Not In Running State"));
}
@@ -1926,6 +1944,11 @@ public abstract class BaseUIMAAsynchrono
ClientRequest cachedRequest = produceNewClientRequestObject();
cachedRequest.setSynchronousInvocation();
+
+ // save application provided List where the performance stats will be copied
+ // when reply comes back
+ cachedRequest.setComponentMetricsList(componentMetricsList);
+
// This is synchronous call, acquire and hold the semaphore before
// dispatching a CAS to a service. The semaphore will be released
// iff:
@@ -1949,7 +1972,7 @@ public abstract class BaseUIMAAsynchrono
// for the second oldest CAS in the outstanding list.
serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
throw new ResourceProcessException(e);
- }
+ }
}
try {
// send CAS. This call does not block. Instead we will block the sending thread below.
@@ -1962,6 +1985,7 @@ public abstract class BaseUIMAAsynchrono
} catch( ResourceProcessException e) {
threadMonitor.getMonitor().release();
+ removeFromCache(casReferenceId);
throw e;
}
if (threadMonitor != null && threadMonitor.getMonitor() != null) {
@@ -2004,12 +2028,18 @@ public abstract class BaseUIMAAsynchrono
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_client_interrupted_INFO", new Object[] { casReferenceId, aCAS.hashCode()});
+ "UIMAJMS_client_interrupted_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())});
}
// cancel the timer if it is associated with a CAS this thread is waiting for. This would be
// the oldest CAS submitted to a queue for processing. The timer will be canceled and restarted
// for the second oldest CAS in the outstanding list.
serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_client_canceled_timer_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())});
+ }
+ removeFromCache(casReferenceId);
throw new ResourceProcessException(e);
} finally {
threadMonitor.getMonitor().release();
@@ -2071,10 +2101,6 @@ public abstract class BaseUIMAAsynchrono
}
}
- public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException {
- return sendAndReceiveCAS(aCAS, null);
- }
-
protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, String casReferenceId) {
ProcessTrace pt = new ProcessTrace_impl();
@@ -2272,7 +2298,18 @@ public abstract class BaseUIMAAsynchrono
private String hostIpProcessingCAS;
- public String getHostIpProcessingCAS() {
+ List<AnalysisEnginePerformanceMetrics> componentMetricsList;
+
+ public List<AnalysisEnginePerformanceMetrics> getComponentMetricsList() {
+ return componentMetricsList;
+ }
+
+ public void setComponentMetricsList(
+ List<AnalysisEnginePerformanceMetrics> componentMetricsList) {
+ this.componentMetricsList = componentMetricsList;
+ }
+
+ public String getHostIpProcessingCAS() {
return hostIpProcessingCAS;
}