You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gn...@apache.org on 2006/12/03 10:24:58 UTC
svn commit: r481742 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool:
ConnectionPool.java PooledConnectionFactory.java PooledSession.java
Author: gnodet
Date: Sun Dec 3 01:24:55 2006
New Revision: 481742
URL: http://svn.apache.org/viewvc?view=rev&rev=481742
Log:
AMQ-1084: Allow o.a.a.pool to support XA transactions
Also enhance the pooled ConnectionFactory to reuse several connections.
Fix the lastUsed timestamp on the ConnectionPool to avoid discarding reusable connections.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=481742&r1=481741&r2=481742
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Sun Dec 3 01:24:55 2006
@@ -24,6 +24,11 @@
import javax.jms.JMSException;
import javax.jms.Session;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.transport.TransportListener;
@@ -38,17 +43,18 @@
*/
public class ConnectionPool {
+ private TransactionManager transactionManager;
private ActiveMQConnection connection;
private Map cache;
private AtomicBoolean started = new AtomicBoolean(false);
private int referenceCount;
private ObjectPoolFactory poolFactory;
- private long lastUsed;
+ private long lastUsed = System.currentTimeMillis();
private boolean hasFailed;
private int idleTimeout = 30*1000;
- public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
- this(connection, new HashMap(), poolFactory);
+ public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
+ this(connection, new HashMap(), poolFactory, transactionManager);
// Add a transport Listener so that we can notice if this connection should be expired due to
// a connection failure.
connection.addTransportListener(new TransportListener(){
@@ -66,10 +72,11 @@
});
}
- public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) {
+ public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
this.connection = connection;
this.cache = cache;
this.poolFactory = poolFactory;
+ this.transactionManager = transactionManager;
}
public void start() throws JMSException {
@@ -83,13 +90,35 @@
}
public Session createSession(boolean transacted, int ackMode) throws JMSException {
- SessionKey key = new SessionKey(transacted, ackMode);
- SessionPool pool = (SessionPool) cache.get(key);
- if (pool == null) {
- pool = new SessionPool(this, key, poolFactory.createPool());
- cache.put(key, pool);
+ try {
+ boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
+ if (isXa) {
+ transacted = true;
+ ackMode = Session.SESSION_TRANSACTED;
+ }
+ SessionKey key = new SessionKey(transacted, ackMode);
+ SessionPool pool = (SessionPool) cache.get(key);
+ if (pool == null) {
+ pool = new SessionPool(this, key, poolFactory.createPool());
+ cache.put(key, pool);
+ }
+ PooledSession session = pool.borrowSession();
+ if (isXa) {
+ session.setIgnoreClose(true);
+ transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
+ incrementReferenceCount();
+ transactionManager.getTransaction().enlistResource(createXaResource(session));
+ }
+ return session;
+ } catch (RollbackException e) {
+ final JMSException jmsException = new JMSException("Rollback Exception");
+ jmsException.initCause(e);
+ throw jmsException;
+ } catch (SystemException e) {
+ final JMSException jmsException = new JMSException("System Exception");
+ jmsException.initCause(e);
+ throw jmsException;
}
- return pool.borrowSession();
}
synchronized public void close() {
@@ -117,8 +146,8 @@
synchronized public void decrementReferenceCount() {
referenceCount--;
+ lastUsed = System.currentTimeMillis();
if( referenceCount == 0 ) {
- lastUsed = System.currentTimeMillis();
expiredCheck();
}
}
@@ -129,7 +158,8 @@
synchronized public boolean expiredCheck() {
if( connection == null )
return true;
- if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout ) {
+ long t = System.currentTimeMillis();
+ if( hasFailed || idleTimeout> 0 && t > lastUsed+idleTimeout ) {
if( referenceCount == 0 ) {
close();
}
@@ -145,5 +175,31 @@
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
+
+ protected XAResource createXaResource(PooledSession session) throws JMSException {
+ return session.getSession().getTransactionContext();
+ }
+
+ protected class Synchronization implements javax.transaction.Synchronization {
+ private final PooledSession session;
+
+ private Synchronization(PooledSession session) {
+ this.session = session;
+ }
+ public void beforeCompletion() {
+ }
+
+ public void afterCompletion(int status) {
+ try {
+ // This will return session to the pool.
+ session.setIgnoreClose(false);
+ session.close();
+ decrementReferenceCount();
+ } catch (JMSException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?view=diff&rev=481742&r1=481741&r2=481742
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Sun Dec 3 01:24:55 2006
@@ -19,11 +19,13 @@
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.transaction.TransactionManager;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -45,10 +47,12 @@
* @version $Revision: 1.1 $
*/
public class PooledConnectionFactory implements ConnectionFactory, Service {
- private ActiveMQConnectionFactory connectionFactory;
+ private ConnectionFactory connectionFactory;
private Map cache = new HashMap();
private ObjectPoolFactory poolFactory;
- private int maximumActive = 5000;
+ private int maximumActive = 500;
+ private int maxConnections = 1;
+ private TransactionManager transactionManager;
public PooledConnectionFactory() {
this(new ActiveMQConnectionFactory());
@@ -62,21 +66,39 @@
this.connectionFactory = connectionFactory;
}
- public ActiveMQConnectionFactory getConnectionFactory() {
+ public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
- public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
+ public TransactionManager getTransactionManager() {
+ return transactionManager;
+ }
+
+ public void setTransactionManager(TransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
public Connection createConnection() throws JMSException {
return createConnection(null, null);
}
public synchronized Connection createConnection(String userName, String password) throws JMSException {
ConnectionKey key = new ConnectionKey(userName, password);
- ConnectionPool connection = (ConnectionPool) cache.get(key);
+ LinkedList pools = (LinkedList) cache.get(key);
+
+ if (pools == null) {
+ pools = new LinkedList();
+ cache.put(key, pools);
+ }
+
+ ConnectionPool connection = null;
+ if (pools.size() == maxConnections) {
+ connection = (ConnectionPool) pools.removeFirst();
+ }
// Now.. we might get a connection, but it might be that we need to
// dump it..
@@ -86,11 +108,15 @@
if (connection == null) {
ActiveMQConnection delegate = createConnection(key);
- connection = new ConnectionPool(delegate, getPoolFactory());
- cache.put(key, connection);
+ connection = createConnectionPool(delegate);
}
+ pools.add(connection);
return new PooledConnection(connection);
}
+
+ protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
+ return new ConnectionPool(connection, getPoolFactory(), transactionManager);
+ }
protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
if (key.getUserName() == null && key.getPassword() == null) {
@@ -144,6 +170,20 @@
*/
public void setMaximumActive(int maximumActive) {
this.maximumActive = maximumActive;
+ }
+
+ /**
+ * @return the maxConnections
+ */
+ public int getMaxConnections() {
+ return maxConnections;
+ }
+
+ /**
+ * @param maxConnections the maxConnections to set
+ */
+ public void setMaxConnections(int maxConnections) {
+ this.maxConnections = maxConnections;
}
protected ObjectPoolFactory createPoolFactory() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java?view=diff&rev=481742&r1=481741&r2=481742
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java Sun Dec 3 01:24:55 2006
@@ -65,57 +65,68 @@
private ActiveMQQueueSender queueSender;
private ActiveMQTopicPublisher topicPublisher;
private boolean transactional = true;
+ private boolean ignoreClose = false;
private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList();
+
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
this.session = aSession;
this.sessionPool = sessionPool;
this.transactional = session.isTransacted();
}
+ protected boolean isIgnoreClose() {
+ return ignoreClose;
+ }
- public void close() throws JMSException {
- // TODO a cleaner way to reset??
-
- // lets reset the session
- getSession().setMessageListener(null);
-
- // Close any consumers and browsers that may have been created.
- for (Iterator iter = consumers.iterator(); iter.hasNext();) {
- MessageConsumer consumer = (MessageConsumer) iter.next();
- consumer.close();
- }
- consumers.clear();
-
- for (Iterator iter = browsers.iterator(); iter.hasNext();) {
- QueueBrowser browser = (QueueBrowser) iter.next();
- browser.close();
- }
- browsers.clear();
+ protected void setIgnoreClose(boolean ignoreClose) {
+ this.ignoreClose = ignoreClose;
+ }
- // maybe do a rollback?
- if (transactional) {
- try {
- getSession().rollback();
+ public void close() throws JMSException {
+ if (!ignoreClose) {
+ // TODO a cleaner way to reset??
+
+ // lets reset the session
+ getSession().setMessageListener(null);
+
+ // Close any consumers and browsers that may have been created.
+ for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+ MessageConsumer consumer = (MessageConsumer) iter.next();
+ consumer.close();
}
- catch (JMSException e) {
- log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
-
- // lets close the session and not put the session back into the pool
+ consumers.clear();
+
+ for (Iterator iter = browsers.iterator(); iter.hasNext();) {
+ QueueBrowser browser = (QueueBrowser) iter.next();
+ browser.close();
+ }
+ browsers.clear();
+
+ // maybe do a rollback?
+ if (transactional) {
try {
- session.close();
+ getSession().rollback();
}
- catch (JMSException e1) {
- log.trace("Ignoring exception as discarding session: " + e1, e1);
+ catch (JMSException e) {
+ log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
+
+ // lets close the session and not put the session back into the pool
+ try {
+ session.close();
+ }
+ catch (JMSException e1) {
+ log.trace("Ignoring exception as discarding session: " + e1, e1);
+ }
+ session = null;
+ return;
}
- session = null;
- return;
}
+
+ sessionPool.returnSession(this);
}
-
- sessionPool.returnSession(this);
}
public void commit() throws JMSException {