You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/10/21 21:41:19 UTC
svn commit: r828164 - in
/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq:
JmsEndpointConnection_impl.java JmsInputChannel.java JmsOutputChannel.java
Author: cwiklik
Date: Wed Oct 21 19:41:18 2009
New Revision: 828164
URL: http://svn.apache.org/viewvc?rev=828164&view=rev
Log:
UIMA-1632 Modified to recover from a 'bounced' broker. The code detects a stale connection, invalidates its sessions, and recreates them. As part of the recovery, the code creates a new temp queue and a new listener, and updates the endpoint objects before sending a request to a delegate.
Modified:
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=828164&r1=828163&r2=828164&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Wed Oct 21 19:41:18 2009
@@ -126,17 +126,18 @@
}
public boolean isOpen() {
- if (failed || producerSession == null || connectionClosedOrFailed()) {
+ if (failed || producerSession == null || connectionClosedOrFailed(brokerDestinations)) {
return false;
}
return ((ActiveMQSession) producerSession).isRunning();
}
- private boolean connectionClosedOrFailed() {
- if (brokerDestinations.getConnection() == null
- || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosed()
- || ((ActiveMQConnection) brokerDestinations.getConnection()).isClosing()
- || ((ActiveMQConnection) brokerDestinations.getConnection()).isTransportFailed()) {
+ protected static boolean connectionClosedOrFailed(BrokerConnectionEntry aBrokerDestinationMap) {
+ Connection connection = aBrokerDestinationMap.getConnection();
+ if (connection == null
+ || ((ActiveMQConnection) connection).isClosed()
+ || ((ActiveMQConnection) connection).isClosing()
+ || ((ActiveMQConnection) connection).isTransportFailed()) {
return true;
}
return false;
@@ -173,7 +174,7 @@
// Check connection status and create a new one (if necessary) as an atomic operation
try {
connectionSemaphore.acquire();
- if (connectionClosedOrFailed()) {
+ if (connectionClosedOrFailed(brokerDestinations)) {
// Create one shared connection per unique brokerURL.
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
@@ -349,37 +350,33 @@
}
public TextMessage produceTextMessage(String aTextMessage) throws AsynchAEException {
- Assert.notNull(producerSession);
- boolean done = false;
- int retryCount = 4;
- while (retryCount > 0) {
- try {
- retryCount--;
-
- if (aTextMessage == null) {
+ if ( producerSession == null ) {
+ throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+ }
+ try {
+ if (aTextMessage == null) {
return producerSession.createTextMessage();
- } else {
+ } else {
return producerSession.createTextMessage(aTextMessage);
- }
-
- } catch (javax.jms.IllegalStateException e) {
+ }
+ } catch (javax.jms.IllegalStateException e) {
try {
open();
} catch (ServiceShutdownException ex) {
ex.printStackTrace();
} catch (AsynchAEException ex) {
-
throw ex;
}
} catch (Exception e) {
throw new AsynchAEException(e);
- }
}
throw new AsynchAEException(new InvalidMessageException("Unable to produce Message Object"));
}
public BytesMessage produceByteMessage() throws AsynchAEException {
- Assert.notNull(producerSession);
+ if ( producerSession == null ) {
+ throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+ }
boolean done = false;
int retryCount = 4;
while (retryCount > 0) {
@@ -402,8 +399,9 @@
}
public ObjectMessage produceObjectMessage() throws AsynchAEException {
- Assert.notNull(producerSession);
-
+ if ( producerSession == null ) {
+ throw new AsynchAEException("Controller:"+controller.getComponentName()+" Unable to create JMS Message. Producer Session Not Initialized (Null)");
+ }
try {
if (!((ActiveMQSession) producerSession).isRunning()) {
open();
@@ -523,6 +521,7 @@
// Succeeded sending the CAS
return true;
} catch (Exception e) {
+ e.printStackTrace();
// If the controller has been stopped no need to send messages
if (controller.isStopped()) {
return true;
@@ -588,7 +587,7 @@
// Fetch an InputChannel that handles messages for a given delegate
InputChannel iC = controller.getReplyInputChannel(delegateKey);
// Create a new Listener, new Temp Queue and associate the listener with the Input Channel
- iC.createListener(delegateKey);
+ iC.createListener(delegateKey, null);
}
}
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=828164&r1=828163&r2=828164&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Wed Oct 21 19:41:18 2009
@@ -859,7 +859,7 @@
}
}
- public void createListener(String aDelegateKey) throws Exception {
+ public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception {
if (getController() instanceof AggregateAnalysisEngineController) {
Delegate delegate = ((AggregateAnalysisEngineController) getController())
.lookupDelegate(aDelegateKey);
@@ -903,13 +903,30 @@
aDelegateKey, false);
// Override the reply destination.
endpoint.setDestination(newListener.getDestination());
+ if ( endpointToUpdate != null) {
+ endpointToUpdate.setDestination(newListener.getDestination());
+ }
Object clone = ((Endpoint_impl) endpoint).clone();
newListener.setTargetEndpoint((Endpoint) clone);
endpoint.setStatus(Endpoint.OK);
+ System.out
+ .println(".... Listener Started on New Temp Reply Queue ...");
}
}
}
-
+ public boolean isListenerActiveOnDestination(Destination destination ) {
+ List<UimaDefaultMessageListenerContainer> listeners = new ArrayList<UimaDefaultMessageListenerContainer>();
+ for (int i = 0; i < listenerContainerList.size(); i++) {
+ UimaDefaultMessageListenerContainer mListener = (UimaDefaultMessageListenerContainer) listenerContainerList
+ .get(i);
+ if ( mListener.getDestination() != null &&
+ mListener.getDestination() == destination &&
+ mListener.isRunning()) {
+ return true;
+ }
+ }
+ return false;
+ }
/**
* Given an endpoint name returns all listeners attached to this endpoint. There can be multiple
* listeners on an endpoint each with a different selector to receive targeted messages like
Modified: incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: http://svn.apache.org/viewvc/incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=828164&r1=828163&r2=828164&view=diff
==============================================================================
--- incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original)
+++ incubator/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Wed Oct 21 19:41:18 2009
@@ -317,7 +317,69 @@
}
return (int) INACTIVITY_TIMEOUT; // default
}
+ /**
+ * Stop JMS connection and close all sessions associated with this connection
+ *
+ * @param brokerConnectionEntry
+ */
+ private void invalidateConnectionAndEndpoints(BrokerConnectionEntry brokerConnectionEntry ) {
+ Connection conn = brokerConnectionEntry.getConnection();
+ try {
+ if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
+ brokerConnectionEntry.getConnection().stop();
+ brokerConnectionEntry.getConnection().close();
+ brokerConnectionEntry.setConnection(null);
+ for (Entry<Object, JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap
+ .entrySet()) {
+ endpoints.getValue().close(); // close session and producer
+ }
+ }
+ } catch (Exception e) {
+ // Ignore this for now. Attempting to close connection that has been closed
+ // Ignore we are shutting down
+ } finally {
+ brokerConnectionEntry.endpointMap.clear();
+ connectionMap.remove(brokerConnectionEntry.getBrokerURL());
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(
+ Level.INFO,
+ CLASS_NAME.getName(),
+ "invalidateConnectionAndEndpoints",
+ JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+ "UIMAJMS_service_closing_connection__INFO",
+ new Object[] { getAnalysisEngineController().getComponentName(),
+ brokerConnectionEntry.getBrokerURL() });
+ }
+ }
+ brokerConnectionEntry.setConnection(null);
+ }
+ private String getDestinationName(Endpoint anEndpoint) {
+ String destination = anEndpoint.getEndpoint();
+ if (anEndpoint.getDestination() != null
+ && anEndpoint.getDestination() instanceof ActiveMQDestination) {
+ destination = ((ActiveMQDestination) anEndpoint.getDestination()).getPhysicalName();
+ }
+ return destination;
+ }
+ private String getLookupKey(Endpoint anEndpoint) {
+ String key = anEndpoint.getEndpoint() + anEndpoint.getServerURI();
+ String destination = getDestinationName(anEndpoint);
+ if ( anEndpoint.getDelegateKey() != null ) {
+ key = anEndpoint.getDelegateKey() + "-"+destination;
+ } else {
+ key = "Client-"+destination;
+ }
+ return key;
+ }
+ private BrokerConnectionEntry createConnectionEntry(String brokerURL) {
+ BrokerConnectionEntry brokerConnectionEntry = new BrokerConnectionEntry();
+ connectionMap.put(brokerURL, brokerConnectionEntry);
+ ConnectionTimer connectionTimer = new ConnectionTimer(brokerConnectionEntry);
+ connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
+ brokerConnectionEntry.setConnectionTimer(connectionTimer);
+ return brokerConnectionEntry;
+ }
/**
* Returns {@link JmsEndpointConnection_impl} instance bound to a destination defined in the
* {@link Endpoint} The endpoint identifies the destination that should receive the message. This
@@ -350,29 +412,23 @@
BrokerConnectionEntry brokerConnectionEntry = null;
if (connectionMap.containsKey(anEndpoint.getServerURI())) {
brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(anEndpoint.getServerURI());
+ brokerConnectionEntry.setBrokerURL(anEndpoint.getServerURI());
+ if ( JmsEndpointConnection_impl.connectionClosedOrFailed(brokerConnectionEntry) ) {
+ invalidateConnectionAndEndpoints(brokerConnectionEntry);
+ brokerConnectionEntry = createConnectionEntry(anEndpoint.getServerURI());
+ }
// Findbugs thinks that the above may return null, perhaps due to a race condition. Add
// the null check just in case
if (brokerConnectionEntry == null) {
throw new AsynchAEException("Controller:"
+ getAnalysisEngineController().getComponentName()
+ " Unable to Lookup Broker Connection For URL:" + anEndpoint.getServerURI());
- }
+ }
} else {
- brokerConnectionEntry = new BrokerConnectionEntry();
- connectionMap.put(anEndpoint.getServerURI(), brokerConnectionEntry);
- ConnectionTimer connectionTimer = new ConnectionTimer(brokerConnectionEntry);
- connectionTimer.setAnalysisEngineController(getAnalysisEngineController());
- brokerConnectionEntry.setConnectionTimer(connectionTimer);
- }
-
- // create a key to lookup the endpointConnection object
- String key = anEndpoint.getEndpoint() + anEndpoint.getServerURI();
- String destination = anEndpoint.getEndpoint();
- if (anEndpoint.getDestination() != null
- && anEndpoint.getDestination() instanceof ActiveMQDestination) {
- destination = ((ActiveMQDestination) anEndpoint.getDestination()).getPhysicalName();
- key = destination;
+ brokerConnectionEntry = createConnectionEntry(anEndpoint.getServerURI());
}
+ String key = getLookupKey(anEndpoint);
+ String destination = getDestinationName(anEndpoint);
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
Level.FINE,
@@ -396,6 +452,21 @@
new Object[] { getAnalysisEngineController().getComponentName(), destination,
anEndpoint.getServerURI() });
}
+ if (getAnalysisEngineController() instanceof AggregateAnalysisEngineController && anEndpoint.isTempReplyDestination() ) {
+ try {
+ if ( !((JmsInputChannel)getAnalysisEngineController().getInputChannel()).isListenerActiveOnDestination((Destination)anEndpoint.getDestination() )) {
+ // Create a new temp queue, new listener on it and inject new temp queue into the current
+ // endpoint object
+ getAnalysisEngineController().getInputChannel().createListener(anEndpoint.getDelegateKey(), anEndpoint);
+ // The key is partly composed of a temp queue name so get the current temp queue name
+ key = getLookupKey(anEndpoint);
+ destination = getDestinationName(anEndpoint);
+ }
+ } catch( Exception e) {
+ throw new AsynchAEException(e);
+ }
+ }
+
endpointConnection = new JmsEndpointConnection_impl(brokerConnectionEntry, anEndpoint,
getAnalysisEngineController());
brokerConnectionEntry.addEndpointConnection(key, endpointConnection);
@@ -408,7 +479,7 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"getEndpointConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
"UIMAJMS_open_new_connection_to_endpoint__FINE",
- new Object[] { destination, anEndpoint.getServerURI() });
+ new Object[] { getDestinationName(anEndpoint), anEndpoint.getServerURI() });
}
/**
@@ -429,8 +500,6 @@
new Object[] { getAnalysisEngineController().getComponentName(), destination,
anEndpoint.getServerURI() });
}
- // Cache the connection for future use. If not used, connections expire after 50000 millis
- // connectionMap.put( key, endpointConnection);
} else {
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(
@@ -1362,8 +1431,8 @@
aMessage.setStringProperty(AsynchAEMessage.MessageFrom, replyTo);
}
-
Object destination;
+
if ((destination = anEndpoint.getDestination()) != null) {
aMessage.setJMSReplyTo((Destination) destination);
aMessage.setStringProperty(UIMAMessage.ServerURI, anEndpoint.getServerURI());