You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/27 21:11:42 UTC

svn commit: r738216 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Author: cwiklik
Date: Tue Jan 27 20:11:42 2009
New Revision: 738216

URL: http://svn.apache.org/viewvc?rev=738216&view=rev
Log:
UIMA-1279 Fixes hangs in client when handling errors during a deployment of synch aggregate that uses jms service descriptor

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=738216&r1=738215&r2=738216&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Jan 27 20:11:42 2009
@@ -67,6 +67,7 @@
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.AllowPreexistingFS;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
@@ -884,23 +885,26 @@
 	{
 		//	Save reply message in the cache
 		cachedRequest.setMessage(message);
-		//	Signal a thread that we received a reply
-		if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
-		{
-			ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
-			//	Unblock the sending thread so that it can complete processing
-			//	of the reply. The message has been stored in the cache and 
-			//	when the thread wakes up due to notification below, it will
-			//	retrieve the reply and process it.
-			synchronized( threadMonitor.getMonitor() )
-			{
-				threadMonitor.setWasSignaled();
-				cachedRequest.setReceivedProcessCasReply();
-				threadMonitor.getMonitor().notifyAll();
-			}
-		}
+		wakeUpSendThread(cachedRequest);
 	}
 
+	private void wakeUpSendThread(ClientRequest cachedRequest) throws Exception {
+    if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+    {
+      ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+      //  Unblock the sending thread so that it can complete processing
+      //  of the reply. The message has been stored in the cache and 
+      //  when the thread wakes up due to notification below, it will
+      //  retrieve the reply and process it.
+      synchronized( threadMonitor.getMonitor() )
+      {
+        threadMonitor.setWasSignaled();
+        cachedRequest.setReceivedProcessCasReply();
+        threadMonitor.getMonitor().notifyAll();
+      }
+    }
+	  
+	}
 	
 	/**
 	 * Handles response to Process CAS request. If the message originated in a service that is running in a separate jvm (remote), 
@@ -1080,10 +1084,15 @@
 			clientSideJmxStats.incrementProcessErrorCount();
 		}
 		Exception exception = retrieveExceptionFromMessage(message);
-		
-		
-		exception.printStackTrace();
-		
+		if ( exception != null && cachedRequest != null ) {
+	    cachedRequest.setException(exception);
+	    if ( exception instanceof AnalysisEngineProcessException ||
+	         ( exception.getCause() != null && 
+	           exception.getCause() instanceof AnalysisEngineProcessException) ) {
+	      // Indicate that this is a process exception. 
+	      cachedRequest.setProcessException();
+	    }
+		}
 		receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request 
 		synchronized(endOfCollectionMonitor)
 		{
@@ -1128,12 +1137,19 @@
       throw e;
     }
     finally {
-      //  Dont release the CAS if the application uses synchronous API
-      if ( cachedRequest != null && 
-           !cachedRequest.isSynchronousInvocation() && 
-           cachedRequest.getCAS() != null )
-      {
-         cachedRequest.getCAS().release();
+      
+      if ( cachedRequest != null ) {
+        if ( cachedRequest.isSynchronousInvocation() && cachedRequest.isProcessException() ) {
+          //  Wake up the send thread that is blocked waiting for a reply. When the thread
+          //  receives the signal, it checks whether the request recorded a process exception
+          //  and, if so, throws it to the caller instead of processing the reply
+          wakeUpSendThread(cachedRequest);
+        }
+        //  Don't release the CAS if the application uses synchronous API
+        if ( !cachedRequest.isSynchronousInvocation() && cachedRequest.getCAS() != null )
+        {
+           cachedRequest.getCAS().release();
+        }
       }
       removeFromCache(casReferenceId);
       serviceDelegate.removeCasFromOutstandingList(casReferenceId);
@@ -1499,7 +1515,8 @@
             //  If there service is in the ok state and the CAS is in the
             //  list of CASes pending dispatch, remove the CAS from the list
             //  and send it to the service.
-            if (cachedRequest.isTimeoutException()) {
+            if (cachedRequest.isTimeoutException() || 
+                cachedRequest.isProcessException() ) {
               // Handled below
               break;
             }
@@ -1515,11 +1532,16 @@
     if ( abort ) {
       throw new ResourceProcessException(new RuntimeException("Uima AS Client API Stopping"));
     }
+    // check if timeout exception
+    if (cachedRequest.isTimeoutException()) {
+      throw new ResourceProcessException(new UimaASProcessCasTimeout());
+    }
+    //  If a reply contains process exception, throw an exception and let the
+    //  listener decide what happens next
+    if (cachedRequest.isProcessException()) {
+      throw new ResourceProcessException(cachedRequest.getException());
+    }
     try {
-      // check if timeout exception
-      if (cachedRequest.isTimeoutException()) {
-        throw new ResourceProcessException(new UimaASProcessCasTimeout());
-      }
       // Process reply in the send thread
       Message message = cachedRequest.getMessage();
       deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
@@ -1704,29 +1726,29 @@
 
 		private BaseUIMAAsynchronousEngineCommon_impl uimaEEEngine = null;
 
-		private boolean isSerializedCAS;
+		private volatile boolean isSerializedCAS;
 
 		private String serializedCAS;
 
 		private CAS cas;
 
-		private boolean isMetaRequest = false;
+		private volatile boolean isMetaRequest = false;
 
-		private boolean isCPCRequest = false;
+		private volatile boolean isCPCRequest = false;
 
-		private boolean isRemote = true;
+		private volatile boolean isRemote = true;
 
 		private String endpoint;
 
-		private boolean receivedProcessCasReply = false;
+		private volatile boolean receivedProcessCasReply = false;
 		
 		private long threadId=-1;
 		
 		private Message message;
 		
-		private boolean synchronousInvocation;  
+		private volatile boolean synchronousInvocation;  
 		
-    private boolean timeoutException;  
+    private volatile boolean timeoutException;  
     
 		private long casDepartureTime;
 		
@@ -1748,8 +1770,24 @@
     
     private volatile boolean isBinaryCas = false;
 
-		
-		public long getMetaTimeoutErrorCount() {
+    private Exception exception;
+
+    private volatile boolean processException;
+    
+    public boolean isProcessException() {
+      return processException;
+    }
+    public void setProcessException() {
+      this.processException = true;
+    }
+    public Exception getException() {
+      return exception;
+    }
+    public void setException(Exception exception) {
+      this.exception = exception;
+    }
+
+    public long getMetaTimeoutErrorCount() {
 			return metaTimeoutErrorCount;
 		}
 		public void setMetaTimeoutErrorCount(long timeoutErrorCount) {