You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2012/03/05 20:23:55 UTC

svn commit: r1297187 - in /uima/uima-as/trunk: uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ uimaj-as-core/src/main/java/org/apache/uima/aae/ uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ uimaj-as-jms/src/main/j...

Author: cwiklik
Date: Mon Mar  5 19:23:55 2012
New Revision: 1297187

URL: http://svn.apache.org/viewvc?rev=1297187&view=rev
Log:
UIMA-2180 extended support for collecting perf stats to uima-as aggregates 

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java
    uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Mon Mar  5 19:23:55 2012
@@ -1209,11 +1209,14 @@ public class JmsOutputChannel implements
                 anEndpoint.getEndpoint());
       } else {
         try {
-          CasStateEntry entry = 
-            getAnalysisEngineController().getLocalCache().lookupEntry(aCasReferenceId);
-          if ( entry.getAEPerformanceList().size() > 0 ) {
-            aTextMessage.setStringProperty(AsynchAEMessage.CASPerComponentMetrics, 
-                    xstream.toXML(entry.getAEPerformanceList()));
+          if ( getAnalysisEngineController().getInProcessCache().entryExists(aCasReferenceId)) {
+            CacheEntry entry = 
+                    getAnalysisEngineController().getInProcessCache().getCacheEntryForCAS(aCasReferenceId);
+            //  getDelegateMetrics returns an empty list if no metrics are found
+            if ( entry.getDelegateMetrics().size() > 0 ) {
+              aTextMessage.setStringProperty(AsynchAEMessage.CASPerComponentMetrics, 
+                      xstream.toXML(entry.getDelegateMetrics()));
+            }
           }
         } catch( Exception ex) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java Mon Mar  5 19:23:55 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,6 +37,7 @@ import org.apache.uima.aae.controller.Ev
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.aae.monitor.statistics.DelegateStats;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.Marker;
@@ -471,6 +473,18 @@ public class InProcessCache implements I
     return getTopAncestorEndpoint(parentEntry);
   }
 
+  public CacheEntry getTopAncestorCasEntry(CacheEntry anEntry) throws Exception {
+    if (anEntry == null) {
+      return null;
+    }
+
+    if (anEntry.getInputCasReferenceId() == null) {
+      return anEntry;
+    }
+    CacheEntry parentEntry = getCacheEntryForCAS(anEntry.getInputCasReferenceId());
+    return getTopAncestorCasEntry(parentEntry);
+  }
+
   public synchronized CacheEntry getCacheEntryForCAS(String aCasReferenceId)
           throws AsynchAEException {
     CacheEntry casRefEntry = getEntry(aCasReferenceId);
@@ -566,6 +580,8 @@ public class InProcessCache implements I
 		//  via ThreadLocal var
     private Semaphore threadCompletionSemaphore;
     
+    private Map<String,List<AnalysisEnginePerformanceMetrics>> delegateMetrics =
+            new ConcurrentHashMap<String, List<AnalysisEnginePerformanceMetrics>>();
     
     public Semaphore getThreadCompletionSemaphore() {
       return threadCompletionSemaphore;
@@ -866,7 +882,44 @@ public class InProcessCache implements I
     public void setMarker(Marker mark) {
 			this.marker = mark;
 		}
+    public void addDelegateMetrics(String delegateKey, List<AnalysisEnginePerformanceMetrics> metrics) {
+      addDelegateMetrics(delegateKey, metrics, false);
+    }
 
+    public void addDelegateMetrics(String delegateKey, List<AnalysisEnginePerformanceMetrics> metrics, boolean remote) {
+/*
+      System.out.println("................ Adding metrics for delegate:"+delegateKey+" Metrics Size:"+metrics.size()+" CAS:"+getCasReferenceId());
+      if ( remote && delegateMetrics.containsKey(delegateKey)) {
+//        List<AnalysisEnginePerformanceMetrics> delegateMetrics = 
+//                delegateMetrics.get(delegateKey);
+        List<AnalysisEnginePerformanceMetrics> currentMetrics = 
+            delegateMetrics.get(delegateKey);
+        for( AnalysisEnginePerformanceMetrics rm : metrics) {
+          for( AnalysisEnginePerformanceMetrics cm : currentMetrics ) {
+            if ( cm.getUniqueName().equals(rm.getUniqueName())) {
+              AnalysisEnginePerformanceMetrics apm = 
+                      new AnalysisEnginePerformanceMetrics(rm.getName(),rm.getUniqueName(),rm.getAnalysisTime(),cm.getNumProcessed()+rm.getNumProcessed());
+              currentMetrics.remove(cm);
+              currentMetrics.add(apm);
+              break;
+            }
+          }
+        }
+      } else {
+        delegateMetrics.put(delegateKey, metrics);
+      }
+*/
+      delegateMetrics.put(delegateKey, metrics);
+    }
+    public List<AnalysisEnginePerformanceMetrics> getDelegateMetrics() {
+      List<AnalysisEnginePerformanceMetrics> metrics = new ArrayList<AnalysisEnginePerformanceMetrics>();
+      for( Entry<String,List<AnalysisEnginePerformanceMetrics>> dm : delegateMetrics.entrySet()) {
+        for(AnalysisEnginePerformanceMetrics metric : dm.getValue()) {
+          metrics.add(metric);
+        }
+      }
+      return metrics;
+    }
   }
 
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/UimaSerializer.java Mon Mar  5 19:23:55 2012
@@ -27,6 +27,8 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.io.Writer; //import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.xml.parsers.FactoryConfigurationError;
 import javax.xml.parsers.ParserConfigurationException;
@@ -34,6 +36,7 @@ import javax.xml.parsers.SAXParser;
 import javax.xml.parsers.SAXParserFactory;
 import javax.xml.transform.OutputKeys;
 
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.Marker;
 import org.apache.uima.cas.TypeSystem;
@@ -52,6 +55,9 @@ import org.xml.sax.SAXException;
 import org.xml.sax.XMLReader;
 import org.xml.sax.helpers.XMLReaderFactory;
 
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
 public class UimaSerializer {
   private final ThreadLocal<XMLReader> localXmlReader = new ThreadLocal<XMLReader>();
 
@@ -235,4 +241,16 @@ public class UimaSerializer {
       }
     }
   }
+  @SuppressWarnings("unchecked")
+  public static List<AnalysisEnginePerformanceMetrics> deserializePerformanceMetrics(String serializedComponentStats) {
+    // Check whether we received component stats. Currently UIMA AS does not support per-component
+    // stats in asynch aggregates. If the service is asynch, just return an empty list.
+    if ( serializedComponentStats == null || serializedComponentStats.trim().length() == 0 ) {
+      // return an empty list
+      return new ArrayList<AnalysisEnginePerformanceMetrics>();
+    }
+    XStream xstream = new XStream(new DomDriver());
+    return (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats);
+  }
+  
 }

Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java (original)
+++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessResponseHandler.java Mon Mar  5 19:23:55 2012
@@ -20,6 +20,8 @@
 package org.apache.uima.aae.handler.input;
 
 import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.uima.UIMAException;
 import org.apache.uima.UIMAFramework;
