You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/24 18:46:37 UTC

svn commit: r818542 - /incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Author: cwiklik
Date: Thu Sep 24 16:46:36 2009
New Revision: 818542

URL: http://svn.apache.org/viewvc?rev=818542&view=rev
Log:
UIMA-1433 Improved detection of a bad jms connection.

Modified:
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=818542&r1=818541&r2=818542&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu Sep 24 16:46:36 2009
@@ -19,6 +19,10 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -52,12 +56,13 @@
 import org.apache.uima.util.Level;
 import org.springframework.util.Assert;
 
+
 public class JmsEndpointConnection_impl implements ConsumerListener {
   private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
 
   private Destination destination;
 
-  private Session producerSession;
+  protected Session producerSession;
 
   private MessageProducer producer;
 
@@ -89,6 +94,10 @@
 
   private final String componentName;
 
+  // Create semaphore to control creation of a JMS connection.
+  // This semaphore is shared by all instances of this class
+  private static Semaphore connectionSemaphore = new Semaphore(1);
+  
   public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap,
           Endpoint anEndpoint, AnalysisEngineController aController) {
     brokerDestinations = aBrokerDestinationMap;
@@ -117,13 +126,20 @@
   }
 
   public boolean isOpen() {
-    if (failed || producerSession == null || brokerDestinations.getConnection() == null
+    if (failed || producerSession == null || connectionClosedOrFailed()) {
+      return false;
+    }
+    return ((ActiveMQSession) producerSession).isRunning();
+  }
+
+  private boolean connectionClosedOrFailed() {
+    if (brokerDestinations.getConnection() == null
             || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()
             || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosing()
             || ((ActiveMQConnection) brokerDestinations.getConnection()).isTransportFailed()) {
-      return false;
+      return true;
     }
-    return ((ActiveMQSession) producerSession).isRunning();
+    return false;
   }
 
   private void openChannel() throws AsynchAEException, ServiceShutdownException {
@@ -134,11 +150,11 @@
           String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
           ServiceShutdownException {
     try {
-      
+
       // If replying to http request, reply to a queue managed by this service broker using tcp
       // protocol
-      if (isReplyEndpoint && brokerUri.startsWith("http") ) { 
-        brokerUri = ((JmsOutputChannel)aController.getOutputChannel()).getServerURI();
+      if (isReplyEndpoint && brokerUri.startsWith("http")) {
+        brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
 
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -154,19 +170,54 @@
 
       if (!isOpen()) {
         Connection conn = null;
-        if (brokerDestinations.getConnection() == null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
-                    "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_activemq_open__FINE", new Object[] { anEndpointName, brokerUri });
+        //  Check connection status and create a new one (if necessary) as an atomic operation
+        try {
+          connectionSemaphore.acquire();
+          if (connectionClosedOrFailed()) {
+            // Create one shared connection per unique brokerURL.
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+                      "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                      "UIMAJMS_activemq_open__INFO",
+                      new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
+            }
+            if ( brokerDestinations.getConnection() != null ) {
+              try {
+                //  Close the connection to avoid leaks in the broker
+                brokerDestinations.getConnection().close();
+              } catch( Exception e) {
+                //  Ignore exceptions on a close of a bad connection
+              }
+            }
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+            //  Create shared jms connection to a broker
+            conn = factory.createConnection();
+            factory.setDispatchAsync(true);
+            factory.setUseAsyncSend(true);
+            factory.setCopyMessageOnSend(false);
+            //  Cache the connection. There should only be one connection in the jvm
+            //  per unique broker url. 
+            brokerDestinations.setConnection(conn);
+            // Close and invalidate all sessions previously created from the old connection
+            Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
+                    .entrySet().iterator();
+            while (it.hasNext()) {
+              Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
+              if (entry.getValue().producerSession != null) {
+                // Close session
+                entry.getValue().producerSession.close();
+                // Since we created a new connection, invalidate sessions that
+                // were created with the old connection
+                entry.getValue().producerSession = null;
+              }
+            }
           }
-          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
-          conn = factory.createConnection();
-          factory.setDispatchAsync(true);
-          factory.setUseAsyncSend(true);
-          factory.setCopyMessageOnSend(false);
-          brokerDestinations.setConnection(conn);
+        } catch( Exception exc) {
+          throw exc; // rethrow
+        } finally {
+          connectionSemaphore.release();
         }
+        
         connectionCreationTimestamp = System.nanoTime();
         failed = false;
       }