You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2011/12/13 17:23:20 UTC

svn commit: r1213768 - /uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Author: cwiklik
Date: Tue Dec 13 16:23:19 2011
New Revision: 1213768

URL: http://svn.apache.org/viewvc?rev=1213768&view=rev
Log:
UIMA-2180 refactor code handling per cas performance breakdown list  

Modified:
    uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1213768&r1=1213767&r2=1213768&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Dec 13 16:23:19 2011
@@ -210,6 +210,8 @@ public abstract class BaseUIMAAsynchrono
 
   private ExecutorService exec = Executors.newFixedThreadPool(1);
   
+  private volatile boolean casMultiplierDelegate;
+  
   abstract public String getEndPointName() throws Exception;
 
   abstract protected TextMessage createTextMessage() throws Exception;
@@ -745,7 +747,7 @@ public abstract class BaseUIMAAsynchrono
     try {
       getMetaSemaphore.acquire();
     } catch (InterruptedException e) {
-
+    	e.printStackTrace();
     } finally {
       getMetaSemaphore.release();
     }
@@ -1073,6 +1075,7 @@ public abstract class BaseUIMAAsynchrono
         // Adam - store ResouceMetaData in field so we can return it from getMetaData().
         resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser()
                 .parseResourceMetaData(in1);
+        casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes();
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(),
                   "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
@@ -1097,13 +1100,16 @@ public abstract class BaseUIMAAsynchrono
     if ( aCommand == AsynchAEMessage.Process) {
       for (int i = 0; listeners != null && i < listeners.size(); i++) {
         UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
-        XStream xstream = new XStream(new DomDriver());
         statCL.entityProcessComplete(aCAS, aStatus, 
-                (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats));
+        		deserializePerformanceMetrics(serializedComponentStats));
       }
     }
   }
-
+  @SuppressWarnings("unchecked")
+  private List<AnalysisEnginePerformanceMetrics> deserializePerformanceMetrics(String serializedComponentStats) {
+      XStream xstream = new XStream(new DomDriver());
+      return (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats);
+  }
   protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand) {
     for (int i = 0; listeners != null && i < listeners.size(); i++) {
       UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i);
@@ -1231,20 +1237,7 @@ public abstract class BaseUIMAAsynchrono
                              	 String.valueOf(casCachedRequest.getCAS().hashCode())
                               });
                   }
-    	     } else {
-                 UIMAFramework.getLogger(CLASS_NAME).logrb(
-                         Level.INFO,
-                         CLASS_NAME.getName(),
-                         "handleServiceInfo",
-                         JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                         "UIMAJMS_skipping_onBeforeProcessCAS_INFO",
-                         new Object[] {
-                       	  casReferenceId,
-                        	 String.valueOf(casCachedRequest.getCAS().hashCode()),
-                        	 nodeIP, pid 
-                         });
-    	    	 
-    	     }
+    	     } 
     	}
     } catch( Exception e) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
@@ -1315,6 +1308,14 @@ public abstract class BaseUIMAAsynchrono
       handleException(message, cachedRequest, true);
       return;
     }
+    // A null cachedRequest is expected only when we are receiving child CASes
+    // from a Cas Multiplier. Otherwise the message is out of band and is dropped.
+    if ( cachedRequest == null && !casMultiplierDelegate ) {
+    	// most likely a reply came in after the thread was interrupted
+    	return;
+    }
+    
+    
     // If the Cas Reference id not in the message check if the message contains an
     // exception and if so, handle the exception and return.
     if (casReferenceId == null) {
@@ -1577,6 +1578,14 @@ public abstract class BaseUIMAAsynchrono
             // Add CAS identifier to enable matching replies with requests
             notifyListeners(cas, status, AsynchAEMessage.Process);
           }
