You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gn...@apache.org on 2006/12/03 10:24:58 UTC

svn commit: r481742 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool: ConnectionPool.java PooledConnectionFactory.java PooledSession.java

Author: gnodet
Date: Sun Dec  3 01:24:55 2006
New Revision: 481742

URL: http://svn.apache.org/viewvc?view=rev&rev=481742
Log:
AMQ-1084: Allow o.a.a.pool to support XA transactions
Also enhance the pooled ConnectionFactory to reuse several connections.
Fix the lastUsed timestamp on the ConnectionPool to avoid discarding reusable connections.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=481742&r1=481741&r2=481742
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Sun Dec  3 01:24:55 2006
@@ -24,6 +24,11 @@
 
 import javax.jms.JMSException;
 import javax.jms.Session;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.transport.TransportListener;
@@ -38,17 +43,18 @@
  */
 public class ConnectionPool {
 	
+    private TransactionManager transactionManager;
     private ActiveMQConnection connection;
     private Map cache;
     private AtomicBoolean started = new AtomicBoolean(false);
     private int referenceCount;
     private ObjectPoolFactory poolFactory;
-	private long lastUsed;
+	private long lastUsed = System.currentTimeMillis();
 	private boolean hasFailed;
 	private int idleTimeout = 30*1000;
 
-    public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
-        this(connection, new HashMap(), poolFactory);
+    public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
+        this(connection, new HashMap(), poolFactory, transactionManager);
         // Add a transport Listener so that we can notice if this connection should be expired due to 
         // a connection failure.
         connection.addTransportListener(new TransportListener(){
@@ -66,10 +72,11 @@
 		});
     }
 