@@ -45,6 +47,7 @@ import org.apache.uima.aae.jmx.ServicePe
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
 import org.apache.uima.aae.monitor.Monitor;
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
 import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.AllowPreexistingFS;
@@ -186,7 +189,37 @@ public class ProcessResponseHandler exte
       // during deserialization
       CacheEntry cacheEntry = getController().getInProcessCache().getCacheEntryForCAS(
               casReferenceId);
-
+      if ( aMessageContext.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
+        try {
+          CacheEntry ancestor = 
+                  getController().
+                    getInProcessCache().
+                      getTopAncestorCasEntry(cacheEntry);
+          if ( ancestor != null ) {
+            List<AnalysisEnginePerformanceMetrics> metrics = 
+                    UimaSerializer.deserializePerformanceMetrics(aMessageContext.getMessageStringProperty(AsynchAEMessage.CASPerComponentMetrics));
+            
+            List<AnalysisEnginePerformanceMetrics> adjustedMetrics =
+                    new ArrayList<AnalysisEnginePerformanceMetrics>();
+            for(AnalysisEnginePerformanceMetrics delegateMetric : metrics ) {
+              String tmp =
+                      delegateMetric.getUniqueName().substring(delegateMetric.getUniqueName().indexOf(","));
+              String adjustedUniqueName =
+                ((AggregateAnalysisEngineController) getController()).getJMXDomain()+((AggregateAnalysisEngineController) getController()).getJmxContext()+tmp;
+              AnalysisEnginePerformanceMetrics metric =
+                      new AnalysisEnginePerformanceMetrics(delegateMetric.getName(),adjustedUniqueName,delegateMetric.getAnalysisTime(),delegateMetric.getNumProcessed());
+              adjustedMetrics.add(metric);
+            }
+            ancestor.addDelegateMetrics(delegateKey, adjustedMetrics, true);  // true=remote
+          }
+        } catch (Exception e) {
+          // An exception may be thrown here if the service is being stopped.
+          // The top level controller may have already cleaned up the cache
+          // and the getCacheEntryForCAS() will throw an exception. Ignore it
+          // here, we are shutting down.
+        }
+        
+      }
       CasStateEntry casStateEntry = ((AggregateAnalysisEngineController) getController())
               .getLocalCache().lookupEntry(casReferenceId);
       if (casStateEntry != null) {

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1297187&r1=1297186&r2=1297187&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Mon Mar  5 19:23:55 2012
@@ -25,6 +25,7 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
@@ -1096,24 +1097,14 @@ public abstract class BaseUIMAAsynchrono
   @SuppressWarnings("unchecked")
   protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand, String serializedComponentStats) {
     if ( aCommand == AsynchAEMessage.Process) {
-      ((UimaASProcessStatusImpl)aStatus).setPerformanceMetrics(deserializePerformanceMetrics(serializedComponentStats));
+      ((UimaASProcessStatusImpl)aStatus).
+        setPerformanceMetrics(UimaSerializer.deserializePerformanceMetrics(serializedComponentStats));
       for (int i = 0; listeners != null && i < listeners.size(); i++) {
         UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
         statCL.entityProcessComplete(aCAS, aStatus);
       }
     }
   }
-  @SuppressWarnings("unchecked")
-  private List<AnalysisEnginePerformanceMetrics> deserializePerformanceMetrics(String serializedComponentStats) {
-    // check if we received components stats. Currently UIMA AS is not supporting per component
-    // stats in asynch aggregates. If the service is asynch, just return an empty list
-    if ( serializedComponentStats == null || serializedComponentStats.trim().length() == 0 ) {
-      // return an empty list
-      return new ArrayList<AnalysisEnginePerformanceMetrics>();
-    }
-    XStream xstream = new XStream(new DomDriver());
-    return (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats);
-  }
   protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand) {
     for (int i = 0; listeners != null && i < listeners.size(); i++) {
       UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
@@ -1587,7 +1578,7 @@ public abstract class BaseUIMAAsynchrono
                 cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
                 if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) {
                 	cachedRequest.getComponentMetricsList().
-                		addAll(deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
+                		addAll(UimaSerializer.deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
                 }
             }
         }