You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2010/08/09 17:41:12 UTC

svn commit: r983684 - /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Author: cwiklik
Date: Mon Aug  9 15:41:12 2010
New Revision: 983684

URL: http://svn.apache.org/viewvc?rev=983684&view=rev
Log:
UIMA-1855 Fixes inconsistent synchronization reported by FindBugs

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=983684&r1=983683&r2=983684&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Mon Aug  9 15:41:12 2010
@@ -161,151 +161,154 @@ public class JmsEndpointConnection_impl 
     openChannel(getServerUri(), componentName, endpoint, controller);
   }
 
-  private synchronized void openChannel(String brokerUri, String aComponentName,
+  private void openChannel(String brokerUri, String aComponentName,
           String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
           ServiceShutdownException {
-    try {
-
-      // If replying to http request, reply to a queue managed by this service broker using tcp
-      // protocol
-      if (isReplyEndpoint && brokerUri.startsWith("http")) {
-        brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
-
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(
-                  Level.FINE,
-                  CLASS_NAME.getName(),
-                  "open",
-                  JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_override_connection_to_endpoint__FINE",
-                  new Object[] { aComponentName, getEndpoint(),
-                    ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
-        }
-      }
+	  synchronized (recoveryMux) {
+		    try {
 
-      if (!isOpen()) {
-        Connection conn = null;
-        //  Check connection status and create a new one (if necessary) as an atomic operation
-        try {
-          connectionSemaphore.acquire();
-          if (connectionClosedOrFailed(brokerDestinations)) {
-            // Create one shared connection per unique brokerURL.
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                      "UIMAJMS_activemq_open__FINE",
-                      new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
-            }
-            if ( brokerDestinations.getConnection() != null ) {
-              try {
-                //  Close the connection to avoid leaks in the broker
-                brokerDestinations.getConnection().close();
-              } catch( Exception e) {
-                //  Ignore exceptions on a close of a bad connection
-              }
-            }
-            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
-            //  Create shared jms connection to a broker
-            conn = factory.createConnection();
-            factory.setDispatchAsync(true);
-            factory.setUseAsyncSend(true);
-            factory.setCopyMessageOnSend(false);
-            //  Cache the connection. There should only be one connection in the jvm
-            //  per unique broker url. 
-            brokerDestinations.setConnection(conn);
-            // Close and invalidate all sessions previously created from the old connection
-            Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
-                    .entrySet().iterator();
-            while (it.hasNext()) {
-              Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
-              if (entry.getValue().producerSession != null) {
-                // Close session
-                entry.getValue().producerSession.close();
-                // Since we created a new connection invalidate session that
-                // have been created with the old connection
-                entry.getValue().producerSession = null;
-              }
-            }
-          }
-        } catch( Exception exc) {
-          throw exc; // rethrow
-        } finally {
-          connectionSemaphore.release();
-        }
-        
-        connectionCreationTimestamp = System.nanoTime();
-        failed = false;
-      }
-      Connection conn = brokerDestinations.getConnection();
-      if (failed) {
-        // Unable to create a connection
-        return;
-      }
-
-      producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
-      if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
-              && delegateEndpoint.getDestination() != null) {
-        producer = producerSession.createProducer(null);
-        if (aController != null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_temp_conn_starting__FINE",
-                    new Object[] { aComponentName, anEndpointName, brokerUri });
-          }
-        }
-      } else {
-        destination = producerSession.createQueue(getEndpoint());
-        producer = producerSession.createProducer(destination);
-        if (controller != null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_conn_starting__FINE",
-                    new Object[] { aComponentName, anEndpointName, brokerUri });
-          }
-        }
-      }
-      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      // Since the connection is shared, start it only once
-      if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
-        brokerDestinations.getConnection().start();
-      }
-      if (controller != null) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
-          if (controller.getInputChannel() != null) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_connection_open_to_endpoint__FINE",
-                    new Object[] { aComponentName, getEndpoint(), brokerUri });
-          }
-        }
-      }
-      failed = false;
-    } catch (Exception e) {
-      boolean rethrow = true;
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_service_exception_WARNING", controller.getComponentName());
-        
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
-                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_exception__WARNING", e);
-      }
-
-      if (e instanceof JMSException) {
-        rethrow = handleJmsException((JMSException) e);
-
-      }
-      if (rethrow) {
-        throw new AsynchAEException(e);
-      }
-    }
+		        // If replying to http request, reply to a queue managed by this service broker using tcp
+		        // protocol
+		        if (isReplyEndpoint && brokerUri.startsWith("http")) {
+		          brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
+
+		          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+		            UIMAFramework.getLogger(CLASS_NAME).logrb(
+		                    Level.FINE,
+		                    CLASS_NAME.getName(),
+		                    "open",
+		                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                    "UIMAJMS_override_connection_to_endpoint__FINE",
+		                    new Object[] { aComponentName, getEndpoint(),
+		                      ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
+		          }
+		        }
+
+		        if (!isOpen()) {
+		          Connection conn = null;
+		          //  Check connection status and create a new one (if necessary) as an atomic operation
+		          try {
+		            connectionSemaphore.acquire();
+		            if (connectionClosedOrFailed(brokerDestinations)) {
+		              // Create one shared connection per unique brokerURL.
+		              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+		                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                        "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                        "UIMAJMS_activemq_open__FINE",
+		                        new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
+		              }
+		              if ( brokerDestinations.getConnection() != null ) {
+		                try {
+		                  //  Close the connection to avoid leaks in the broker
+		                  brokerDestinations.getConnection().close();
+		                } catch( Exception e) {
+		                  //  Ignore exceptions on a close of a bad connection
+		                }
+		              }
+		              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+		              //  Create shared jms connection to a broker
+		              conn = factory.createConnection();
+		              factory.setDispatchAsync(true);
+		              factory.setUseAsyncSend(true);
+		              factory.setCopyMessageOnSend(false);
+		              //  Cache the connection. There should only be one connection in the jvm
+		              //  per unique broker url. 
+		              brokerDestinations.setConnection(conn);
+		              // Close and invalidate all sessions previously created from the old connection
+		              Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
+		                      .entrySet().iterator();
+		              while (it.hasNext()) {
+		                Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
+		                if (entry.getValue().producerSession != null) {
+		                  // Close session
+		                  entry.getValue().producerSession.close();
+		                  // Since we created a new connection invalidate session that
+		                  // have been created with the old connection
+		                  entry.getValue().producerSession = null;
+		                }
+		              }
+		            }
+		          } catch( Exception exc) {
+		            throw exc; // rethrow
+		          } finally {
+		            connectionSemaphore.release();
+		          }
+		          
+		          connectionCreationTimestamp = System.nanoTime();
+		          failed = false;
+		        }
+		        Connection conn = brokerDestinations.getConnection();
+		        if (failed) {
+		          // Unable to create a connection
+		          return;
+		        }
+
+		        producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+		        if ((delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint)
+		                && delegateEndpoint.getDestination() != null) {
+		          producer = producerSession.createProducer(null);
+		          if (aController != null) {
+		            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+		              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                      "UIMAJMS_temp_conn_starting__FINE",
+		                      new Object[] { aComponentName, anEndpointName, brokerUri });
+		            }
+		          }
+		        } else {
+		          destination = producerSession.createQueue(getEndpoint());
+		          producer = producerSession.createProducer(destination);
+		          if (controller != null) {
+		            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+		              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                      "UIMAJMS_conn_starting__FINE",
+		                      new Object[] { aComponentName, anEndpointName, brokerUri });
+		            }
+		          }
+		        }
+		        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+		        // Since the connection is shared, start it only once
+		        if (!((ActiveMQConnection) brokerDestinations.getConnection()).isStarted()) {
+		          brokerDestinations.getConnection().start();
+		        }
+		        if (controller != null) {
+		          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+		            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                    "UIMAJMS_conn_started__FINE", new Object[] { endpoint, brokerUri });
+		            if (controller.getInputChannel() != null) {
+		              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
+		                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                      "UIMAJMS_connection_open_to_endpoint__FINE",
+		                      new Object[] { aComponentName, getEndpoint(), brokerUri });
+		            }
+		          }
+		        }
+		        failed = false;
+		      } catch (Exception e) {
+		        boolean rethrow = true;
+		        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+		                  "openChannel", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+		                  "UIMAEE_service_exception_WARNING", controller.getComponentName());
+		          
+		          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
+		                  "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+		                  "UIMAJMS_exception__WARNING", e);
+		        }
+
+		        if (e instanceof JMSException) {
+		          rethrow = handleJmsException((JMSException) e);
+
+		        }
+		        if (rethrow) {
+		          throw new AsynchAEException(e);
+		        }
+		      }
+		  
+	  }
   }
 
   public synchronized void open() throws AsynchAEException, ServiceShutdownException {
@@ -334,26 +337,28 @@ public class JmsEndpointConnection_impl 
     }
   }
 
-  public synchronized void close() throws Exception {
-    if (producer != null) {
-      try {
-        producer.close();
-      } catch (Exception e) {
-        // Ignore we are shutting down
-      }
-    }
-    if (producerSession != null) {
-      try {
-        producerSession.close();
-      } catch (Exception e) {
-        // Ignore we are shutting down
-      }
-      producerSession = null;
-    }
-    if (destination != null) {
-      destination = null;
-    }
-  }
+	public void close() throws Exception {
+		synchronized (recoveryMux) {
+			if (producer != null) {
+				try {
+					producer.close();
+				} catch (Exception e) {
+					// Ignore we are shutting down
+				}
+			}
+			if (producerSession != null) {
+				try {
+					producerSession.close();
+				} catch (Exception e) {
+					// Ignore we are shutting down
+				}
+				producerSession = null;
+			}
+			if (destination != null) {
+				destination = null;
+			}
+		}
+	}
 
   protected String getEndpoint() {
     return endpoint;