You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2012/03/05 20:23:55 UTC
svn commit: r1297187 - in /uima/uima-as/trunk:
uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/
uimaj-as-core/src/main/java/org/apache/uima/aae/
uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/
uimaj-as-jms/src/main/j...
Author: cwiklik
Date: Mon Mar 5 19:23:55 2012
New Revision: 1297187
URL: http://svn.apache.org/viewvc?rev=1297187&view=rev
Log:
UIMA-2180 extended support for collecting perf stats to uima-as aggregates
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon Mar 5 19:23:55 2012
@@ -1209,11 +1209,14 @@ public class JmsOutputChannel implements
anEndpoint.getEndpoint());
} else {
try {
- CasStateEntry entry =
- getAnalysisEngineController().getLocalCache().lookupEntry(aCasReferenceId);
- if ( entry.getAEPerformanceList().size() > 0 ) {
- aTextMessage.setStringProperty(AsynchAEMessage.CASPerComponentMetrics,
- xstream.toXML(entry.getAEPerformanceList()));
+ if ( getAnalysisEngineController().getInProcessCache().entryExists(aCasReferenceId)) {
+ CacheEntry entry =
+ getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+ // getDelegateMetrics returns an empty list if no metrics are found
+ if ( entry.getDelegateMetrics().size() > 0 ) {
+ aTextMessage.setStringProperty(AsynchAEMessage.CASPerComponentMetrics,
+ xstream.toXML(entry.getDelegateMetrics()));
+ }
}
} catch( Exception ex) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Mon Mar 5 19:23:55 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,6 +37,7 @@ import org.apache.uima.aae.controller.Ev
import org.apache.uima.aae.error.AsynchAEException;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.aae.monitor.statistics.DelegateStats;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.Marker;
@@ -471,6 +473,18 @@ public class InProcessCache implements I
return getTopAncestorEndpoint(parentEntry);
}
+ public CacheEntry getTopAncestorCasEntry(CacheEntry anEntry) throws Exception {
+ if (anEntry == null) {
+ return null;
+ }
+
+ if (anEntry.getInputCasReferenceId() == null) {
+ return anEntry;
+ }
+ CacheEntry parentEntry = getCacheEntryForCAS(anEntry.getInputCasReferenceId());
+ return getTopAncestorCasEntry(parentEntry);
+ }
+
public synchronized CacheEntry getCacheEntryForCAS(String aCasReferenceId)
throws AsynchAEException {
CacheEntry casRefEntry = getEntry(aCasReferenceId);
@@ -566,6 +580,8 @@ public class InProcessCache implements I
// via ThreadLocal var
private Semaphore threadCompletionSemaphore;
+ private Map<String,List<AnalysisEnginePerformanceMetrics>> delegateMetrics =
+ new ConcurrentHashMap<String, List<AnalysisEnginePerformanceMetrics>>();
public Semaphore getThreadCompletionSemaphore() {
return threadCompletionSemaphore;
@@ -866,7 +882,44 @@ public class InProcessCache implements I
public void setMarker(Marker mark) {
this.marker = mark;
}
+ public void addDelegateMetrics(String delegateKey, List<AnalysisEnginePerformanceMetrics> metrics) {
+ addDelegateMetrics(delegateKey, metrics, false);
+ }
+ public void addDelegateMetrics(String delegateKey, List<AnalysisEnginePerformanceMetrics> metrics, boolean remote) {
+/*
+ System.out.println("................ Adding metrics for delegate:"+delegateKey+" Metrics Size:"+metrics.size()+" CAS:"+getCasReferenceId());
+ if ( remote && delegateMetrics.containsKey(delegateKey)) {
+// List<AnalysisEnginePerformanceMetrics> delegateMetrics =
+// delegateMetrics.get(delegateKey);
+ List<AnalysisEnginePerformanceMetrics> currentMetrics =
+ delegateMetrics.get(delegateKey);
+ for( AnalysisEnginePerformanceMetrics rm : metrics) {
+ for( AnalysisEnginePerformanceMetrics cm : currentMetrics ) {
+ if ( cm.getUniqueName().equals(rm.getUniqueName())) {
+ AnalysisEnginePerformanceMetrics apm =
+ new AnalysisEnginePerformanceMetrics(rm.getName(),rm.getUniqueName(),rm.getAnalysisTime(),cm.getNumProcessed()+rm.getNumProcessed());
+ currentMetrics.remove(cm);
+ currentMetrics.add(apm);
+ break;
+ }
+ }
+ }
+ } else {
+ delegateMetrics.put(delegateKey, metrics);
+ }
+*/
+ delegateMetrics.put(delegateKey, metrics);
+ }
+ public List<AnalysisEnginePerformanceMetrics> getDelegateMetrics() {
+ List<AnalysisEnginePerformanceMetrics> metrics = new ArrayList<AnalysisEnginePerformanceMetrics>();
+ for( Entry<String,List<AnalysisEnginePerformanceMetrics>> dm : delegateMetrics.entrySet()) {
+ for(AnalysisEnginePerformanceMetrics metric : dm.getValue()) {
+ metrics.add(metric);
+ }
+ }
+ return metrics;
+ }
}
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java Mon Mar 5 19:23:55 2012
@@ -27,6 +27,8 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer; //import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.List;
import javax.xml.parsers.FactoryConfigurationError;
import javax.xml.parsers.ParserConfigurationException;
@@ -34,6 +36,7 @@ import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import javax.xml.transform.OutputKeys;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.Marker;
import org.apache.uima.cas.TypeSystem;
@@ -52,6 +55,9 @@ import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.XMLReaderFactory;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
public class UimaSerializer {
private final ThreadLocal<XMLReader> localXmlReader = new ThreadLocal<XMLReader>();
@@ -235,4 +241,16 @@ public class UimaSerializer {
}
}
}
+ @SuppressWarnings("unchecked")
+ public static List<AnalysisEnginePerformanceMetrics> deserializePerformanceMetrics(String serializedComponentStats) {
+ // check if we received components stats. Currently UIMA AS is not supporting per component
+ // stats in asynch aggregates. If the service is asynch, just return an empty list
+ if ( serializedComponentStats == null || serializedComponentStats.trim().length() == 0 ) {
+ // return an empty list
+ return new ArrayList<AnalysisEnginePerformanceMetrics>();
+ }
+ XStream xstream = new XStream(new DomDriver());
+ return (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats);
+ }
+
}
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Mon Mar 5 19:23:55 2012
@@ -20,6 +20,8 @@
package org.apache.uima.aae.handler.input;
import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.uima.UIMAException;
import org.apache.uima.UIMAFramework;
@@ -45,6 +47,7 @@ import org.apache.uima.aae.jmx.ServicePe
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.aae.message.MessageContext;
import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.AllowPreexistingFS;
@@ -186,7 +189,37 @@ public class ProcessResponseHandler exte
// during deserialization
CacheEntry cacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
casReferenceId);
-
+ if ( aMessageContext.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
+ try {
+ CacheEntry ancestor =
+ getController().
+ getInProcessCache().
+ getTopAncestorCasEntry(cacheEntry);
+ if ( ancestor != null ) {
+ List<AnalysisEnginePerformanceMetrics> metrics =
+ UimaSerializer.deserializePerformanceMetrics(aMessageContext.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
+
+ List<AnalysisEnginePerformanceMetrics> adjustedMetrics =
+ new ArrayList<AnalysisEnginePerformanceMetrics>();
+ for(AnalysisEnginePerformanceMetrics delegateMetric : metrics ) {
+ String tmp =
+ delegateMetric.getUniqueName().substring(delegateMetric.getUniqueName().indexOf(","));
+ String adjustedUniqueName =
+ ((AggregateAnalysisEngineController) getController()).getJMXDomain()+((AggregateAnalysisEngineController) getController()).getJmxContext()+tmp;
+ AnalysisEnginePerformanceMetrics metric =
+ new AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,delegateMetric.getAnalysisTime(),delegateMetric.getNumProcessed());
+ adjustedMetrics.add(metric);
+ }
+ ancestor.addDelegateMetrics(delegateKey, adjustedMetrics, true); // true=remote
+ }
+ } catch (Exception e) {
+ // An exception be be thrown here if the service is being stopped.
+ // The top level controller may have already cleaned up the cache
+ // and the getCacheEntryForCAS() will throw an exception. Ignore it
+ // here, we are shutting down.
+ }
+
+ }
CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) getController())
.getLocalCache().lookupEntry(casReferenceId);
if (casStateEntry != null) {
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Mar 5 19:23:55 2012
@@ -25,6 +25,7 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
@@ -1096,24 +1097,14 @@ public abstract class BaseUIMAAsynchrono
@SuppressWarnings("unchecked")
protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand, String serializedComponentStats) {
if ( aCommand == AsynchAEMessage.Process) {
- ((UimaASProcessStatusImpl)aStatus).setPerformanceMetrics(deserializePerformanceMetrics(serializedComponentStats));
+ ((UimaASProcessStatusImpl)aStatus).
+ setPerformanceMetrics(UimaSerializer.deserializePerformanceMetrics(serializedComponentStats));
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
statCL.entityProcessComplete(aCAS, aStatus);
}
}
}
- @SuppressWarnings("unchecked")
- private List<AnalysisEnginePerformanceMetrics> deserializePerformanceMetrics(String serializedComponentStats) {
- // check if we received components stats. Currently UIMA AS is not supporting per component
- // stats in asynch aggregates. If the service is asynch, just return an empty list
- if ( serializedComponentStats == null || serializedComponentStats.trim().length() == 0 ) {
- // return an empty list
- return new ArrayList<AnalysisEnginePerformanceMetrics>();
- }
- XStream xstream = new XStream(new DomDriver());
- return (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats);
- }
protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand) {
for (int i = 0; listeners != null && i < listeners.size(); i++) {
UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
@@ -1587,7 +1578,7 @@ public abstract class BaseUIMAAsynchrono
cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) {
cachedRequest.getComponentMetricsList().
- addAll(deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
+ addAll(UimaSerializer.deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
}
}
}