-    public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) {
+    public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
         this.connection = connection;
         this.cache = cache;
         this.poolFactory = poolFactory;
+        this.transactionManager = transactionManager;
     }
 
     public void start() throws JMSException {
@@ -83,13 +90,35 @@
     }
 
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        SessionKey key = new SessionKey(transacted, ackMode);
-        SessionPool pool = (SessionPool) cache.get(key);
-        if (pool == null) {
-            pool = new SessionPool(this, key, poolFactory.createPool());
-            cache.put(key, pool);
+        try {
+            boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
+            if (isXa) {
+                transacted = true;
+                ackMode = Session.SESSION_TRANSACTED;
+            }
+            SessionKey key = new SessionKey(transacted, ackMode);
+            SessionPool pool = (SessionPool) cache.get(key);
+            if (pool == null) {
+                pool = new SessionPool(this, key, poolFactory.createPool());
+                cache.put(key, pool);
+            }
+            PooledSession session = pool.borrowSession();
+            if (isXa) {
+                session.setIgnoreClose(true);
+                transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
+                incrementReferenceCount();
+                transactionManager.getTransaction().enlistResource(createXaResource(session));
+            }
+            return session;
+        } catch (RollbackException e) {
+            final JMSException jmsException = new JMSException("Rollback Exception");
+            jmsException.initCause(e);
+            throw jmsException;
+        } catch (SystemException e) {
+            final JMSException jmsException = new JMSException("System Exception");
+            jmsException.initCause(e);
+            throw jmsException;
         }
-        return pool.borrowSession();
     }
 
     synchronized public void close() {
@@ -117,8 +146,8 @@
 
 	synchronized public void decrementReferenceCount() {
 		referenceCount--;
+		lastUsed = System.currentTimeMillis();
 		if( referenceCount == 0 ) {
-			lastUsed = System.currentTimeMillis();
 			expiredCheck();
 		}
 	}
@@ -129,7 +158,8 @@
 	synchronized public boolean expiredCheck() {
 		if( connection == null )
 			return true;
-		if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout ) {
+        long t = System.currentTimeMillis();
+		if( hasFailed || idleTimeout> 0 && t > lastUsed+idleTimeout ) {
 			if( referenceCount == 0 ) {
 				close();
 			}
@@ -145,5 +175,31 @@
 	public void setIdleTimeout(int idleTimeout) {
 		this.idleTimeout = idleTimeout;
 	}
+    
+    protected XAResource createXaResource(PooledSession session) throws JMSException {
+        return session.getSession().getTransactionContext();
+    }
+
+    protected class Synchronization implements javax.transaction.Synchronization {
+        private final PooledSession session;
+
+        private Synchronization(PooledSession session) {
+            this.session = session;
+        }
 
+        public void beforeCompletion() {
+        }
+        
+        public void afterCompletion(int status) {
+            try {
+                // This will return session to the pool.
+                session.setIgnoreClose(false);
+                session.close();
+                decrementReferenceCount();
+            } catch (JMSException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?view=diff&rev=481742&r1=481741&r2=481742
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Sun Dec  3 01:24:55 2006
@@ -19,11 +19,13 @@
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
+import javax.transaction.TransactionManager;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -45,10 +47,12 @@
  * @version $Revision: 1.1 $
  */
 public class PooledConnectionFactory implements ConnectionFactory, Service {
-    private ActiveMQConnectionFactory connectionFactory;
+    private ConnectionFactory connectionFactory;
     private Map cache = new HashMap();
     private ObjectPoolFactory poolFactory;
-    private int maximumActive = 5000;
+    private int maximumActive = 500;
+    private int maxConnections = 1;
+    private TransactionManager transactionManager;
 
     public PooledConnectionFactory() {
         this(new ActiveMQConnectionFactory());
@@ -62,21 +66,39 @@
         this.connectionFactory = connectionFactory;
     }
 
-    public ActiveMQConnectionFactory getConnectionFactory() {
+    public ConnectionFactory getConnectionFactory() {
         return connectionFactory;
     }
 
-    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
     }
 
+    public TransactionManager getTransactionManager() {
+        return transactionManager;
+    }
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
     public Connection createConnection() throws JMSException {
         return createConnection(null, null);
     }
 
     public synchronized Connection createConnection(String userName, String password) throws JMSException {
         ConnectionKey key = new ConnectionKey(userName, password);
-        ConnectionPool connection = (ConnectionPool) cache.get(key);
+        LinkedList pools = (LinkedList) cache.get(key);
+        
+        if (pools ==  null) {
+            pools = new LinkedList();
+            cache.put(key, pools);
+        }
+
+        ConnectionPool connection = null;
+        if (pools.size() == maxConnections) {
+            connection = (ConnectionPool) pools.removeFirst();
+        }
         
         // Now.. we might get a connection, but it might be that we need to 
         // dump it..
@@ -86,11 +108,15 @@
         
         if (connection == null) {
             ActiveMQConnection delegate = createConnection(key);
-            connection = new ConnectionPool(delegate, getPoolFactory());
-            cache.put(key, connection);
+            connection = createConnectionPool(delegate);
         }
+        pools.add(connection);
         return new PooledConnection(connection);
     }
+    
+    protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
+        return new ConnectionPool(connection, getPoolFactory(), transactionManager);
+    }
 
     protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
         if (key.getUserName() == null && key.getPassword() == null) {
@@ -144,6 +170,20 @@
      */
     public void setMaximumActive(int maximumActive) {
         this.maximumActive = maximumActive;
+    }
+
+    /**
+     * @return the maxConnections
+     */
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    /**
+     * @param maxConnections the maxConnections to set
+     */
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
     }
 
     protected ObjectPoolFactory createPoolFactory() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java?view=diff&rev=481742&r1=481741&r2=481742
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java Sun Dec  3 01:24:55 2006
@@ -65,57 +65,68 @@
     private ActiveMQQueueSender queueSender;
     private ActiveMQTopicPublisher topicPublisher;
     private boolean transactional = true;
+    private boolean ignoreClose = false;
     
     private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
     private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList();
 
+
     public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
         this.session = aSession;
         this.sessionPool = sessionPool;
         this.transactional = session.isTransacted();
     }
 
+    protected boolean isIgnoreClose() {
+        return ignoreClose;
+    }
 
-    public void close() throws JMSException {
-        // TODO a cleaner way to reset??
-
-        // lets reset the session
-        getSession().setMessageListener(null);
-        
-        // Close any consumers and browsers that may have been created.
-        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
-            MessageConsumer consumer = (MessageConsumer) iter.next();
-            consumer.close();
-        }
-        consumers.clear();
-        
-        for (Iterator iter = browsers.iterator(); iter.hasNext();) {
-            QueueBrowser browser = (QueueBrowser) iter.next();
-            browser.close();
-        }
-        browsers.clear();
+    protected void setIgnoreClose(boolean ignoreClose) {
+        this.ignoreClose = ignoreClose;
+    }
 
-        // maybe do a rollback?
-        if (transactional) {
-            try {
-                getSession().rollback();
+    public void close() throws JMSException {
+        if (!ignoreClose) {
+            // TODO a cleaner way to reset??
+    
+            // lets reset the session
+            getSession().setMessageListener(null);
+            
+            // Close any consumers and browsers that may have been created.
+            for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+                MessageConsumer consumer = (MessageConsumer) iter.next();
+                consumer.close();
             }
-            catch (JMSException e) {
-                log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
-
-                // lets close the session and not put the session back into the pool
+            consumers.clear();
+            
+            for (Iterator iter = browsers.iterator(); iter.hasNext();) {
+                QueueBrowser browser = (QueueBrowser) iter.next();
+                browser.close();
+            }
+            browsers.clear();
+    
+            // maybe do a rollback?
+            if (transactional) {
                 try {
-                    session.close();
+                    getSession().rollback();
                 }
-                catch (JMSException e1) {
-                    log.trace("Ignoring exception as discarding session: " + e1, e1);
+                catch (JMSException e) {
+                    log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
+    
+                    // lets close the session and not put the session back into the pool
+                    try {
+                        session.close();
+                    }
+                    catch (JMSException e1) {
+                        log.trace("Ignoring exception as discarding session: " + e1, e1);
+                    }
+                    session = null;
+                    return;
                 }
-                session = null;
-                return;
             }
+    
+            sessionPool.returnSession(this);
         }
-
-        sessionPool.returnSession(this);
     }
 
     public void commit() throws JMSException {