You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/12 20:40:28 UTC
svn commit: r793392 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Author: cwiklik
Date: Sun Jul 12 18:40:28 2009
New Revision: 793392
URL: http://svn.apache.org/viewvc?rev=793392&view=rev
Log:
UIMA-1433 Optimized to create a single JMS connection and use it to create a per-client session and message producer
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=793392&r1=793391&r2=793392&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Sun Jul 12 18:40:28 2009
@@ -24,6 +24,7 @@
import java.util.Timer;
import java.util.TimerTask;
+import javax.activity.InvalidActivityException;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -35,6 +36,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
@@ -55,6 +57,7 @@
import org.apache.uima.aae.error.ServiceShutdownException;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
import org.apache.uima.util.Level;
import org.springframework.jms.JmsException;
import org.springframework.util.Assert;
@@ -69,8 +72,8 @@
private MessageProducer producer;
- private Connection conn;
-
+ private BrokerConnectionEntry brokerDestinations;
+
private Timer timer;
private String serverUri;
@@ -83,8 +86,6 @@
private long inactivityTimeout = 3600000;
- private Map connectionMap;
-
private volatile boolean retryEnabled;
private AnalysisEngineController controller = null;
@@ -101,9 +102,11 @@
private Object recoveryMux = new Object();
- public JmsEndpointConnection_impl(Map aConnectionMap, Endpoint anEndpoint)
+
+
+ public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap, Endpoint anEndpoint)
{
- connectionMap = aConnectionMap;
+ brokerDestinations = aBrokerDestinationMap;
serverUri = anEndpoint.getServerURI();
isReplyEndpoint = anEndpoint.isReplyEndpoint();
@@ -149,8 +152,8 @@
{
inactivityTimeout = aTimeout;
}
-
- private void openChannel() throws AsynchAEException, ServiceShutdownException
+
+ private synchronized void openChannel() throws AsynchAEException, ServiceShutdownException
{
try
{
@@ -175,14 +178,17 @@
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activemq_open__FINE",
new Object[] { endpoint, serverUri });
}
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
-
- factory.setDispatchAsync(true);
- factory.setUseAsyncSend(true);
- factory.setCopyMessageOnSend(false);
- conn = factory.createConnection();
+
+ if ( brokerDestinations.getConnection() == null ) {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+ factory.setDispatchAsync(true);
+ factory.setUseAsyncSend(true);
+ factory.setCopyMessageOnSend(false);
+ Connection conn = factory.createConnection();
+ brokerDestinations.setConnection(conn);
+ }
connectionCreationTimestamp = System.nanoTime();
- producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ producerSession = brokerDestinations.getConnection().createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
if ( (delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) && delegateEndpoint.getDestination() != null )
{
producer = producerSession.createProducer(null);
@@ -209,10 +215,12 @@
}
}
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- conn.start();
+ // Since the connection is shared, start it only once
+ if ( !((ActiveMQConnection)brokerDestinations.getConnection()).isStarted() ) {
+ brokerDestinations.getConnection().start();
+ }
if ( controller != null )
{
-
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
"openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_conn_started__FINE",
@@ -230,7 +238,6 @@
}
throw new AsynchAEException(e);
}
-
}
public synchronized void open() throws AsynchAEException, ServiceShutdownException
{
@@ -254,12 +261,15 @@
timer.cancel();
timer = null;
}
- this.close();
+ try {
+ this.close();
+
+ } catch( Exception e) {
+ }
}
- public synchronized void close()
+ public synchronized void close() throws Exception
{
-
if (producer != null)
{
try
@@ -287,12 +297,13 @@
{
destination = null;
}
- if (conn != null)
+ if (brokerDestinations.getConnection() != null &&
+ !((ActiveMQConnection)brokerDestinations.getConnection()).isClosed() )
{
try
{
- conn.stop();
- conn.close();
+ brokerDestinations.getConnection().stop();
+ brokerDestinations.getConnection().close();
}
catch ( Exception e)
{
@@ -300,7 +311,7 @@
// Ignore we are shutting down
}
}
- conn = null;
+ brokerDestinations.setConnection(null);
}
@@ -334,12 +345,6 @@
try
{
retryCount--;
-/*
- if (!((ActiveMQSession) producerSession).isRunning())
- {
- open();
- }
-*/
if (aTextMessage == null)
{
@@ -452,12 +457,20 @@
{
close();
}
+ catch( Exception e) {
+ }
finally
{
- if (connectionMap.containsKey(getEndpoint()))
- {
- connectionMap.remove(getEndpoint());
- }
+ String key = delegateEndpoint.getEndpoint()+delegateEndpoint.getServerURI();
+ String destination = delegateEndpoint.getEndpoint();
+ if ( delegateEndpoint.getDestination() != null && delegateEndpoint.getDestination() instanceof ActiveMQDestination )
+ {
+ destination = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
+ key = destination;
+ }
+ if ( brokerDestinations.endpointExists(key) ) {
+ brokerDestinations.removeEndpoint(key);
+ }
}
}
cancelTimer();
@@ -491,7 +504,7 @@
int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
int command = aMessage.getIntProperty(AsynchAEMessage.Command);
- if ( failed || conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
+ if ( failed || brokerDestinations.getConnection() == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
{
if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
@@ -582,12 +595,12 @@
{
if ( e instanceof JMSException ) {
handleJmsException( (JMSException)e );
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), e});
+ }
}
- e.printStackTrace();
- if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
- UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), e});
- }
}
}
stopTimer();
@@ -641,7 +654,14 @@
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
"handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_failed_deleted_queue_INFO",
new Object[] { controller.getName(), destName});
+ return;
+ } else {
+ if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+ UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), ex});
+ }
+ ex.printStackTrace();
}
+
}
if ( failed ) {
return; // Already marked failed