You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/09/24 18:46:37 UTC
svn commit: r818542 -
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Author: cwiklik
Date: Thu Sep 24 16:46:36 2009
New Revision: 818542
URL: http://svn.apache.org/viewvc?rev=818542&view=rev
Log:
UIMA-1433 Improved detection of a bad JMS connection.
Modified:
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=818542&r1=818541&r2=818542&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu Sep 24 16:46:36 2009
@@ -19,6 +19,10 @@
package org.apache.uima.adapter.jms.activemq;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -52,12 +56,13 @@
import org.apache.uima.util.Level;
import org.springframework.util.Assert;
+
public class JmsEndpointConnection_impl implements ConsumerListener {
private static final Class CLASS_NAME = JmsEndpointConnection_impl.class;
private Destination destination;
- private Session producerSession;
+ protected Session producerSession;
private MessageProducer producer;
@@ -89,6 +94,10 @@
private final String componentName;
+ // Create semaphore to control creation of a JMS connection.
+ // This semaphore is shared by all instances of this class
+ private static Semaphore connectionSemaphore = new Semaphore(1);
+
public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap,
Endpoint anEndpoint, AnalysisEngineController aController) {
brokerDestinations = aBrokerDestinationMap;
@@ -117,13 +126,20 @@
}
public boolean isOpen() {
- if (failed || producerSession == null || brokerDestinations.getConnection() == null
+ if (failed || producerSession == null || connectionClosedOrFailed()) {
+ return false;
+ }
+ return ((ActiveMQSession) producerSession).isRunning();
+ }
+
+ private boolean connectionClosedOrFailed() {
+ if (brokerDestinations.getConnection() == null
|| ((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()
|| ((ActiveMQConnection) brokerDestinations.getConnection()).isClosing()
|| ((ActiveMQConnection) brokerDestinations.getConnection()).isTransportFailed()) {
- return false;
+ return true;
}
- return ((ActiveMQSession) producerSession).isRunning();
+ return false;
}
private void openChannel() throws AsynchAEException, ServiceShutdownException {
@@ -134,11 +150,11 @@
String anEndpointName, AnalysisEngineController aController) throws AsynchAEException,
ServiceShutdownException {
try {
-
+
// If replying to http request, reply to a queue managed by this service broker using tcp
// protocol
- if (isReplyEndpoint && brokerUri.startsWith("http") ) {
- brokerUri = ((JmsOutputChannel)aController.getOutputChannel()).getServerURI();
+ if (isReplyEndpoint && brokerUri.startsWith("http")) {
+ brokerUri = ((JmsOutputChannel) aController.getOutputChannel()).getServerURI();
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -154,19 +170,54 @@
if (!isOpen()) {
Connection conn = null;
- if (brokerDestinations.getConnection() == null) {
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
- "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
- "UIMAJMS_activemq_open__FINE", new Object[] { anEndpointName, brokerUri });
+ // Check connection status and create a new one (if necessary) as an atomic operation
+ try {
+ connectionSemaphore.acquire();
+ if (connectionClosedOrFailed()) {
+ // Create one shared connection per unique brokerURL.
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
+ "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_activemq_open__INFO",
+ new Object[] { aController.getComponentName(), anEndpointName, brokerUri });
+ }
+ if ( brokerDestinations.getConnection() != null ) {
+ try {
+ // Close the connection to avoid leaks in the broker
+ brokerDestinations.getConnection().close();
+ } catch( Exception e) {
+ // Ignore exceptions on a close of a bad connection
+ }
+ }
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+ // Create shared jms connection to a broker
+ conn = factory.createConnection();
+ factory.setDispatchAsync(true);
+ factory.setUseAsyncSend(true);
+ factory.setCopyMessageOnSend(false);
+ // Cache the connection. There should only be one connection in the jvm
+ // per unique broker url.
+ brokerDestinations.setConnection(conn);
+ // Close and invalidate all sessions previously created from the old connection
+ Iterator<Map.Entry<Object, JmsEndpointConnection_impl>> it = brokerDestinations.endpointMap
+ .entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Object, JmsEndpointConnection_impl> entry = it.next();
+ if (entry.getValue().producerSession != null) {
+ // Close session
+ entry.getValue().producerSession.close();
+ // Since we created a new connection, invalidate sessions that
+ // have been created with the old connection
+ entry.getValue().producerSession = null;
+ }
+ }
}
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
- conn = factory.createConnection();
- factory.setDispatchAsync(true);
- factory.setUseAsyncSend(true);
- factory.setCopyMessageOnSend(false);
- brokerDestinations.setConnection(conn);
+ } catch( Exception exc) {
+ throw exc; // rethrow
+ } finally {
+ connectionSemaphore.release();
}
+
connectionCreationTimestamp = System.nanoTime();
failed = false;
}