You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2009/07/13 22:27:01 UTC
svn commit: r793699 -
/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
Author: cwiklik
Date: Mon Jul 13 20:27:01 2009
New Revision: 793699
URL: http://svn.apache.org/viewvc?rev=793699&view=rev
Log:
UIMA-1435 Modified to use SharedConnection object to reuse a single JMS Connection
Modified:
incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
Modified: incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: http://svn.apache.org/viewvc/incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=793699&r1=793698&r2=793699&view=diff
==============================================================================
--- incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original)
+++ incubator/uima/sandbox/trunk/uima-as/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Mon Jul 13 20:27:01 2009
@@ -28,6 +28,7 @@
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
@@ -87,7 +88,7 @@
private Session session = null;
private Session consumerSession = null;
- private Connection connection = null;
+ //private Connection connection = null;
private volatile boolean serviceInitializationException;
private volatile boolean serviceInitializationCompleted;
@@ -97,6 +98,8 @@
private Session producerSession = null;
private JmxManager jmxManager = null;
private String applicationName = "UimaASClient";
+ //private volatile boolean usesSharedConnection = false;
+ private static SharedConnection sharedConnection = null;
public BaseUIMAAsynchronousEngine_impl() {
UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, "UIMA-AS version " + UIMAFramework.getVersionString());
@@ -230,17 +233,23 @@
}
if ( initialized )
{
- consumerSession.close();
- consumer.close();
- try
- {
- connection.close();
- }
- catch(Exception ex)
- {
- // Shutting down, ignore a connection error
+ try {
+ consumerSession.close();
+ consumer.close();
+ } catch ( JMSException exx) {}
+ // SharedConnection object manages a single JMS connection to the
+ // broker. If the client is scaled out in the same JVM, the connection
+ // is shared by all instances of the client to reduce number of threads
+ // in the broker. The SharedConnection object also maintains the number
+ // of client instances to determine when it is ok to close the connection.
+ // The connection is closed when the last client calls stop().
+ if ( sharedConnection != null ) {
+ // Decrement number of active clients
+ sharedConnection.decrementClientCount();
+ // The destroy method closes the JMS connection when the number of
+ // clients becomes 0, otherwise it is a no-op
+ sharedConnection.destroy();
}
- connection = null;
}
if ( jmxManager != null )
{
@@ -280,17 +289,18 @@
((TextMessage)msg).setText("");
}
}
- protected synchronized Connection getConnection( String aBrokerURI ) throws Exception
+ protected synchronized void setupConnection( String aBrokerURI ) throws Exception
{
- if (connection == null )
+ if (sharedConnection == null )
{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(aBrokerURI);
- connection = factory.createConnection();
+ Connection connection = factory.createConnection();
// This only effects Consumer
addPrefetch((ActiveMQConnection)connection);
connection.start();
+ sharedConnection = new SharedConnection();
+ sharedConnection.setConnection(connection);
}
- return connection;
}
private void addPrefetch(ActiveMQConnection aConnection ) {
@@ -300,27 +310,36 @@
}
private void validateConnection(String aBrokerURI) throws Exception
{
- if (connection == null) {
- connection = getConnection(aBrokerURI);
+ if (sharedConnection == null) {
+ setupConnection(aBrokerURI);
}
}
protected Session getSession(String aBrokerURI) throws Exception
{
validateConnection(aBrokerURI);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- return session;
+ return getSession(sharedConnection.getConnection());
}
+ protected Session getSession(Connection aConnection) throws Exception
+ {
+ session = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ return session;
+ }
protected MessageProducer lookupProducerForEndpoint( Endpoint anEndpoint ) throws Exception
{
- if ( connection == null || producerSession == null )
+ if ( sharedConnection == null || producerSession == null )
{
throw new ResourceInitializationException();
}
Destination dest = producerSession.createQueue(anEndpoint.getEndpoint());
return producerSession.createProducer(dest);
}
- public void initializeProducer(String aBrokerURI, String aQueueName) throws Exception
+ public void initializeProducer(String aBrokerURI, String aQueueName) throws Exception {
+ setupConnection(aBrokerURI);
+ initializeProducer(aBrokerURI, aQueueName, sharedConnection.getConnection());
+ }
+
+ public void initializeProducer(String aBrokerURI, String aQueueName, Connection aConnection) throws Exception
{
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST) ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "initializeProducer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_producer_INFO", new Object[] { aBrokerURI, aQueueName });
@@ -337,7 +356,7 @@
// create a worker object. This doesnt start the thread yet
sender =
- new ActiveMQMessageSender( getConnection(aBrokerURI), super.pendingMessageList, aQueueName, this);
+ new ActiveMQMessageSender( aConnection, super.pendingMessageList, aQueueName, this);
producerInitialized = false;
Thread t = new Thread( (BaseMessageSender) sender);
// Start the worker thread. The jms session and message producer are created. Once
@@ -376,9 +395,14 @@
* @param aBrokerURI
* @throws Exception
*/
- public void initializeConsumer(String aBrokerURI) throws Exception
+ public void initializeConsumer(String aBrokerURI) throws Exception {
+ setupConnection(aBrokerURI);
+ initializeConsumer(aBrokerURI, sharedConnection.getConnection());
+ }
+
+ public void initializeConsumer(String aBrokerURI, Connection connection) throws Exception
{
- consumerSession = getSession(aBrokerURI);
+ consumerSession = getSession(connection);
consumerDestination = consumerSession.createTemporaryQueue();
if ( UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST) ) {
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "initializeConsumer", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_init_jms_consumer_INFO", new Object[] { aBrokerURI, consumerDestination.getQueueName() });
@@ -424,7 +448,6 @@
performanceTuningSettings = new Properties();
performanceTuningSettings.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, cas_initial_heap_size);
}
-
asynchManager = new AsynchAECasManager_impl(rm);
brokerURI = (String) anApplicationContext.get(UimaAsynchronousEngine.ServerUri);
@@ -486,8 +509,19 @@
ObjectName on = new ObjectName("org.apache.uima:name="+applicationName);
jmxManager.registerMBean(clientSideJmxStats, on);
- initializeProducer(brokerURI, endpoint);
- initializeConsumer(brokerURI);
+ // Reuse existing JMS connection if available
+ if (sharedConnection != null ) {
+ initializeProducer(brokerURI, endpoint, sharedConnection.getConnection());
+ initializeConsumer(brokerURI, sharedConnection.getConnection());
+ } else {
+ // This call creates a SharedConnection object and a JMS Connection
+ initializeProducer(brokerURI, endpoint);
+ initializeConsumer(brokerURI);
+ }
+ // Increment number of client instances. SharedConnection object is a static
+ // and is used to share a single JMS connection. The connection is closed
+ // when the last client finishes processing and calls stop().
+ sharedConnection.incrementClientCount();
running = true;
sendMetaRequest();
waitForMetadataReply();