You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/03/27 22:26:38 UTC
svn commit: r1461853 - in
/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq:
JmsEndpointConnection_impl.java JmsOutputChannel.java
Author: cwiklik
Date: Wed Mar 27 21:26:38 2013
New Revision: 1461853
URL: http://svn.apache.org/r1461853
Log:
UIMA-2776 Added jms session management
Modified:
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1461853&r1=1461852&r2=1461853&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Mar 27 21:26:38 2013
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -69,7 +70,13 @@ import org.apache.uima.util.Level;
public class JmsEndpointConnection_impl implements ConsumerListener {
private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
- private Destination destination;
+ // timestamp containing time when the last message was dispatched from this dispatcher to
+ // jms destination. This is updated every time a message is dispatched to a queue.
+ // At fixed intervals a cleanup thread wakes up and checks for unused dispatchers by
+ // comparing value in lastDispatchTimestamp to a max allowed which by default is 5 minutes.
+ protected AtomicLong lastDispatchTimestamp = new AtomicLong(0);
+
+ protected Destination destination;
protected Session producerSession;
@@ -83,7 +90,7 @@ public class JmsEndpointConnection_impl
private String endpointName;
- private Endpoint delegateEndpoint;
+ protected Endpoint delegateEndpoint;
private volatile boolean retryEnabled;
@@ -95,7 +102,7 @@ public class JmsEndpointConnection_impl
private Object semaphore = new Object();
- private boolean isReplyEndpoint;
+ protected boolean isReplyEndpoint;
private volatile boolean failed = false;
@@ -183,7 +190,7 @@ public class JmsEndpointConnection_impl
"open",
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_override_connection_to_endpoint__FINE",
- new Object[] { aComponentName, getEndpoint(),
+ new Object[] { aComponentName, getEndpoint(),
((JmsOutputChannel) aController.getOutputChannel()).getServerURI() });
}
}
@@ -193,6 +200,7 @@ public class JmsEndpointConnection_impl
// Check connection status and create a new one (if necessary) as an atomic operation
try {
connectionSemaphore.acquire();
+
if (connectionClosedOrFailed(brokerDestinations)) {
// Create one shared connection per unique brokerURL.
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -209,6 +217,7 @@ public class JmsEndpointConnection_impl
// Ignore exceptions on a close of a bad connection
}
}
+ System.out.println("---------- Opening New Broker Connection ---------------");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
// Create shared jms connection to a broker
conn = factory.createConnection();
@@ -240,6 +249,8 @@ public class JmsEndpointConnection_impl
connectionCreationTimestamp = System.nanoTime();
failed = false;
+ } else {
+ System.out.println("...... Reusing Existing Broker Connetion");
}
Connection conn = brokerDestinations.getConnection();
if (failed) {
@@ -595,9 +606,12 @@ public class JmsEndpointConnection_impl
// restarted. The main purpose of the timer is to close connections
// that are not used.
if (startTimer) {
- brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
- delegateEndpoint);
+// brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
+// delegateEndpoint);
}
+ // record the time when this dispatcher sent a message. This time will be used
+ // to find inactive sessions.
+ lastDispatchTimestamp.set(System.currentTimeMillis());
// Succeeded sending the CAS
return true;
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1461853&r1=1461852&r2=1461853&view=diff
==============================================================================
--- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Mar 27 21:26:38 2013
@@ -84,7 +84,7 @@ public class JmsOutputChannel implements
private static final Class CLASS_NAME = JmsOutputChannel.class;
- private static final long INACTIVITY_TIMEOUT = 1800000; // 30 minutes in term of millis
+ private static final long INACTIVITY_TIMEOUT = 300000; // 5 minutes in term of millis
private CountDownLatch controllerLatch = new CountDownLatch(1);
@@ -343,13 +343,13 @@ public class JmsOutputChannel implements
Connection conn = brokerConnectionEntry.getConnection();
try {
if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
- brokerConnectionEntry.getConnection().stop();
- brokerConnectionEntry.getConnection().close();
- brokerConnectionEntry.setConnection(null);
- for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
- .entrySet()) {
- endpoints.getValue().close(); // close session and producer
- }
+ for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
+ .entrySet()) {
+ endpoints.getValue().close(); // close session and producer
+ }
+ brokerConnectionEntry.getConnection().stop();
+ brokerConnectionEntry.getConnection().close();
+ brokerConnectionEntry.setConnection(null);
}
} catch (Exception e) {
// Ignore this for now. Attempting to close connection that has been closed
@@ -395,17 +395,18 @@ public class JmsOutputChannel implements
ConnectionTimer connectionTimer = new ConnectionTimer(brokerConnectionEntry);
connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
brokerConnectionEntry.setConnectionTimer(connectionTimer);
+ brokerConnectionEntry.setBrokerURL(brokerURL);
return brokerConnectionEntry;
}
/**
* Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the
* {@link Endpoint} The endpoint identifies the destination that should receive the message. This
- * method refrences a cache that stores active connections. Active connections are those that are
+ * method references a cache that stores active connections. Active connections are those that are
* fully bound and being used for communication. The key to locate the entry in the connection
* cache is the queue name + broker URI. This uniquely identifies the destination. If an entry
* does not exist in the cache, this routine will create a new connection, initialize it, and
- * cache it for future use. The cache is purely for optimization, to prevent openinig a connection
- * for every message which is a costly operation. Instead the connection is open, cached and
+ * cache it for future use. The cache is purely for optimization, to prevent opening a connection
+ * for every message which is a costly operation. Instead the connection is opened, cached and
* reused. The {@link JmsEndpointConnection_impl} instance is stored in the cache, and uses a
* timer to make sure stale connections are removed. If a connection is not used in a given time
* interval, the connection is considered stale and is dropped from the cache.
@@ -435,8 +436,13 @@ public class JmsOutputChannel implements
brokerConnectionURL = (anEndpoint.isReplyEndpoint()) ? serverURI : anEndpoint.getServerURI();
}
+ String key = getLookupKey(anEndpoint);
+ String destination = getDestinationName(anEndpoint);
+
// First get a Map containing destinations managed by a broker provided by the client
BrokerConnectionEntry brokerConnectionEntry = null;
+ boolean startInactivityReaperTimer = false;
+
if (connectionMap.containsKey(brokerConnectionURL)) {
brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
// Findbugs thinks that the above may return null, perhaps due to a race condition. Add
@@ -448,14 +454,25 @@ public class JmsOutputChannel implements
}
brokerConnectionEntry.setBrokerURL(brokerConnectionURL);
if ( JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry) ) {
+ brokerConnectionEntry.getConnectionTimer().cancelTimer();
invalidateConnectionAndEndpoints(brokerConnectionEntry);
brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
+ startInactivityReaperTimer = true;
}
} else {
brokerConnectionEntry = createConnectionEntry(brokerConnectionURL);
+ System.out.println("---------------- New Broker "+brokerConnectionURL);
+// long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
+ startInactivityReaperTimer = true;
+ }
+ if ( startInactivityReaperTimer ) {
+ long inactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
+ brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(inactivityTimeout);
+ // Start the FixedRate timer which wakes up at regular intervals defined by 'inactivityTimeout'.
+ // The purpose is to find inactive jms sessions. All sessions found to be inactive will be
+ // closed.
+ brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
}
- String key = getLookupKey(anEndpoint);
- String destination = getDestinationName(anEndpoint);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -524,8 +541,12 @@ public class JmsOutputChannel implements
endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
getAnalysisEngineController());
brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
- long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
- brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
+// long replyQueueInactivityTimeout = getInactivityTimeout(destination, brokerConnectionURL);
+// brokerConnectionEntry.getConnectionTimer().setInactivityTimeout(replyQueueInactivityTimeout);
+// // Start the FixedRate timer which wakes up at regular intervals defined by 'replyQueueInactivityTimeout'.
+// // The purpose is to find inactive jms sessions. All sessions found the be inactive will be
+// // closed.
+// brokerConnectionEntry.getConnectionTimer().startSessionReaperTimer(getAnalysisEngineController().getComponentName());
// Connection is not in the cache, create a new connection, initialize it and cache it
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
@@ -588,6 +609,8 @@ public class JmsOutputChannel implements
}
}
}
+
+ //System.out.println("+++++++++++++++++++++ ConnectionMap Size:"+connectionMap.size());
return endpointConnection;
}
@@ -1854,12 +1877,106 @@ public class JmsOutputChannel implements
connectionCreationTimestamp = aConnectionCreationTimestamp;
}
- public void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint) {
- startTimer(aConnectionCreationTimestamp, endpoint, inactivityTimeout, componentName);
+// public void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint) {
+// startTimer(aConnectionCreationTimestamp, endpoint, inactivityTimeout, componentName);
+// }
+
+ public synchronized void startSessionReaperTimer( String aComponentName) {
+ //Date timeToRun = new Date(System.currentTimeMillis() + inactivityTimeout);
+ if (timer != null) {
+ timer.cancel();
+ }
+ if (controller != null) {
+ timer = new Timer("Controller:" + aComponentName + ":Session Reaper TimerThread-:"
+ + System.nanoTime());
+ } else {
+ timer = new Timer("Session Reaper TimerThread-:" + System.nanoTime());
+ }
+ System.out.println("Controller:"+controller.getComponentName()+" Starting Session Cleanup Thread with Expiration Time:"+(inactivityTimeout/1000) +" secs"+" Broker:"+brokerDestinations.getBrokerURL());
+ timer.scheduleAtFixedRate(new TimerTask() {
+ public void run() {
+ // System.out.println("Hashcode:"+hashCode()+" Controller:"+controller.getComponentName()+" Session Cleanup Thread Woke Up After "+(inactivityTimeout/1000) +" secs of Sleep to Clean Up Unused JMS Sessions");
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "startTimer",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_inactivity_timer_expired__FINE",
+ new Object[] { Thread.currentThread().getId(), componentName,
+ inactivityTimeout, brokerDestinations.getBrokerURL() });
+ }
+ try {
+ if (brokerDestinations.getConnection() != null
+ && !((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()) {
+ try {
+ Iterator<Entry<Object, JmsEndpointConnection_impl>> it =
+ brokerDestinations.endpointMap.entrySet().iterator();
+ while( it.hasNext() ) {
+ Entry<Object, JmsEndpointConnection_impl> value = it.next();
+ long lastDispatchTime = value.getValue().lastDispatchTimestamp.get();
+ String dest = (value.getValue().isReplyEndpoint )?
+ value.getValue().delegateEndpoint.getDestination().toString():
+ value.getValue().destination.toString();
+ // System.out.println("\t\tController:"+controller.getComponentName()+" Session Iterator: Session:"+dest);
+ if ( (System.currentTimeMillis() - lastDispatchTime) >= inactivityTimeout ) {
+ value.getValue().close();
+ System.out.println("Controller:"+controller.getComponentName()+" Closing Session for Destination:"+dest);
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "startTimer",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_removed_expired_session__FINE",
+ new Object[] { Thread.currentThread().getId(), componentName,
+ inactivityTimeout, brokerDestinations.getBrokerURL(),dest });
+ }
+ it.remove();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ // Ignore this for now. Attempting to close connection that has been closed
+ // Ignore we are shutting down
+ } finally {
+ try {
+ if ( brokerDestinations.endpointMap.isEmpty() ) {
+ brokerDestinations.getConnection().stop();
+ brokerDestinations.getConnection().close();
+ brokerDestinations.setConnection(null);
+ brokerDestinations.endpointMap.clear();
+ connectionMap.remove(brokerDestinations);
+
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.FINE,
+ CLASS_NAME.getName(),
+ "startTimer",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_closing_broker_connection__FINE",
+ new Object[] { Thread.currentThread().getId(), componentName,
+ brokerDestinations.getBrokerURL(),inactivityTimeout });
+ }
+ }
+ } catch( Exception e) {
+ }
+ }
+ }
+// brokerDestinations.setConnection(null);
+ } catch (Exception e) {
+ }
+// finally {
+// removeDestinationFromManagedList(brokerDestinations, endpoint);
+// }
+ }
+ }, inactivityTimeout, inactivityTimeout);
+
}
public synchronized void startTimer(long aConnectionCreationTimestamp, final Endpoint endpoint,
- long currentInactivityTimeout, String aComponentName) {
+ final long currentInactivityTimeout, String aComponentName) {
final long cachedConnectionCreationTimestamp = aConnectionCreationTimestamp;
Date timeToRun = new Date(System.currentTimeMillis() + currentInactivityTimeout);
if (timer != null) {
@@ -1871,6 +1988,7 @@ public class JmsOutputChannel implements
} else {
timer = new Timer("Reply TimerThread-:" + endpoint + ":" + System.nanoTime());
}
+
timer.schedule(new TimerTask() {
public void run() {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
@@ -1920,8 +2038,8 @@ public class JmsOutputChannel implements
cancelTimer();
}
}, timeToRun);
- }
+ }
private void removeDestinationFromManagedList(BrokerConnectionEntry brokerDestinations,
Endpoint endpoint) {
// If this is a reply to a client, use the same broker URL that manages this service input queue.