You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/12 20:40:28 UTC

svn commit: r793392 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Author: cwiklik
Date: Sun Jul 12 18:40:28 2009
New Revision: 793392

URL: http://svn.apache.org/viewvc?rev=793392&view=rev
Log:
UIMA-1433 Optimized to create a single JMS connection and use it to create a per-client session and message producer

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=793392&r1=793391&r2=793392&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Sun Jul 12 18:40:28 2009
@@ -24,6 +24,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
+import javax.activity.InvalidActivityException;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -35,6 +36,7 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageProducer;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
@@ -55,6 +57,7 @@
 import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
+import org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
 import org.apache.uima.util.Level;
 import org.springframework.jms.JmsException;
 import org.springframework.util.Assert;
@@ -69,8 +72,8 @@
 
 	private MessageProducer producer;
 
-	private Connection conn;
-
+	private BrokerConnectionEntry brokerDestinations;
+	
 	private Timer timer;
 
 	private String serverUri;
@@ -83,8 +86,6 @@
 
 	private long inactivityTimeout = 3600000;
 
-	private Map connectionMap;
-
 	private volatile boolean retryEnabled;
 
 	private AnalysisEngineController controller = null;
@@ -101,9 +102,11 @@
 	
 	private Object recoveryMux = new Object();
 	
-	public JmsEndpointConnection_impl(Map aConnectionMap, Endpoint anEndpoint)
+	
+	
+  public JmsEndpointConnection_impl(BrokerConnectionEntry aBrokerDestinationMap, Endpoint anEndpoint)
 	{
-		connectionMap = aConnectionMap;
+    brokerDestinations = aBrokerDestinationMap;
 		serverUri = anEndpoint.getServerURI();
     isReplyEndpoint = anEndpoint.isReplyEndpoint();
 
@@ -149,8 +152,8 @@
 	{
 		inactivityTimeout = aTimeout;
 	}
-
-	private void openChannel() throws AsynchAEException, ServiceShutdownException
+	
+	private synchronized void openChannel() throws AsynchAEException, ServiceShutdownException
 	{
 		try
 		{
@@ -175,14 +178,17 @@
 	                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_activemq_open__FINE",
 	                new Object[] { endpoint, serverUri });
       }
-			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
-
-			factory.setDispatchAsync(true);
-      factory.setUseAsyncSend(true);
-      factory.setCopyMessageOnSend(false);
-			conn = factory.createConnection();
+      
+      if ( brokerDestinations.getConnection() == null ) {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
+        factory.setDispatchAsync(true);
+        factory.setUseAsyncSend(true);
+        factory.setCopyMessageOnSend(false);
+        Connection conn = factory.createConnection();
+        brokerDestinations.setConnection(conn);
+      }
 			connectionCreationTimestamp = System.nanoTime();
-			producerSession = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+			producerSession = brokerDestinations.getConnection().createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
 			if ( (delegateEndpoint.getCommand() == AsynchAEMessage.Stop || isReplyEndpoint ) && delegateEndpoint.getDestination() != null )
 			{
 				producer = producerSession.createProducer(null); 
@@ -209,10 +215,12 @@
 				}
 			}
 			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-			conn.start();
+			// Since the connection is shared, start it only once 
+			if ( !((ActiveMQConnection)brokerDestinations.getConnection()).isStarted() ) {
+	      brokerDestinations.getConnection().start();
+			}
 			if ( controller != null )
 			{
-
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(),
 	                "openChannel", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_conn_started__FINE",
@@ -230,7 +238,6 @@
       }
 			throw new AsynchAEException(e);
 		}
-		
 	}
 	public synchronized void open() throws AsynchAEException, ServiceShutdownException
 	{
@@ -254,12 +261,15 @@
 			timer.cancel();
 			timer = null;
 		}
-		this.close();
+		try {
+	    this.close();
+		  
+		} catch( Exception e) {
+		}
 	}
 
-	public synchronized void close()
+	public synchronized void close() throws Exception
 	{
-
 			if (producer != null)
 			{
 				try
@@ -287,12 +297,13 @@
 			{
 				destination = null;
 			}
-			if (conn != null)
+			if (brokerDestinations.getConnection() != null &&
+			    !((ActiveMQConnection)brokerDestinations.getConnection()).isClosed() )
 			{
 				try
 				{
-					conn.stop();
-					conn.close();
+				  brokerDestinations.getConnection().stop();
+				  brokerDestinations.getConnection().close();
 				}
 				catch ( Exception e)
 				{
@@ -300,7 +311,7 @@
 					//	Ignore we are shutting down
 				}
 			}
-			conn = null;
+			brokerDestinations.setConnection(null);
 
 	}
 
@@ -334,12 +345,6 @@
 			try
 			{
 				retryCount--;
-/*				
-				if (!((ActiveMQSession) producerSession).isRunning())
-				{
-					open();
-				}
-*/
 				
 				if (aTextMessage == null)
 				{
@@ -452,12 +457,20 @@
 						{
 							close();
 						}
+						catch( Exception e) {
+						}
 						finally
 						{
-							if (connectionMap.containsKey(getEndpoint()))
-							{
-								connectionMap.remove(getEndpoint());
-							}
+	              String key = delegateEndpoint.getEndpoint()+delegateEndpoint.getServerURI();
+	              String destination = delegateEndpoint.getEndpoint();
+	              if ( delegateEndpoint.getDestination() != null && delegateEndpoint.getDestination() instanceof ActiveMQDestination )
+	              {
+	                destination = ((ActiveMQDestination)delegateEndpoint.getDestination()).getPhysicalName();
+	                key = destination;
+	              }
+	              if ( brokerDestinations.endpointExists(key) ) {
+	                brokerDestinations.removeEndpoint(key);
+	              }
 						}
 					}
 					cancelTimer();
@@ -491,7 +504,7 @@
         int msgType = aMessage.getIntProperty(AsynchAEMessage.MessageType);
         int command = aMessage.getIntProperty(AsynchAEMessage.Command);
 
-				if ( failed || conn == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
+				if ( failed || brokerDestinations.getConnection() == null || producerSession == null || !((ActiveMQSession) producerSession).isRunning())
 				{
 	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
 	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_open_connection_to_endpoint__FINE", new Object[] { getEndpoint() });
@@ -582,12 +595,12 @@
 			{
 			  if ( e instanceof JMSException ) {
 			    handleJmsException( (JMSException)e );
+			  } else {
+	        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+	          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), e});
+	        }
 			  }
 			    
-			  e.printStackTrace();
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "send", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), e});
-        }
 			}
 		}
 		stopTimer();
@@ -641,7 +654,14 @@
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),
                 "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_send_failed_deleted_queue_INFO",
                 new Object[] { controller.getName(), destName});
+        return;
+      } else {
+        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "handleJmsException", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_exception__WARNING", new Object[] { controller.getComponentName(), ex});
+        }
+        ex.printStackTrace();
       }
+        
     }
     if ( failed ) {
       return;   // Already marked failed