You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/01/27 21:11:42 UTC
svn commit: r738216 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Author: cwiklik
Date: Tue Jan 27 20:11:42 2009
New Revision: 738216
URL: http://svn.apache.org/viewvc?rev=738216&view=rev
Log:
UIMA-1279 Fixes hangs in client when handling errors during a deployment of synch aggregate that uses jms service descriptor
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=738216&r1=738215&r2=738216&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Jan 27 20:11:42 2009
@@ -67,6 +67,7 @@
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.impl.AllowPreexistingFS;
import org.apache.uima.cas.impl.XmiSerializationSharedData;
@@ -884,23 +885,26 @@
{
// Save reply message in the cache
cachedRequest.setMessage(message);
- // Signal a thread that we received a reply
- if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
- {
- ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
- // Unblock the sending thread so that it can complete processing
- // of the reply. The message has been stored in the cache and
- // when the thread wakes up due to notification below, it will
- // retrieve the reply and process it.
- synchronized( threadMonitor.getMonitor() )
- {
- threadMonitor.setWasSignaled();
- cachedRequest.setReceivedProcessCasReply();
- threadMonitor.getMonitor().notifyAll();
- }
- }
+ wakeUpSendThread(cachedRequest);
}
+ private void wakeUpSendThread(ClientRequest cachedRequest) throws Exception {
+ if ( threadMonitorMap.containsKey(cachedRequest.getThreadId()))
+ {
+ ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest.getThreadId());
+ // Unblock the sending thread so that it can complete processing
+ // of the reply. The message has been stored in the cache and
+ // when the thread wakes up due to notification below, it will
+ // retrieve the reply and process it.
+ synchronized( threadMonitor.getMonitor() )
+ {
+ threadMonitor.setWasSignaled();
+ cachedRequest.setReceivedProcessCasReply();
+ threadMonitor.getMonitor().notifyAll();
+ }
+ }
+
+ }
/**
* Handles response to Process CAS request. If the message originated in a service that is running in a separate jvm (remote),
@@ -1080,10 +1084,15 @@
clientSideJmxStats.incrementProcessErrorCount();
}
Exception exception = retrieveExceptionFromMessage(message);
-
-
- exception.printStackTrace();
-
+ if ( exception != null && cachedRequest != null ) {
+ cachedRequest.setException(exception);
+ if ( exception instanceof AnalysisEngineProcessException ||
+ ( exception.getCause() != null &&
+ exception.getCause() instanceof AnalysisEngineProcessException) ) {
+ // Indicate that this is a process exception.
+ cachedRequest.setProcessException();
+ }
+ }
receivedCpcReply = true; // change state as if the CPC reply came in. This is done to prevent a hang on CPC request
synchronized(endOfCollectionMonitor)
{
@@ -1128,12 +1137,19 @@
throw e;
}
finally {
- // Dont release the CAS if the application uses synchronous API
- if ( cachedRequest != null &&
- !cachedRequest.isSynchronousInvocation() &&
- cachedRequest.getCAS() != null )
- {
- cachedRequest.getCAS().release();
+
+ if ( cachedRequest != null ) {
+ if ( cachedRequest.isSynchronousInvocation() && cachedRequest.isProcessException() ) {
+ // Wake up the send thread that is blocking waiting for a reply. When the thread
+ // receives the signal, it checks if the reply contains an exception and will
+ // not return control back to the client
+ wakeUpSendThread(cachedRequest);
+ }
+ // Dont release the CAS if the application uses synchronous API
+ if ( !cachedRequest.isSynchronousInvocation() && cachedRequest.getCAS() != null )
+ {
+ cachedRequest.getCAS().release();
+ }
}
removeFromCache(casReferenceId);
serviceDelegate.removeCasFromOutstandingList(casReferenceId);
@@ -1499,7 +1515,8 @@
// If there service is in the ok state and the CAS is in the
// list of CASes pending dispatch, remove the CAS from the list
// and send it to the service.
- if (cachedRequest.isTimeoutException()) {
+ if (cachedRequest.isTimeoutException() ||
+ cachedRequest.isProcessException() ) {
// Handled below
break;
}
@@ -1515,11 +1532,16 @@
if ( abort ) {
throw new ResourceProcessException(new RuntimeException("Uima AS Client API Stopping"));
}
+ // check if timeout exception
+ if (cachedRequest.isTimeoutException()) {
+ throw new ResourceProcessException(new UimaASProcessCasTimeout());
+ }
+ // If a reply contains process exception, throw an exception and let the
+ // listener decide what happens next
+ if (cachedRequest.isProcessException()) {
+ throw new ResourceProcessException(cachedRequest.getException());
+ }
try {
- // check if timeout exception
- if (cachedRequest.isTimeoutException()) {
- throw new ResourceProcessException(new UimaASProcessCasTimeout());
- }
// Process reply in the send thread
Message message = cachedRequest.getMessage();
deserializeAndCompleteProcessingReply(casReferenceId, message, cachedRequest, pt, false);
@@ -1704,29 +1726,29 @@
private BaseUIMAAsynchronousEngineCommon_impl uimaEEEngine = null;
- private boolean isSerializedCAS;
+ private volatile boolean isSerializedCAS;
private String serializedCAS;
private CAS cas;
- private boolean isMetaRequest = false;
+ private volatile boolean isMetaRequest = false;
- private boolean isCPCRequest = false;
+ private volatile boolean isCPCRequest = false;
- private boolean isRemote = true;
+ private volatile boolean isRemote = true;
private String endpoint;
- private boolean receivedProcessCasReply = false;
+ private volatile boolean receivedProcessCasReply = false;
private long threadId=-1;
private Message message;
- private boolean synchronousInvocation;
+ private volatile boolean synchronousInvocation;
- private boolean timeoutException;
+ private volatile boolean timeoutException;
private long casDepartureTime;
@@ -1748,8 +1770,24 @@
private volatile boolean isBinaryCas = false;
-
- public long getMetaTimeoutErrorCount() {
+ private Exception exception;
+
+ private volatile boolean processException;
+
+ public boolean isProcessException() {
+ return processException;
+ }
+ public void setProcessException() {
+ this.processException = true;
+ }
+ public Exception getException() {
+ return exception;
+ }
+ public void setException(Exception exception) {
+ this.exception = exception;
+ }
+
+ public long getMetaTimeoutErrorCount() {
return metaTimeoutErrorCount;
}
public void setMetaTimeoutErrorCount(long timeoutErrorCount) {