You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/13 22:27:01 UTC

svn commit: r793699 - /incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java

Author: cwiklik
Date: Mon Jul 13 20:27:01 2009
New Revision: 793699

URL: http://svn.apache.org/viewvc?rev=793699&view=rev
Log:
UIMA-1435 Modified to use SharedConnection object to reuse a single JMS Connection

Modified:
    incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java

Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=793699&r1=793698&r2=793699&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Mon Jul 13 20:27:01 2009
@@ -28,6 +28,7 @@
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
@@ -87,7 +88,7 @@
 	private Session session = null;
 	private Session consumerSession = null;
 
-	private Connection connection = null;
+	//private Connection connection = null;
 	private volatile boolean serviceInitializationException;
 	private volatile boolean serviceInitializationCompleted;
 	
@@ -97,6 +98,8 @@
 	private Session producerSession = null;
 	private JmxManager jmxManager = null;
 	private String applicationName = "UimaASClient";
+	//private volatile boolean usesSharedConnection = false;
+	private static SharedConnection sharedConnection = null;
 	
 	public BaseUIMAAsynchronousEngine_impl() {
         UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, "UIMA-AS version " + UIMAFramework.getVersionString());
@@ -230,17 +233,23 @@
 			}
 			if ( initialized )
 			{
-	      consumerSession.close();
-	      consumer.close();
-	      try
-	      {
-		      connection.close();
-	      }
-	      catch(Exception ex) 
-	      {
-	    	  //	Shutting down, ignore a connection error
+			  try {
+	        consumerSession.close();
+	        consumer.close();
+			  } catch ( JMSException exx) {}
+	      // SharedConnection object manages a single JMS connection to the 
+	      // broker. If the client is scaled out in the same JVM, the connection
+	      // is shared by all instances of the client to reduce number of threads
+	      // in the broker. The SharedConnection object also maintains the number
+	      // of client instances to determine when it is ok to close the connection.
+	      // The connection is closed when the last client calls stop().
+	      if ( sharedConnection != null ) {
+	        // Decrement number of active clients
+	        sharedConnection.decrementClientCount();
+	        // The destroy method closes the JMS connection when the number of
+	        // clients becomes 0, otherwise it is a no-op
+	        sharedConnection.destroy();
 	      }
-	      connection = null;
 			}
 			if ( jmxManager != null )
 			{
@@ -280,17 +289,18 @@
       ((TextMessage)msg).setText("");
     }
 	}
-	protected synchronized Connection getConnection( String aBrokerURI ) throws Exception
+	protected synchronized void setupConnection( String aBrokerURI ) throws Exception
 	{
-		if (connection == null )
+		if (sharedConnection == null )
 		{
 			ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(aBrokerURI);
-			connection = factory.createConnection();
+			Connection connection = factory.createConnection();
 			// This only effects Consumer
 			addPrefetch((ActiveMQConnection)connection);
 			connection.start();
+			sharedConnection = new SharedConnection();
+			sharedConnection.setConnection(connection);
 		} 
-		return connection;
 	}
 
 	private void addPrefetch(ActiveMQConnection aConnection ) {
@@ -300,27 +310,36 @@
 	}
 	private void validateConnection(String aBrokerURI) throws Exception
 	{
-		if (connection == null)	{
-			connection = getConnection(aBrokerURI);
+		if (sharedConnection == null)	{
+			setupConnection(aBrokerURI);
 		}
 	}
 	protected Session getSession(String aBrokerURI) throws Exception
 	{
 		validateConnection(aBrokerURI);
-		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-		return session;
+		return getSession(sharedConnection.getConnection());
 	}
+  protected Session getSession(Connection aConnection) throws Exception
+  {
+    session = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    return session;
+  }
 
 	protected MessageProducer lookupProducerForEndpoint( Endpoint anEndpoint ) throws Exception
 	{
-		if ( connection == null || producerSession == null )
+		if ( sharedConnection == null || producerSession == null )
 		{
 			throw new ResourceInitializationException();
 		}
 		Destination dest = producerSession.createQueue(anEndpoint.getEndpoint());
 		return producerSession.createProducer(dest);
 	}
-	public void initializeProducer(String aBrokerURI, String aQueueName) throws Exception
+  public void initializeProducer(String aBrokerURI, String aQueueName) throws Exception {
+    setupConnection(aBrokerURI);
+    initializeProducer(aBrokerURI, aQueueName, sharedConnection.getConnection());
+  }
+
+  public void initializeProducer(String aBrokerURI, String aQueueName, Connection aConnection) throws Exception
 	{
     if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST) ) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_producer_INFO", new Object[] { aBrokerURI, aQueueName });
@@ -337,7 +356,7 @@
 
 		//	create a worker object. This doesnt start the thread yet
 		sender = 
-			new ActiveMQMessageSender( getConnection(aBrokerURI), super.pendingMessageList, aQueueName, this);
+			new ActiveMQMessageSender( aConnection, super.pendingMessageList, aQueueName, this);
 		producerInitialized = false;
 		Thread t = new Thread( (BaseMessageSender) sender);
 		//	Start the worker thread. The jms session and message producer are created. Once
@@ -376,9 +395,14 @@
 	 * @param aBrokerURI 
 	 * @throws Exception
 	 */
-	public void initializeConsumer(String aBrokerURI) throws Exception
+  public void initializeConsumer(String aBrokerURI) throws Exception {
+    setupConnection(aBrokerURI);
+    initializeConsumer(aBrokerURI, sharedConnection.getConnection());
+  }
+
+  public void initializeConsumer(String aBrokerURI, Connection connection) throws Exception
 	{
-		consumerSession = getSession(aBrokerURI);
+		consumerSession = getSession(connection);
 		consumerDestination = consumerSession.createTemporaryQueue();
     if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST) ) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_consumer_INFO", new Object[] { aBrokerURI, consumerDestination.getQueueName() });
@@ -424,7 +448,6 @@
 			performanceTuningSettings = new Properties();
 			performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
 		}
-
 		asynchManager = new AsynchAECasManager_impl(rm);
 
 		brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
@@ -486,8 +509,19 @@
 			ObjectName on = new ObjectName("org.apache.uima:name="+applicationName);
 			jmxManager.registerMBean(clientSideJmxStats, on);
 
-			initializeProducer(brokerURI, endpoint);
-			initializeConsumer(brokerURI);
+      // Reuse existing JMS connection if available 
+	    if (sharedConnection != null )  {
+        initializeProducer(brokerURI, endpoint, sharedConnection.getConnection());
+        initializeConsumer(brokerURI, sharedConnection.getConnection());
+	    } else {
+	      //  This call creates a SharedConnection object and a JMS Connection
+	      initializeProducer(brokerURI, endpoint);
+	      initializeConsumer(brokerURI);
+	    }
+	    // Increment number of client instances. SharedConnection object is a static
+	    // and is used to share a single JMS connection. The connection is closed 
+	    // when the last client finishes processing and calls stop().
+	    sharedConnection.incrementClientCount();
 			running = true;
 			sendMetaRequest();
 			waitForMetadataReply();