You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/08/09 17:41:12 UTC
svn commit: r983684 -
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Author: cwiklik
Date: Mon Aug 9 15:41:12 2010
New Revision: 983684
URL: http://svn.apache.org/viewvc?rev=983684&view=rev
Log:
UIMA-1855 Fixes inconsistent synchronization reported by Findbugs
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=983684&r1=983683&r2=983684&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon Aug 9 15:41:12 2010
@@ -161,151 +161,154 @@ public class JmsEndpointConnection_impl
openChannel(getServerUri(), componentName, endpoint, controller);
}
- private synchronized void openChannel(String brokerUri, String aComponentName,
+ private void openChannel(String brokerUri, String aComponentName,
String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
ServiceShutdownException {
- try {
-
- // If replying to http request, reply to a queue managed by this service broker using tcp
- // protocol
- if (isReplyEndpoint && brokerUri.startsWith("http")) {
- brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
-
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(
- Level.FINE,
- CLASS_NAME.getName(),
- "open",
- JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_override_connection_to_endpoint__FINE",
- new Object[] { aComponentName, getEndpoint(),
- ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
- }
- }
+ synchronized (recoveryMux) {
+ try {
- if (!isOpen()) {
- Connection conn = null;
- // Check connection status and create a new one (if necessary) as an atomic operation
- try {
- connectionSemaphore.acquire();
- if (connectionClosedOrFailed(brokerDestinations)) {
- // Create one shared connection per unique brokerURL.
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_activemq_open__FINE",
- new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
- }
- if ( brokerDestinations.getConnection() != null ) {
- try {
- // Close the connection to avoid leaks in the broker
- brokerDestinations.getConnection().close();
- } catch( Exception e) {
- // Ignore exceptions on a close of a bad connection
- }
- }
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
- // Create shared jms connection to a broker
- conn = factory.createConnection();
- factory.setDispatchAsync(true);
- factory.setUseAsyncSend(true);
- factory.setCopyMessageOnSend(false);
- // Cache the connection. There should only be one connection in the jvm
- // per unique broker url.
- brokerDestinations.setConnection(conn);
- // Close and invalidate all sessions previously created from the old connection
- Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
- .entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
- if (entry.getValue().producerSession != null) {
- // Close session
- entry.getValue().producerSession.close();
- // Since we created a new connection invalidate session that
- // have been created with the old connection
- entry.getValue().producerSession = null;
- }
- }
- }
- } catch( Exception exc) {
- throw exc; // rethrow
- } finally {
- connectionSemaphore.release();
- }
-
- connectionCreationTimestamp = System.nanoTime();
- failed = false;
- }
- Connection conn = brokerDestinations.getConnection();
- if (failed) {
- // Unable to create a connection
- return;
- }
-
- producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
- && delegateEndpoint.getDestination() != null) {
- producer = producerSession.createProducer(null);
- if (aController != null) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_temp_conn_starting__FINE",
- new Object[] { aComponentName, anEndpointName, brokerUri });
- }
- }
- } else {
- destination = producerSession.createQueue(getEndpoint());
- producer = producerSession.createProducer(destination);
- if (controller != null) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_conn_starting__FINE",
- new Object[] { aComponentName, anEndpointName, brokerUri });
- }
- }
- }
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- // Since the connection is shared, start it only once
- if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
- brokerDestinations.getConnection().start();
- }
- if (controller != null) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
- if (controller.getInputChannel() != null) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_connection_open_to_endpoint__FINE",
- new Object[] { aComponentName, getEndpoint(), brokerUri });
- }
- }
- }
- failed = false;
- } catch (Exception e) {
- boolean rethrow = true;
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAEE_service_exception_WARNING", controller.getComponentName());
-
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_exception__WARNING", e);
- }
-
- if (e instanceof JMSException) {
- rethrow = handleJmsException((JMSException) e);
-
- }
- if (rethrow) {
- throw new AsynchAEException(e);
- }
- }
+ // If replying to http request, reply to a queue managed by this service broker using tcp
+ // protocol
+ if (isReplyEndpoint && brokerUri.startsWith("http")) {
+ brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "open",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_override_connection_to_endpoint__FINE",
+ new Object[] { aComponentName, getEndpoint(),
+ ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
+ }
+ }
+
+ if (!isOpen()) {
+ Connection conn = null;
+ // Check connection status and create a new one (if necessary) as an atomic operation
+ try {
+ connectionSemaphore.acquire();
+ if (connectionClosedOrFailed(brokerDestinations)) {
+ // Create one shared connection per unique brokerURL.
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_activemq_open__FINE",
+ new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
+ }
+ if ( brokerDestinations.getConnection() != null ) {
+ try {
+ // Close the connection to avoid leaks in the broker
+ brokerDestinations.getConnection().close();
+ } catch( Exception e) {
+ // Ignore exceptions on a close of a bad connection
+ }
+ }
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+ // Create shared jms connection to a broker
+ conn = factory.createConnection();
+ factory.setDispatchAsync(true);
+ factory.setUseAsyncSend(true);
+ factory.setCopyMessageOnSend(false);
+ // Cache the connection. There should only be one connection in the jvm
+ // per unique broker url.
+ brokerDestinations.setConnection(conn);
+ // Close and invalidate all sessions previously created from the old connection
+ Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
+ .entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
+ if (entry.getValue().producerSession != null) {
+ // Close session
+ entry.getValue().producerSession.close();
+ // Since we created a new connection invalidate session that
+ // have been created with the old connection
+ entry.getValue().producerSession = null;
+ }
+ }
+ }
+ } catch( Exception exc) {
+ throw exc; // rethrow
+ } finally {
+ connectionSemaphore.release();
+ }
+
+ connectionCreationTimestamp = System.nanoTime();
+ failed = false;
+ }
+ Connection conn = brokerDestinations.getConnection();
+ if (failed) {
+ // Unable to create a connection
+ return;
+ }
+
+ producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
+ && delegateEndpoint.getDestination() != null) {
+ producer = producerSession.createProducer(null);
+ if (aController != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_temp_conn_starting__FINE",
+ new Object[] { aComponentName, anEndpointName, brokerUri });
+ }
+ }
+ } else {
+ destination = producerSession.createQueue(getEndpoint());
+ producer = producerSession.createProducer(destination);
+ if (controller != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_conn_starting__FINE",
+ new Object[] { aComponentName, anEndpointName, brokerUri });
+ }
+ }
+ }
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // Since the connection is shared, start it only once
+ if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
+ brokerDestinations.getConnection().start();
+ }
+ if (controller != null) {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
+ if (controller.getInputChannel() != null) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_connection_open_to_endpoint__FINE",
+ new Object[] { aComponentName, getEndpoint(), brokerUri });
+ }
+ }
+ }
+ failed = false;
+ } catch (Exception e) {
+ boolean rethrow = true;
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAEE_service_exception_WARNING", controller.getComponentName());
+
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_exception__WARNING", e);
+ }
+
+ if (e instanceof JMSException) {
+ rethrow = handleJmsException((JMSException) e);
+
+ }
+ if (rethrow) {
+ throw new AsynchAEException(e);
+ }
+ }
+
+ }
}
public synchronized void open() throws AsynchAEException, ServiceShutdownException {
@@ -334,26 +337,28 @@ public class JmsEndpointConnection_impl
}
}
- public synchronized void close() throws Exception {
- if (producer != null) {
- try {
- producer.close();
- } catch (Exception e) {
- // Ignore we are shutting down
- }
- }
- if (producerSession != null) {
- try {
- producerSession.close();
- } catch (Exception e) {
- // Ignore we are shutting down
- }
- producerSession = null;
- }
- if (destination != null) {
- destination = null;
- }
- }
+ public void close() throws Exception {
+ synchronized (recoveryMux) {
+ if (producer != null) {
+ try {
+ producer.close();
+ } catch (Exception e) {
+ // Ignore we are shutting down
+ }
+ }
+ if (producerSession != null) {
+ try {
+ producerSession.close();
+ } catch (Exception e) {
+ // Ignore we are shutting down
+ }
+ producerSession = null;
+ }
+ if (destination != null) {
+ destination = null;
+ }
+ }
+ }
protected String getEndpoint() {
return endpoint;