You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/12/22 22:19:25 UTC
svn commit: r1052071 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Author: cwiklik
Date: Wed Dec 22 21:19:24 2010
New Revision: 1052071
URL: http://svn.apache.org/viewvc?rev=1052071&view=rev
Log:
UIMA-1979 Synchronized access to the JMS Session instance. Renamed the lock object from recoveryMux to lock.
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1052071&r1=1052070&r2=1052071&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Dec 22 21:19:24 2010
@@ -28,7 +28,6 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
@@ -38,7 +37,6 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ConnectionFailedException;
import org.apache.activemq.advisory.ConsumerEvent;
@@ -61,7 +59,6 @@ import org.apache.uima.aae.message.Async
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
import org.apache.uima.util.Level;
-import org.springframework.util.Assert;
public class JmsEndpointConnection_impl implements ConsumerListener {
@@ -97,7 +94,7 @@ public class JmsEndpointConnection_impl
private volatile boolean failed = false;
- private Object recoveryMux = new Object();
+ private Object lock = new Object();
private final String componentName;
@@ -140,10 +137,12 @@ public class JmsEndpointConnection_impl
}
public boolean isOpen() {
- if (failed || producerSession == null || connectionClosedOrFailed(brokerDestinations)) {
- return false;
- }
- return ((ActiveMQSession) producerSession).isRunning();
+ synchronized (lock) {
+ if (failed || producerSession == null || connectionClosedOrFailed(brokerDestinations)) {
+ return false;
+ }
+ return ((ActiveMQSession) producerSession).isRunning();
+ }
}
protected static boolean connectionClosedOrFailed(BrokerConnectionEntry aBrokerDestinationMap) {
@@ -164,7 +163,7 @@ public class JmsEndpointConnection_impl
private void openChannel(String brokerUri, String aComponentName,
String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
ServiceShutdownException {
- synchronized (recoveryMux) {
+ synchronized (lock) {
try {
// If replying to http request, reply to a queue managed by this service broker using tcp
@@ -338,7 +337,7 @@ public class JmsEndpointConnection_impl
}
public void close() throws Exception {
- synchronized (recoveryMux) {
+ synchronized (lock) {
if (producer != null) {
try {
producer.close();
@@ -380,81 +379,87 @@ public class JmsEndpointConnection_impl
}
public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
- if ( producerSession == null ) {
- throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
- }
- try {
- if (aTextMessage == null) {
- return producerSession.createTextMessage();
- } else {
- return producerSession.createTextMessage(aTextMessage);
- }
- } catch (javax.jms.IllegalStateException e) {
- try {
- open();
- } catch (ServiceShutdownException ex) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "produceTextMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", controller.getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "produceTextMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", ex);
- }
- } catch (AsynchAEException ex) {
- throw ex;
- }
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
- throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
+ synchronized( lock ) {
+ if ( producerSession == null ) {
+ throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+ }
+ try {
+ if (aTextMessage == null) {
+ return producerSession.createTextMessage();
+ } else {
+ return producerSession.createTextMessage(aTextMessage);
+ }
+ } catch (javax.jms.IllegalStateException e) {
+ try {
+ open();
+ } catch (ServiceShutdownException ex) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "produceTextMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", controller.getComponentName());
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "produceTextMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", ex);
+ }
+ } catch (AsynchAEException ex) {
+ throw ex;
+ }
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
+ }
}
public BytesMessage produceByteMessage() throws AsynchAEException {
- if ( producerSession == null ) {
- throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
- }
- boolean done = false;
- int retryCount = 4;
- while (retryCount > 0) {
- try {
- retryCount--;
- return producerSession.createBytesMessage();
- } catch (javax.jms.IllegalStateException e) {
- try {
- open();
- } catch (ServiceShutdownException ex) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "produceByteMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", controller.getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "produceByteMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", ex);
+ synchronized( lock ) {
+ if ( producerSession == null ) {
+ throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
}
- }
+ boolean done = false;
+ int retryCount = 4;
+ while (retryCount > 0) {
+ try {
+ retryCount--;
+ return producerSession.createBytesMessage();
+ } catch (javax.jms.IllegalStateException e) {
+ try {
+ open();
+ } catch (ServiceShutdownException ex) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "produceByteMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", controller.getComponentName());
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "produceByteMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", ex);
+ }
+ }
- } catch (Exception e) {
- throw new AsynchAEException(e);
- }
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+ throw new AsynchAEException(
+ new InvalidMessageException("Unable to produce BytesMessage Object"));
}
- throw new AsynchAEException(
- new InvalidMessageException("Unable to produce BytesMessage Object"));
}
public ObjectMessage produceObjectMessage() throws AsynchAEException {
- if ( producerSession == null ) {
- throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
- }
- try {
- if (!((ActiveMQSession) producerSession).isRunning()) {
- open();
- }
- return producerSession.createObjectMessage();
- } catch (Exception e) {
- throw new AsynchAEException(e);
+ synchronized( lock ) {
+ if ( producerSession == null ) {
+ throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+ }
+ try {
+ if (!((ActiveMQSession) producerSession).isRunning()) {
+ open();
+ }
+ return producerSession.createObjectMessage();
+ } catch (Exception e) {
+ throw new AsynchAEException(e);
+ }
}
}
@@ -471,7 +476,7 @@ public class JmsEndpointConnection_impl
// endpoint for the delegate is marked as FAILED. This will be the case if the listener
// on the reply queue for the endpoint has failed.
String endpointName = delegateEndpoint.getEndpoint();
- synchronized (recoveryMux) {
+ synchronized (lock) {
if (controller instanceof AggregateAnalysisEngineController) {
// Using the queue name lookup the delegate key
String key = ((AggregateAnalysisEngineController) controller)