You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2016/03/16 21:52:55 UTC
svn commit: r1735296 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Author: cwiklik
Date: Wed Mar 16 20:52:55 2016
New Revision: 1735296
URL: http://svn.apache.org/viewvc?rev=1735296&view=rev
Log:
UIMA-4857 fixed race condition causing NPE
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1735296&r1=1735295&r2=1735296&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Mar 16 20:52:55 2016
@@ -678,6 +678,20 @@ public class JmsEndpointConnection_impl
new Object[] { destinationName });
}
logMessageSize(aMessage, msgSize, destinationName);
+ // In a parallel step it's possible to receive a reply from one of the delegates
+ // *before* the CAS has been dispatched to all of the delegates. This can cause a problem
+ // because replies are merged, which can leave the CAS in an inconsistent state.
+ // The following code calls dispatchedCasToParallelDelegate(), which counts down
+ // a Java latch. The same latch is used when receiving replies: while the latch is non-zero,
+ // the code blocks a thread from performing deserialization.
+ if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
+ String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
+ CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
+ if ( casStateEntry.getNumberOfParallelDelegates() > 0) {
+ casStateEntry.dispatchedCasToParallelDelegate();
+ }
+ }
+
synchronized (producer) {
producer.send(aMessage);
}
@@ -694,20 +708,6 @@ public class JmsEndpointConnection_impl
// record the time when this dispatches sent a message. This time will be used
// to find inactive sessions.
lastDispatchTimestamp.set(System.currentTimeMillis());
-
- // If in ParallelStep its possible to receive a reply from one of the delegates in parallel
- // step *before* a CAS is dispatched to all of the delegates. This can cause a problem
- // as replies are merged which causes the CAS to be in an inconsistent state.
- // The following code calls dispatchCasToParallelDelegate() which count down
- // a java latch. The same latch is used when receiving replies. If the latch is non zero
- // the code blocks a thread from performing deserialization.
- if ( msgType == AsynchAEMessage.Request && command == AsynchAEMessage.Process ) {
- String casReferenceId = aMessage.getStringProperty(AsynchAEMessage.CasReference);
- CasStateEntry casStateEntry = controller.getLocalCache().lookupEntry(casReferenceId);
- if ( casStateEntry.getNumberOfParallelDelegates() > 0) {
- casStateEntry.dispatchedCasToParallelDelegate();
- }
- }
// Succeeded sending the CAS
return true;
} catch (Exception e) {