You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/10/21 21:41:19 UTC

svn commit: r828164 - in /incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq: JmsEndpointConnection_impl.java JmsInputChannel.java JmsOutputChannel.java

Author: cwiklik
Date: Wed Oct 21 19:41:18 2009
New Revision: 828164

URL: http://svn.apache.org/viewvc?rev=828164&view=rev
Log:
UIMA-1632 Modified to recover from a 'bounced' broker. The code detects a stale connection, invalidates its sessions, and recreates them. As part of the recovery, the code creates a new temp queue and a listener on it, and updates the endpoint objects before sending a request to a delegate.

Modified:
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=828164&r1=828163&r2=828164&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Oct 21 19:41:18 2009
@@ -126,17 +126,18 @@
   }
 
   public boolean isOpen() {
-    if (failed || producerSession == null || connectionClosedOrFailed()) {
+    if (failed || producerSession == null || connectionClosedOrFailed(brokerDestinations)) {
       return false;
     }
     return ((ActiveMQSession) producerSession).isRunning();
   }
 
-  private boolean connectionClosedOrFailed() {
-    if (brokerDestinations.getConnection() == null
-            || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()
-            || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosing()
-            || ((ActiveMQConnection) brokerDestinations.getConnection()).isTransportFailed()) {
+  protected static boolean connectionClosedOrFailed(BrokerConnectionEntry aBrokerDestinationMap) {
+    Connection connection = aBrokerDestinationMap.getConnection();
+    if (connection == null
+            || ((ActiveMQConnection) connection).isClosed()
+            || ((ActiveMQConnection) connection).isClosing()
+            || ((ActiveMQConnection) connection).isTransportFailed()) {
       return true;
     }
     return false;
@@ -173,7 +174,7 @@
         //  Check connection status and create a new one (if necessary) as an atomic operation
         try {
           connectionSemaphore.acquire();
-          if (connectionClosedOrFailed()) {
+          if (connectionClosedOrFailed(brokerDestinations)) {
             // Create one shared connection per unique brokerURL.
             if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
               UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -349,37 +350,33 @@
   }
 
   public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
-    Assert.notNull(producerSession);
-    boolean done = false;
-    int retryCount = 4;
-    while (retryCount > 0) {
-      try {
-        retryCount--;
-
-        if (aTextMessage == null) {
+    if ( producerSession == null ) {
+      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+    }
+    try {
+       if (aTextMessage == null) {
           return producerSession.createTextMessage();
-        } else {
+       } else {
           return producerSession.createTextMessage(aTextMessage);
-        }
-
-      } catch (javax.jms.IllegalStateException e) {
+       }
+     } catch (javax.jms.IllegalStateException e) {
         try {
           open();
         } catch (ServiceShutdownException ex) {
           ex.printStackTrace();
         } catch (AsynchAEException ex) {
-
           throw ex;
         }
       } catch (Exception e) {
         throw new AsynchAEException(e);
-      }
     }
     throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
   }
 
   public BytesMessage produceByteMessage() throws AsynchAEException {
-    Assert.notNull(producerSession);
+    if ( producerSession == null ) {
+      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+    }
     boolean done = false;
     int retryCount = 4;
     while (retryCount > 0) {
@@ -402,8 +399,9 @@
   }
 
   public ObjectMessage produceObjectMessage() throws AsynchAEException {
-    Assert.notNull(producerSession);
-
+    if ( producerSession == null ) {
+      throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+    }
     try {
       if (!((ActiveMQSession) producerSession).isRunning()) {
         open();
@@ -523,6 +521,7 @@
       // Succeeded sending the CAS
       return true;
     } catch (Exception e) {
+      e.printStackTrace();
       // If the controller has been stopped no need to send messages
       if (controller.isStopped()) {
         return true;
@@ -588,7 +587,7 @@
       // Fetch an InputChannel that handles messages for a given delegate
       InputChannel iC = controller.getReplyInputChannel(delegateKey);
       // Create a new Listener, new Temp Queue and associate the listener with the Input Channel
-      iC.createListener(delegateKey);
+      iC.createListener(delegateKey, null);
     }
   }
 

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=828164&r1=828163&r2=828164&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Wed Oct 21 19:41:18 2009
@@ -859,7 +859,7 @@
     }
   }
 
-  public void createListener(String aDelegateKey) throws Exception {
+  public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
     if (getController() instanceof AggregateAnalysisEngineController) {
       Delegate delegate = ((AggregateAnalysisEngineController) getController())
               .lookupDelegate(aDelegateKey);
@@ -903,13 +903,30 @@
                 aDelegateKey, false);
         // Override the reply destination.
         endpoint.setDestination(newListener.getDestination());
+        if ( endpointToUpdate != null) {
+          endpointToUpdate.setDestination(newListener.getDestination());
+        }
         Object clone = ((Endpoint_impl) endpoint).clone();
         newListener.setTargetEndpoint((Endpoint) clone);
         endpoint.setStatus(Endpoint.OK);
+        System.out
+        .println(".... Listener Started on New Temp Reply Queue ...");
       }
     }
   }
-
+  public boolean isListenerActiveOnDestination(Destination destination ) {
+    List<UimaDefaultMessageListenerContainer> listeners = new ArrayList<UimaDefaultMessageListenerContainer>();
+    for (int i = 0; i < listenerContainerList.size(); i++) {
+      UimaDefaultMessageListenerContainer mListener = (UimaDefaultMessageListenerContainer) listenerContainerList
+              .get(i);
+      if ( mListener.getDestination() != null && 
+           mListener.getDestination() == destination &&
+           mListener.isRunning()) {
+        return true;
+      }
+    }
+    return false;
+  }
   /**
    * Given an endpoint name returns all listeners attached to this endpoint. There can be multiple
    * listeners on an endpoint each with a different selector to receive targeted messages like

Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=828164&r1=828163&r2=828164&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Oct 21 19:41:18 2009
@@ -317,7 +317,69 @@
     }
     return (int) INACTIVITY_TIMEOUT; // default
   }
+  /**
+   * Stop JMS connection and close all sessions associated with this connection
+   * 
+   * @param brokerConnectionEntry
+   */
+  private void invalidateConnectionAndEndpoints(BrokerConnectionEntry brokerConnectionEntry ) {
+    Connection conn = brokerConnectionEntry.getConnection();
+    try {
+       if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
+         brokerConnectionEntry.getConnection().stop();
+         brokerConnectionEntry.getConnection().close();
+         brokerConnectionEntry.setConnection(null);
+         for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
+                .entrySet()) {
+           endpoints.getValue().close(); // close session and producer
+         }
+       }
+    } catch (Exception e) {
+      // Ignore this for now. Attempting to close connection that has been closed
+      // Ignore we are shutting down
+    } finally {
+       brokerConnectionEntry.endpointMap.clear();
+       connectionMap.remove(brokerConnectionEntry.getBrokerURL());
+       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+         UIMAFramework.getLogger(CLASS_NAME).logrb(
+                 Level.INFO,
+                 CLASS_NAME.getName(),
+                 "invalidateConnectionAndEndpoints",
+                 JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                 "UIMAJMS_service_closing_connection__INFO",
+                 new Object[] { getAnalysisEngineController().getComponentName(),
+                   brokerConnectionEntry.getBrokerURL() });
+       }
+    }
+    brokerConnectionEntry.setConnection(null);
 
+  }
+  private String getDestinationName(Endpoint anEndpoint) {
+    String destination = anEndpoint.getEndpoint();
+    if (anEndpoint.getDestination() != null
+            && anEndpoint.getDestination() instanceof ActiveMQDestination) {
+      destination = ((ActiveMQDestination) anEndpoint.getDestination()).getPhysicalName();
+    }
+    return destination;
+  }
+  private String getLookupKey(Endpoint anEndpoint) {
+    String key = anEndpoint.getEndpoint() + anEndpoint.getServerURI();
+    String destination = getDestinationName(anEndpoint);
+    if ( anEndpoint.getDelegateKey() != null ) {
+      key = anEndpoint.getDelegateKey() + "-"+destination;
+    } else {
+      key = "Client-"+destination;
+    }
+    return key;
+  }
+  private BrokerConnectionEntry createConnectionEntry(String brokerURL)  {
+    BrokerConnectionEntry brokerConnectionEntry = new BrokerConnectionEntry();
+    connectionMap.put(brokerURL, brokerConnectionEntry);
+    ConnectionTimer connectionTimer = new ConnectionTimer(brokerConnectionEntry);
+    connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
+    brokerConnectionEntry.setConnectionTimer(connectionTimer);
+    return brokerConnectionEntry;
+  }
   /**
    * Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the
    * {@link Endpoint} The endpoint identifies the destination that should receive the message. This
@@ -350,29 +412,23 @@
     BrokerConnectionEntry brokerConnectionEntry = null;
     if (connectionMap.containsKey(anEndpoint.getServerURI())) {
       brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(anEndpoint.getServerURI());
+      brokerConnectionEntry.setBrokerURL(anEndpoint.getServerURI());
+      if ( JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry) ) {
+        invalidateConnectionAndEndpoints(brokerConnectionEntry);
+        brokerConnectionEntry = createConnectionEntry(anEndpoint.getServerURI());
+      }
       // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
       // the null check just in case
       if (brokerConnectionEntry == null) {
         throw new AsynchAEException("Controller:"
                 + getAnalysisEngineController().getComponentName()
                 + " Unable to Lookup Broker Connection For URL:" + anEndpoint.getServerURI());
-      }
+      } 
     } else {
-      brokerConnectionEntry = new BrokerConnectionEntry();
-      connectionMap.put(anEndpoint.getServerURI(), brokerConnectionEntry);
-      ConnectionTimer connectionTimer = new ConnectionTimer(brokerConnectionEntry);
-      connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
-      brokerConnectionEntry.setConnectionTimer(connectionTimer);
-    }
-
-    // create a key to lookup the endpointConnection object
-    String key = anEndpoint.getEndpoint() + anEndpoint.getServerURI();
-    String destination = anEndpoint.getEndpoint();
-    if (anEndpoint.getDestination() != null
-            && anEndpoint.getDestination() instanceof ActiveMQDestination) {
-      destination = ((ActiveMQDestination) anEndpoint.getDestination()).getPhysicalName();
-      key = destination;
+      brokerConnectionEntry = createConnectionEntry(anEndpoint.getServerURI());
     }
+    String key = getLookupKey(anEndpoint);
+    String destination = getDestinationName(anEndpoint);
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(
               Level.FINE,
@@ -396,6 +452,21 @@
                 new Object[] { getAnalysisEngineController().getComponentName(), destination,
                     anEndpoint.getServerURI() });
       }
+      if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController && anEndpoint.isTempReplyDestination() ) {
+        try {
+          if ( !((JmsInputChannel)getAnalysisEngineController().getInputChannel()).isListenerActiveOnDestination((Destination)anEndpoint.getDestination() )) {
+            //  Create a new temp queue, new listener on it and inject new temp queue into the current
+            //  endpoint object
+            getAnalysisEngineController().getInputChannel().createListener(anEndpoint.getDelegateKey(), anEndpoint);
+            //  The key is partly composed of a temp queue name so get the current temp queue name
+            key = getLookupKey(anEndpoint);
+            destination = getDestinationName(anEndpoint);
+          }
+        } catch( Exception e) {
+          throw new AsynchAEException(e);
+        }
+      }
+
       endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
               getAnalysisEngineController());
       brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
@@ -408,7 +479,7 @@
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
                 "getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                 "UIMAJMS_open_new_connection_to_endpoint__FINE",
-                new Object[] { destination, anEndpoint.getServerURI() });
+                new Object[] { getDestinationName(anEndpoint), anEndpoint.getServerURI() });
       }
 
       /**
@@ -429,8 +500,6 @@
                 new Object[] { getAnalysisEngineController().getComponentName(), destination,
                     anEndpoint.getServerURI() });
       }
-      // Cache the connection for future use. If not used, connections expire after 50000 millis
-      // connectionMap.put( key, endpointConnection);
     } else {
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -1362,8 +1431,8 @@
           aMessage.setStringProperty(AsynchAEMessage.MessageFrom, replyTo);
 
         }
-
         Object destination;
+        
         if ((destination = anEndpoint.getDestination()) != null) {
           aMessage.setJMSReplyTo((Destination) destination);
           aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());