You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/03/27 22:26:38 UTC

svn commit: r1461853 - in /uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq: JmsEndpointConnection_impl.java JmsOutputChannel.java

Author: cwiklik
Date: Wed Mar 27 21:26:38 2013
New Revision: 1461853

URL: http://svn.apache.org/r1461853
Log:
UIMA-2776 Added jms session management 

Modified:
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1461853&r1=1461852&r2=1461853&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Mar 27 21:26:38 2013
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -69,7 +70,13 @@ import org.apache.uima.util.Level;
 public class JmsEndpointConnection_impl implements ConsumerListener {
   private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
 
-  private Destination destination;
+  // timestamp containing time when the last message was dispatched from this dispatcher to 
+  // jms destination. This is updated every time a message is dispatched to a queue. 
+  // At fixed intervals a cleanup thread wakes up and checks for unused dispatchers by
+  // comparing value in lastDispatchTimestamp to a max allowed which by default is 5 minutes.
+  protected AtomicLong lastDispatchTimestamp = new AtomicLong(0);
+
+  protected Destination destination;
 
   protected Session producerSession;
 
@@ -83,7 +90,7 @@ public class JmsEndpointConnection_impl 
 
   private String endpointName;
 
-  private Endpoint delegateEndpoint;
+  protected Endpoint delegateEndpoint;
 
   private volatile boolean retryEnabled;
 
@@ -95,7 +102,7 @@ public class JmsEndpointConnection_impl 
 
   private Object semaphore = new Object();
 
-  private boolean isReplyEndpoint;
+  protected boolean isReplyEndpoint;
 
   private volatile boolean failed = false;
 
@@ -183,7 +190,7 @@ public class JmsEndpointConnection_impl 
 		                    "open",
 		                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
 		                    "UIMAJMS_override_connection_to_endpoint__FINE",
-		                    new Object[] { aComponentName, getEndpoint(),
+		                    new Object[] {  aComponentName, getEndpoint(),
 		                      ((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
 		          }
 		        }
@@ -193,6 +200,7 @@ public class JmsEndpointConnection_impl 
 		          //  Check connection status and create a new one (if necessary) as an atomic operation
 		          try {
 		            connectionSemaphore.acquire();
+		            
 		            if (connectionClosedOrFailed(brokerDestinations)) {
 		              // Create one shared connection per unique brokerURL.
 		              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -209,6 +217,7 @@ public class JmsEndpointConnection_impl 
 		                  //  Ignore exceptions on a close of a bad connection
 		                }
 		              }
+		              System.out.println("---------- Opening New Broker Connection ---------------");
 		              ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
 		              //  Create shared jms connection to a broker
 		              conn = factory.createConnection();
@@ -240,6 +249,8 @@ public class JmsEndpointConnection_impl 
 		          
 		          connectionCreationTimestamp = System.nanoTime();
 		          failed = false;
+		        } else {
+		        	System.out.println("...... Reusing Existing Broker Connetion");
 		        }
 		        Connection conn = brokerDestinations.getConnection();
 		        if (failed) {
@@ -595,9 +606,12 @@ public class JmsEndpointConnection_impl 
       // restarted. The main purpose of the timer is to close connections
       // that are not used.
       if (startTimer) {
-        brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
-                delegateEndpoint);
+//        brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
+//                delegateEndpoint);
       }
+      // record the time when this dispatcher sent a message. This time will be used
+      // to find inactive sessions.
+	  lastDispatchTimestamp.set(System.currentTimeMillis());
       
       // Succeeded sending the CAS
       return true;

Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1461853&r1=1461852&r2=1461853&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Mar 27 21:26:38 2013
@@ -84,7 +84,7 @@ public class JmsOutputChannel implements
 
   private static final Class CLASS_NAME = JmsOutputChannel.class;
 
-  private static final long INACTIVITY_TIMEOUT = 1800000; // 30 minutes in term of millis
+  private static final long INACTIVITY_TIMEOUT = 300000; // 5 minutes in term of millis
 
   private CountDownLatch controllerLatch = new CountDownLatch(1);
 
@@ -343,13 +343,13 @@ public class JmsOutputChannel implements
     Connection conn = brokerConnectionEntry.getConnection();
     try {
        if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
-         brokerConnectionEntry.getConnection().stop();
-         brokerConnectionEntry.getConnection().close();
-         brokerConnectionEntry.setConnection(null);
-         for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
-                .entrySet()) {
-           endpoints.getValue().close(); // close session and producer
-         }
+           for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
+                   .entrySet()) {
+              endpoints.getValue().close(); // close session and producer
+           }
+           brokerConnectionEntry.getConnection().stop();
+           brokerConnectionEntry.getConnection().close();
+           brokerConnectionEntry.setConnection(null);
        }
     } catch (Exception e) {
       // Ignore this for now. Attempting to close connection that has been closed
@@ -395,17 +395,18 @@ public class JmsOutputChannel implements
     ConnectionTimer connectionTimer = new ConnectionTimer(brokerConnectionEntry);
     connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
     brokerConnectionEntry.setConnectionTimer(connectionTimer);
+    brokerConnectionEntry.setBrokerURL(brokerURL);
     return brokerConnectionEntry;
   }
   /**
    * Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the
    * {@link Endpoint} The endpoint identifies the destination that should receive the message. This
-   * method refrences a cache that stores active connections. Active connections are those that are
+   * method references a cache that stores active connections. Active connections are those that are
    * fully bound and being used for communication. The key to locate the entry in the connection
    * cache is the queue name + broker URI. This uniquely identifies the destination. If an entry
    * does not exist in the cache, this routine will create a new connection, initialize it, and
-   * cache it for future use. The cache is purely for optimization, to prevent openinig a connection
-   * for every message which is a costly operation. Instead the connection is open, cached and
+   * cache it for future use. The cache is purely for optimization, to prevent opening a connection
+   * for every message which is a costly operation. Instead the connection is opened, cached and
    * reused. The {@link JmsEndpointConnection_impl} instance is stored in the cache, and uses a
    * timer to make sure stale connection are removed. If a connection is not used in a given time
    * interval, the connection is considered stale and is dropped from the cache.
@@ -435,8 +436,13 @@ public class JmsOutputChannel implements
       brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
       
     }
+    String key = getLookupKey(anEndpoint);
+    String destination = getDestinationName(anEndpoint);
+
     // First get a Map containing destinations managed by a broker provided by the client
     BrokerConnectionEntry brokerConnectionEntry = null;
+    boolean startInactivityReaperTimer = false;
+
     if (connectionMap.containsKey(brokerConnectionURL)) {
       brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
       // Findbugs thinks that the above may return null, perhaps due to a race condition. Add
@@ -448,14 +454,25 @@ public class JmsOutputChannel implements
       } 
       brokerConnectionEntry.setBrokerURL(brokerConnectionURL);
       if ( JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry) ) {
+    	  brokerConnectionEntry.getConnectionTimer().cancelTimer();
         invalidateConnectionAndEndpoints(brokerConnectionEntry);
         brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
+        startInactivityReaperTimer = true;
       }
     } else {
       brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
+      System.out.println("---------------- New Broker "+brokerConnectionURL);
+//      long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
+      startInactivityReaperTimer = true;
+    }
+    if ( startInactivityReaperTimer ) {
+        long inactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
+        brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(inactivityTimeout);
+        // Start the FixedRate timer which wakes up at regular intervals defined by 'inactivityTimeout'.
+        // The purpose is to find inactive jms sessions. All sessions found to be inactive will be
+        // closed.
+        brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
     }
-    String key = getLookupKey(anEndpoint);
-    String destination = getDestinationName(anEndpoint);
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(
               Level.FINE,
@@ -524,8 +541,12 @@ public class JmsOutputChannel implements
       endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
               getAnalysisEngineController());
       brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
-      long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
-      brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
+//      long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
+//      brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
+//      // Start the FixedRate timer which wakes up at regular intervals defined by 'replyQueueInactivityTimeout'.
+//      // The purpose is to find inactive jms sessions. All sessions found to be inactive will be
+//      // closed.
+//      brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
 
       // Connection is not in the cache, create a new connection, initialize it and cache it
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -588,6 +609,8 @@ public class JmsOutputChannel implements
         } 
       }
     }
+    
+    //System.out.println("+++++++++++++++++++++ ConnectionMap Size:"+connectionMap.size());
     return endpointConnection;
   }
 
@@ -1854,12 +1877,106 @@ public class JmsOutputChannel implements
       connectionCreationTimestamp = aConnectionCreationTimestamp;
     }
 
-    public void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint) {
-      startTimer(aConnectionCreationTimestamp, endpoint, inactivityTimeout, componentName);
+//    public void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint) {
+//      startTimer(aConnectionCreationTimestamp, endpoint, inactivityTimeout, componentName);
+//    }
+
+    public synchronized void startSessionReaperTimer( String aComponentName) {
+      //Date timeToRun = new Date(System.currentTimeMillis() + inactivityTimeout);
+      if (timer != null) {
+        timer.cancel();
+      }
+      if (controller != null) {
+        timer = new Timer("Controller:" + aComponentName + ":Session Reaper TimerThread-:"
+                + System.nanoTime());
+      } else {
+        timer = new Timer("Session Reaper TimerThread-:" + System.nanoTime());
+      }
+	  System.out.println("Controller:"+controller.getComponentName()+" Starting Session Cleanup Thread with Expiration Time:"+(inactivityTimeout/1000) +" secs"+" Broker:"+brokerDestinations.getBrokerURL());
+      timer.scheduleAtFixedRate(new TimerTask() {
+          public void run() {
+        	 // System.out.println("Hashcode:"+hashCode()+" Controller:"+controller.getComponentName()+" Session Cleanup Thread Woke Up After "+(inactivityTimeout/1000) +" secs of Sleep to Clean Up Unused JMS Sessions");
+              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(
+                        Level.FINE,
+                        CLASS_NAME.getName(),
+                        "startTimer",
+                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAJMS_inactivity_timer_expired__FINE",
+                        new Object[] { Thread.currentThread().getId(), componentName,
+                            inactivityTimeout, brokerDestinations.getBrokerURL() });
+              }
+              try {
+                  if (brokerDestinations.getConnection() != null
+                          && !((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()) {
+                    try {
+                    	Iterator<Entry<Object, JmsEndpointConnection_impl>> it = 
+                    			brokerDestinations.endpointMap.entrySet().iterator();
+                    	while( it.hasNext() ) {
+                    		Entry<Object, JmsEndpointConnection_impl> value = it.next();
+                    		long lastDispatchTime = value.getValue().lastDispatchTimestamp.get();
+                        	String dest = (value.getValue().isReplyEndpoint )? 
+                        			value.getValue().delegateEndpoint.getDestination().toString(): 
+                        			value.getValue().destination.toString();
+                    	//	System.out.println("\t\tController:"+controller.getComponentName()+" Session Iterator: Session:"+dest);
+                    		if ( (System.currentTimeMillis() - lastDispatchTime) >= inactivityTimeout ) {
+                    			value.getValue().close();
+                    			System.out.println("Controller:"+controller.getComponentName()+" Closing Session for Destination:"+dest);
+
+                    			if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                    UIMAFramework.getLogger(CLASS_NAME).logrb(
+                                            Level.FINE,
+                                            CLASS_NAME.getName(),
+                                            "startTimer",
+                                            JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                            "UIMAJMS_removed_expired_session__FINE",
+                                            new Object[] { Thread.currentThread().getId(), componentName,
+                                                inactivityTimeout, brokerDestinations.getBrokerURL(),dest });
+                                  }
+                    			it.remove();
+                    		}
+                    	}
+                    } catch (Exception e) {
+                    	e.printStackTrace();
+                      // Ignore this for now: we may be attempting to close a connection that has already
+                      // been closed, or we may be shutting down.
+                    } finally {
+                      try {
+                    	  if ( brokerDestinations.endpointMap.isEmpty() ) {
+                        	  brokerDestinations.getConnection().stop();
+                              brokerDestinations.getConnection().close();
+                              brokerDestinations.setConnection(null);
+                              brokerDestinations.endpointMap.clear();
+                              connectionMap.remove(brokerDestinations);
+                              
+                              if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                  UIMAFramework.getLogger(CLASS_NAME).logrb(
+                                          Level.FINE,
+                                          CLASS_NAME.getName(),
+                                          "startTimer",
+                                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                          "UIMAJMS_closing_broker_connection__FINE",
+                                          new Object[] { Thread.currentThread().getId(), componentName,
+                                              brokerDestinations.getBrokerURL(),inactivityTimeout  });
+                                }
+                    	  }
+                      } catch( Exception e) {
+                      }
+                    }
+                  }
+//                  brokerDestinations.setConnection(null);
+                } catch (Exception e) {
+                } 
+//              finally {
+//                  removeDestinationFromManagedList(brokerDestinations, endpoint);
+//                }
+          }
+      }, inactivityTimeout, inactivityTimeout);
+      
     }
 
     public synchronized void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint,
-            long currentInactivityTimeout, String aComponentName) {
+            final long currentInactivityTimeout, String aComponentName) {
       final long cachedConnectionCreationTimestamp = aConnectionCreationTimestamp;
       Date timeToRun = new Date(System.currentTimeMillis() + currentInactivityTimeout);
       if (timer != null) {
@@ -1871,6 +1988,7 @@ public class JmsOutputChannel implements
       } else {
         timer = new Timer("Reply TimerThread-:" + endpoint + ":" + System.nanoTime());
       }
+     
       timer.schedule(new TimerTask() {
         public void run() {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -1920,8 +2038,8 @@ public class JmsOutputChannel implements
           cancelTimer();
         }
       }, timeToRun);
-    }
 
+    }
     private void removeDestinationFromManagedList(BrokerConnectionEntry brokerDestinations,
             Endpoint endpoint) {
       //  If this is a reply to a client, use the same broker URL that manages this service input queue.