+        } else {  // synchronous sendAndReceive() was used
+            if (casReferenceId != null && message.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
+                cachedRequest = (ClientRequest) clientCache.get(casReferenceId);
+                if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) {
+                	cachedRequest.getComponentMetricsList().
+                		addAll(deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
+                }
+            }
         }
       } finally {
         // Dont release the CAS if the application uses synchronous API
@@ -1895,12 +1904,21 @@ public abstract class BaseUIMAAsynchrono
   public ProcessingResourceMetaData getMetaData() throws ResourceInitializationException {
     return resourceMetadata;
   }
+  public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException {
+    return sendAndReceiveCAS(aCAS, null, null);
+  }
+  public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException {
+    return sendAndReceiveCAS(aCAS, pt, null);
+  }
+  public String sendAndReceiveCAS(CAS aCAS, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException {
+    return sendAndReceiveCAS(aCAS, null, componentMetricsList);
+  }
 
   /**
    * This is a synchronous method which sends a message to a destination and blocks waiting for a
    * reply.
    */
-  public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException {
+  public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException {
     if (!running) {
       throw new ResourceProcessException(new Exception("Uima EE Client Not In Running State"));
     }
@@ -1926,6 +1944,11 @@ public abstract class BaseUIMAAsynchrono
 
     ClientRequest cachedRequest = produceNewClientRequestObject();
     cachedRequest.setSynchronousInvocation();
+    
+    // Save the application-provided List into which the per-component
+    // performance stats will be copied when the reply comes back.
+    cachedRequest.setComponentMetricsList(componentMetricsList);
+    
     // This is synchronous call, acquire and hold the semaphore before
     // dispatching a CAS to a service. The semaphore will be released
     // iff:
@@ -1949,7 +1972,7 @@ public abstract class BaseUIMAAsynchrono
     	// for the second oldest CAS in the outstanding list.
     	serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
     	throw new ResourceProcessException(e);
-      }
+      } 
     }
     try {
       // send CAS. This call does not block. Instead we will block the sending thread below.
@@ -1962,6 +1985,7 @@ public abstract class BaseUIMAAsynchrono
 
     } catch( ResourceProcessException e) {
       threadMonitor.getMonitor().release();
+      removeFromCache(casReferenceId);
       throw e;
     }
     if (threadMonitor != null && threadMonitor.getMonitor() != null) {
@@ -2004,12 +2028,18 @@ public abstract class BaseUIMAAsynchrono
         	if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                         "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                        "UIMAJMS_client_interrupted_INFO", new Object[] { casReferenceId, aCAS.hashCode()});
+                        "UIMAJMS_client_interrupted_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())});
             }
         	// cancel the timer if it is associated with a CAS this thread is waiting for. This would be
         	// the oldest CAS submitted to a queue for processing. The timer will be canceled and restarted
         	// for the second oldest CAS in the outstanding list.
         	serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
+        	if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                        "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAJMS_client_canceled_timer_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())});
+            }
+            removeFromCache(casReferenceId);
         	throw new ResourceProcessException(e);
         } finally {
           threadMonitor.getMonitor().release();
@@ -2071,10 +2101,6 @@ public abstract class BaseUIMAAsynchrono
     }
   }
 
-  public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException {
-    return sendAndReceiveCAS(aCAS, null);
-  }
-
   protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, String casReferenceId) {
 
     ProcessTrace pt = new ProcessTrace_impl();
@@ -2272,7 +2298,18 @@ public abstract class BaseUIMAAsynchrono
 
     private String hostIpProcessingCAS;
     
-    public String getHostIpProcessingCAS() {
+    List<AnalysisEnginePerformanceMetrics> componentMetricsList;
+    
+    public List<AnalysisEnginePerformanceMetrics> getComponentMetricsList() {
+		return componentMetricsList;
+	}
+
+	public void setComponentMetricsList(
+			List<AnalysisEnginePerformanceMetrics> componentMetricsList) {
+		this.componentMetricsList = componentMetricsList;
+	}
+
+	public String getHostIpProcessingCAS() {
       return hostIpProcessingCAS;
     }