You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/09/12 05:44:49 UTC

svn commit: r1383746 [1/2] - in /activemq/trunk/activemq-pool/src: main/java/org/apache/activemq/pool/ test/java/org/apache/activemq/pool/

Author: tabish
Date: Wed Sep 12 03:44:48 2012
New Revision: 1383746

URL: http://svn.apache.org/viewvc?rev=1383746&view=rev
Log:
fixes for: https://issues.apache.org/jira/browse/AMQ-4019

Refactored the pool code to use commons-pool for more of the work, which allows us to
expose more stats info and enables async connection pool cleanup of expired
connections if so desired.  Added some additional tests.  We should continue to add
more tests and perhaps add more functionality to the Session pool in ConnectionPool
to let commons-pool clean up idle sessions after a time, or to maintain a certain
number of idle sessions, etc.

Added:
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionTest.java
Removed:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java
Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledTopicPublisherTest.java

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java Wed Sep 12 03:44:48 2012
@@ -17,12 +17,13 @@
 package org.apache.activemq.pool;
 
 import java.util.Properties;
+
 import javax.naming.NamingException;
 import javax.naming.Reference;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.jndi.JNDIReferenceFactory;
 import org.apache.activemq.jndi.JNDIStorableInterface;
-import org.apache.activemq.pool.PooledConnectionFactory;
 
 /**
 * AmqJNDIPooledConnectionFactory.java
@@ -35,7 +36,7 @@ public class AmqJNDIPooledConnectionFact
     public AmqJNDIPooledConnectionFactory() {
         super();
     }
-    
+
     public AmqJNDIPooledConnectionFactory(String brokerURL) {
         super(brokerURL);
     }
@@ -46,7 +47,7 @@ public class AmqJNDIPooledConnectionFact
 
     /**
      * set the properties for this instance as retrieved from JNDI
-     * 
+     *
      * @param props
      */
     public synchronized void setProperties(Properties props) {
@@ -56,7 +57,7 @@ public class AmqJNDIPooledConnectionFact
 
     /**
      * Get the properties from this instance for storing in JNDI
-     * 
+     *
      * @return the properties
      */
     public synchronized Properties getProperties() {
@@ -69,7 +70,7 @@ public class AmqJNDIPooledConnectionFact
 
     /**
      * Retrive a Reference for this instance to store in JNDI
-     * 
+     *
      * @return the built Reference
      * @throws NamingException
      *             if error on building Reference
@@ -87,7 +88,7 @@ public class AmqJNDIPooledConnectionFact
                 .buildFromProperties(properties);
         String temp = properties.getProperty("maximumActive");
         if (temp != null && temp.length() > 0) {
-            setMaximumActive(Integer.parseInt(temp));
+            setMaximumActiveSessionPerConnection(Integer.parseInt(temp));
         }
         temp = properties.getProperty("maximumActiveSessionPerConnection");
         if (temp != null && temp.length() > 0) {
@@ -103,7 +104,7 @@ public class AmqJNDIPooledConnectionFact
         ((ActiveMQConnectionFactory) getConnectionFactory())
                 .populateProperties(props);
         props.setProperty("maximumActive", Integer
-                .toString(getMaximumActive()));
+                .toString(getMaximumActiveSessionPerConnection()));
         props.setProperty("maximumActiveSessionPerConnection", Integer
                 .toString(getMaximumActiveSessionPerConnection()));
         props.setProperty("maxConnections", Integer

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java Wed Sep 12 03:44:48 2012
@@ -18,12 +18,11 @@ package org.apache.activemq.pool;
 
 /**
  * A cache key for the connection details
- * 
- * 
  */
 public class ConnectionKey {
-    private String userName;
-    private String password;
+
+    private final String userName;
+    private final String password;
     private int hash;
 
     public ConnectionKey(String userName, String password) {

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java Wed Sep 12 03:44:48 2012
@@ -18,31 +18,33 @@
 package org.apache.activemq.pool;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import javax.jms.JMSException;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.commons.pool.ObjectPoolFactory;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
- *
- *
+ * <p/>
+ * Instances of this class are shared amongst one or more PooledConnection object and must
+ * track the session objects that are loaned out for cleanup on close as well as ensuring
+ * that the temporary destinations of the managed Connection are purged when all references
+ * to this ConnectionPool are released.
  */
 public class ConnectionPool {
 
     private ActiveMQConnection connection;
-    private ConcurrentHashMap<SessionKey, SessionPool> cache;
-    private List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
-    private AtomicBoolean started = new AtomicBoolean(false);
     private int referenceCount;
-    private ObjectPoolFactory poolFactory;
     private long lastUsed = System.currentTimeMillis();
     private long firstUsed = lastUsed;
     private boolean hasFailed;
@@ -50,8 +52,14 @@ public class ConnectionPool {
     private int idleTimeout = 30 * 1000;
     private long expiryTimeout = 0l;
 
-    public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
-        this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
+
+    public ConnectionPool(ActiveMQConnection connection) {
+
+        this.connection = connection;
+
         // Add a transport Listener so that we can notice if this connection
         // should be expired due to a connection failure.
         connection.addTransportListener(new TransportListener() {
@@ -76,12 +84,40 @@ public class ConnectionPool {
         if(connection.isTransportFailed()) {
             hasFailed = true;
         }
-    }
 
-    public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
-        this.connection = connection;
-        this.cache = cache;
-        this.poolFactory = poolFactory;
+        // Create our internal Pool of session instances.
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
+            new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
+
+                @Override
+                public void activateObject(SessionKey key, PooledSession session) throws Exception {
+                    ConnectionPool.this.loanedSessions.add(session);
+                }
+
+                @Override
+                public void destroyObject(SessionKey key, PooledSession session) throws Exception {
+                    ConnectionPool.this.loanedSessions.remove(session);
+                    session.getInternalSession().close();
+                }
+
+                @Override
+                public PooledSession makeObject(SessionKey key) throws Exception {
+                    ActiveMQSession session = (ActiveMQSession)
+                            ConnectionPool.this.connection.createSession(key.isTransacted(), key.getAckMode());
+                    return new PooledSession(key, session, sessionPool);
+                }
+
+                @Override
+                public void passivateObject(SessionKey key, PooledSession session) throws Exception {
+                    ConnectionPool.this.loanedSessions.remove(session);
+                }
+
+                @Override
+                public boolean validateObject(SessionKey key, PooledSession session) {
+                    return true;
+                }
+            }
+        );
     }
 
     public void start() throws JMSException {
@@ -101,39 +137,20 @@ public class ConnectionPool {
 
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
         SessionKey key = new SessionKey(transacted, ackMode);
-        SessionPool pool = null;
-        pool = cache.get(key);
-        if (pool == null) {
-            SessionPool newPool = createSessionPool(key);
-            SessionPool prevPool = cache.putIfAbsent(key, newPool);
-            if (prevPool != null && prevPool != newPool) {
-                // newPool was not the first one to be associated with this
-                // key... close created session pool
-                try {
-                    newPool.close();
-                } catch (Exception e) {
-                    throw new JMSException(e.getMessage());
-                }
-            }
-            pool = cache.get(key); // this will return a non-null value...
+        PooledSession session;
+        try {
+            session = sessionPool.borrowObject(key);
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create(e);
         }
-        PooledSession session = pool.borrowSession();
-        this.loanedSessions.add(session);
         return session;
     }
 
     public synchronized void close() {
         if (connection != null) {
             try {
-                Iterator<SessionPool> i = cache.values().iterator();
-                while (i.hasNext()) {
-                    SessionPool pool = i.next();
-                    i.remove();
-                    try {
-                        pool.close();
-                    } catch (Exception e) {
-                    }
-                }
+                sessionPool.close();
+            } catch (Exception e) {
             } finally {
                 try {
                     connection.close();
@@ -156,6 +173,10 @@ public class ConnectionPool {
         if (referenceCount == 0) {
             expiredCheck();
 
+            // Loaned sessions are those that are active in the sessionPool and
+            // have not been closed by the client before closing the connection.
+            // These need to be closed so that all session's reflect the fact
+            // that the parent Connection is closed.
             for (PooledSession session : this.loanedSessions) {
                 try {
                     session.close();
@@ -164,8 +185,8 @@ public class ConnectionPool {
             }
             this.loanedSessions.clear();
 
-            // only clean up temp destinations when all users
-            // of this connection have called close
+            // We only clean up temporary destinations when all users of this
+            // connection have called close.
             if (getConnection() != null) {
                 getConnection().cleanUpTempDestinations();
             }
@@ -173,21 +194,30 @@ public class ConnectionPool {
     }
 
     /**
+     * Determines if this Connection has expired.
+     * <p/>
+     * A ConnectionPool is considered expired when all references to it are released AND either
+     * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
+     * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
+     *
      * @return true if this connection has expired.
      */
     public synchronized boolean expiredCheck() {
         if (connection == null) {
             return true;
         }
+
         if (hasExpired) {
             if (referenceCount == 0) {
                 close();
             }
             return true;
         }
+
         if (hasFailed
                 || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
                 || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
+
             hasExpired = true;
             if (referenceCount == 0) {
                 close();
@@ -205,24 +235,59 @@ public class ConnectionPool {
         this.idleTimeout = idleTimeout;
     }
 
-    protected SessionPool createSessionPool(SessionKey key) {
-        return new SessionPool(this, key, poolFactory.createPool());
-    }
-
     public void setExpiryTimeout(long expiryTimeout) {
-        this.expiryTimeout  = expiryTimeout;
+        this.expiryTimeout = expiryTimeout;
     }
 
     public long getExpiryTimeout() {
         return expiryTimeout;
     }
 
-    void onSessionReturned(PooledSession session) {
-        this.loanedSessions.remove(session);
+    public int getMaximumActiveSessionPerConnection() {
+        return this.sessionPool.getMaxActive();
+    }
+
+    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
+        this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
+    }
+
+    /**
+     * @return the total number of Pooled session including idle sessions that are not
+     *          currently loaned out to any client.
+     */
+    public int getNumSessions() {
+        return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
+    }
+
+    /**
+     * @return the total number of Sessions that are in the Session pool but not loaned out.
+     */
+    public int getNumIdleSessions() {
+        return this.sessionPool.getNumIdle();
+    }
+
+    /**
+     * @return the total number of Session's that have been loaned to PooledConnection instances.
+     */
+    public int getNumActiveSessions() {
+        return this.sessionPool.getNumActive();
+    }
+
+    /**
+     * Configure whether the createSession method should block when there are no more idle sessions and the
+     * pool already contains the maximum number of active sessions.  If false the create method will fail
+     * and throw an exception.
+     *
+     * @param block
+     * 		Indicates whether blocking should be used to wait for more space to create a session.
+     */
+    public void setBlockIfSessionPoolIsFull(boolean block) {
+        this.sessionPool.setWhenExhaustedAction(
+                (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
     }
 
-    void onSessionInvalidated(PooledSession session) {
-        this.loanedSessions.remove(session);
+    public boolean isBlockIfSessionPoolIsFull() {
+        return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
     }
 
     @Override

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java Wed Sep 12 03:44:48 2012
@@ -28,11 +28,18 @@ public class JcaConnectionPool extends X
 
     private String name;
 
-    public JcaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) {
-        super(connection, poolFactory, transactionManager);
+    public JcaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager, String name) {
+        super(connection, transactionManager);
         this.name = name;
     }
 
+    /**
+     * @deprecated
+     */
+    public JcaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) {
+        this(connection, transactionManager, name);
+    }
+
     protected XAResource createXaResource(PooledSession session) throws JMSException {
         XAResource xares = new LocalAndXATransaction(session.getInternalSession().getTransactionContext());
         if (name != null) {

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java Wed Sep 12 03:44:48 2012
@@ -19,7 +19,7 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.ActiveMQConnectionFactory;
 
 /**
- * 
+ *
  */
 public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
 
@@ -46,7 +46,6 @@ public class JcaPooledConnectionFactory 
     }
 
     protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
-        return new JcaConnectionPool(connection, getPoolFactory(), getTransactionManager(), getName());
+        return new JcaConnectionPool(connection, getTransactionManager(), getName());
     }
-
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java Wed Sep 12 03:44:48 2012
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
  * {@link QueueConnection} which is pooled and on {@link #close()} will return
- * itself to the sessionPool.
+ * its reference to the ConnectionPool backing it.
  *
  * <b>NOTE</b> this implementation is only intended for use when sending
  * messages. It does not deal with pooling of consumers; for that look at a
@@ -63,6 +63,14 @@ public class PooledConnection implements
     private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
     private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
 
+    /**
+     * Creates a new PooledConnection instance that uses the given ConnectionPool to create
+     * and manage its resources.  The ConnectionPool instance can be shared amongst many
+     * PooledConnection instances.
+     *
+     * @param pool
+     *      The connection and pool manager backing this proxy connection object.
+     */
     public PooledConnection(ConnectionPool pool) {
         this.pool = pool;
         this.pool.incrementReferenceCount();
@@ -75,6 +83,7 @@ public class PooledConnection implements
         return new PooledConnection(pool);
     }
 
+    @Override
     public void close() throws JMSException {
         this.cleanupConnectionTemporaryDestinations();
         if (this.pool != null) {
@@ -83,45 +92,55 @@ public class PooledConnection implements
         }
     }
 
+    @Override
     public void start() throws JMSException {
         assertNotClosed();
         pool.start();
     }
 
+    @Override
     public void stop() throws JMSException {
         stopped = true;
     }
 
+    @Override
     public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
             throws JMSException {
         return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
     }
 
+    @Override
     public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
     }
 
+    @Override
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
             throws JMSException {
         return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
     }
 
+    @Override
     public String getClientID() throws JMSException {
         return getConnection().getClientID();
     }
 
+    @Override
     public ExceptionListener getExceptionListener() throws JMSException {
         return getConnection().getExceptionListener();
     }
 
+    @Override
     public ConnectionMetaData getMetaData() throws JMSException {
         return getConnection().getMetaData();
     }
 
+    @Override
     public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
         getConnection().setExceptionListener(exceptionListener);
     }
 
+    @Override
     public void setClientID(String clientID) throws JMSException {
 
         // ignore repeated calls to setClientID() with the same client id
@@ -132,20 +151,24 @@ public class PooledConnection implements
         }
     }
 
+    @Override
     public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
     }
 
     // Session factory methods
     // -------------------------------------------------------------------------
+    @Override
     public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
         return (QueueSession) createSession(transacted, ackMode);
     }
 
+    @Override
     public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
         return (TopicSession) createSession(transacted, ackMode);
     }
 
+    @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
         PooledSession result;
         result = (PooledSession) pool.createSession(transacted, ackMode);
@@ -156,18 +179,10 @@ public class PooledConnection implements
         return (Session) result;
     }
 
-
-    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
-        connTempQueues.add(tempQueue);
-    }
-
-    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
-        connTempTopics.add(tempTopic);
-    }
-
     // EnhancedCollection API
     // -------------------------------------------------------------------------
 
+    @Override
     public DestinationSource getDestinationSource() throws JMSException {
         return getConnection().getDestinationSource();
     }
@@ -175,6 +190,14 @@ public class PooledConnection implements
     // Implementation methods
     // -------------------------------------------------------------------------
 
+    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+        connTempQueues.add(tempQueue);
+    }
+
+    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+        connTempTopics.add(tempTopic);
+    }
+
     public ActiveMQConnection getConnection() throws JMSException {
         assertNotClosed();
         return pool.getConnection();
@@ -222,4 +245,26 @@ public class PooledConnection implements
         }
         connTempTopics.clear();
     }
+
+    /**
+     * @return the total number of Pooled session including idle sessions that are not
+     *          currently loaned out to any client.
+     */
+    public int getNumSessions() {
+        return this.pool.getNumSessions();
+    }
+
+    /**
+     * @return the number of Sessions that are currently checked out of this Connection's session pool.
+     */
+    public int getNumActiveSessions() {
+        return this.pool.getNumActiveSessions();
+    }
+
+    /**
+     * @return the number of Sessions that are idle in this Connection's sessions pool.
+     */
+    public int getNumtIdleSessions() {
+        return this.pool.getNumIdleSessions();
+    }
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Wed Sep 12 03:44:48 2012
@@ -16,28 +16,27 @@
  */
 package org.apache.activemq.pool;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.Service;
-import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.pool.KeyedObjectPool;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.ObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.pool.ObjectPoolFactory;
-import org.apache.commons.pool.impl.GenericObjectPool;
-import org.apache.commons.pool.impl.GenericObjectPoolFactory;
 
 /**
  * A JMS provider which pools Connection, Session and MessageProducer instances
- * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
- * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
+ * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
+ * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
  * Connections, sessions and producers are returned to a pool after use so that they can be reused later
  * without having to undergo the cost of creating them again.
  *
@@ -54,92 +53,193 @@ import org.apache.commons.pool.impl.Gene
  * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
  * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
  *
- * @org.apache.xbean.XBean element="pooledConnectionFactory"
- *
+ * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
+ * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
+ * be used when configuring this optional feature. Eviction runs contend with client threads for access
+ * to objects in the pool, so if they run too frequently performance issues may result. The idle object
+ * eviction thread may be configured using the {@link #setTimeBetweenExpirationCheckMillis} method.  By
+ * default the value is -1 which means no eviction thread will be run.  Set to a positive value to
+ * configure the idle eviction thread to run.
  *
+ * @org.apache.xbean.XBean element="pooledConnectionFactory"
  */
 public class PooledConnectionFactory implements ConnectionFactory, Service {
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
+
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
+    private final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
+
     private ConnectionFactory connectionFactory;
-    private final Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
-    private ObjectPoolFactory poolFactory;
+
     private int maximumActiveSessionPerConnection = 500;
-    private int maxConnections = 1;
     private int idleTimeout = 30 * 1000;
     private boolean blockIfSessionPoolIsFull = true;
-    private final AtomicBoolean stopped = new AtomicBoolean(false);
     private long expiryTimeout = 0l;
     private boolean createConnectionOnStartup = true;
 
+    /**
+     * Creates new PooledConnectionFactory with a default ActiveMQConnectionFactory instance.
+     * <p/>
+     * The URI used to connect to ActiveMQ comes from the default value of ActiveMQConnectionFactory.
+     */
     public PooledConnectionFactory() {
         this(new ActiveMQConnectionFactory());
     }
 
+    /**
+     * Creates a new PooledConnectionFactory that will use the given broker URI to connect to
+     * ActiveMQ.
+     *
+     * @param brokerURL
+     *      The URI to use to configure the internal ActiveMQConnectionFactory.
+     */
     public PooledConnectionFactory(String brokerURL) {
         this(new ActiveMQConnectionFactory(brokerURL));
     }
 
+    /**
+     * Creates a new PooledConnectionFactory that will use the given ActiveMQConnectionFactory to
+     * create new ActiveMQConnection instances that will be pooled.
+     *
+     * @param connectionFactory
+     *      The ActiveMQConnectionFactory to create new Connections for this pool.
+     */
     public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
+
+        this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
+            new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
+
+                @Override
+                public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+                }
+
+                @Override
+                public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+                    try {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Destroying connection: {}", connection);
+                        }
+                        connection.close();
+                    } catch (Exception e) {
+                        LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
+                    }
+                }
+
+                @Override
+                public ConnectionPool makeObject(ConnectionKey key) throws Exception {
+                    ActiveMQConnection delegate = createConnection(key);
+
+                    ConnectionPool connection = new ConnectionPool(delegate);
+                    connection.setIdleTimeout(getIdleTimeout());
+                    connection.setExpiryTimeout(getExpiryTimeout());
+                    connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
+                    connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Created new connection: {}", connection);
+                    }
+
+                    return connection;
+                }
+
+                @Override
+                public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+                }
+
+                @Override
+                public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
+                    if (connection != null && connection.expiredCheck()) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Connection has expired: {} and will be destroyed", connection);
+                        }
+
+                        return false;
+                    }
+
+                    return true;
+                }
+        });
+
+        // Set max idle (not max active) since our connections always idle in the pool.
+        this.connectionsPool.setMaxIdle(1);
+
+        // We always want our validate method to control when idle objects are evicted.
+        this.connectionsPool.setTestOnBorrow(true);
+        this.connectionsPool.setTestWhileIdle(true);
     }
 
+    /**
+     * @return the currently configured ConnectionFactory used to create the pooled Connections.
+     */
     public ConnectionFactory getConnectionFactory() {
         return connectionFactory;
     }
 
+    /**
+     * Sets the ConnectionFactory used to create new pooled Connections.
+     * <p/>
+     * Updates to this value do not affect Connections that were previously created and placed
+     * into the pool.  In order to allocate new Connections based off this new ConnectionFactory
+     * it is first necessary to {@link #clear} the pooled Connections.
+     *
+     * @param connectionFactory
+     *      The factory to use to create pooled Connections.
+     */
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
     }
 
+    @Override
     public Connection createConnection() throws JMSException {
         return createConnection(null, null);
     }
 
+    @Override
     public synchronized Connection createConnection(String userName, String password) throws JMSException {
         if (stopped.get()) {
             LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
             return null;
         }
 
+        ConnectionPool connection = null;
         ConnectionKey key = new ConnectionKey(userName, password);
-        LinkedList<ConnectionPool> pools = cache.get(key);
 
-        if (pools == null) {
-            pools = new LinkedList<ConnectionPool>();
-            cache.put(key, pools);
+        // This will either return an existing non-expired ConnectionPool or it
+        // will create a new one to meet the demand.
+        if (connectionsPool.getNumIdle(key) < getMaxConnections()) {
+            try {
+                // we want borrowObject to return the one we added.
+                connectionsPool.setLifo(true);
+                connectionsPool.addObject(key);
+            } catch (Exception e) {
+                throw JMSExceptionSupport.create("Error while attempting to add new Connection to the pool", e);
+            }
+        } else {
+            // now we want the oldest one in the pool.
+            connectionsPool.setLifo(false);
         }
 
-        ConnectionPool connection = null;
-        if (pools.size() == maxConnections) {
-            connection = pools.removeFirst();
+        try {
+            connection = connectionsPool.borrowObject(key);
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
         }
 
-        // Now.. we might get a connection, but it might be that we need to
-        // dump it..
-        if (connection != null && connection.expiredCheck()) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Connection has expired: {}", connection);
-            }
-            connection = null;
+        try {
+            connectionsPool.returnObject(key, connection);
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create("Error when returning connection to the pool", e);
         }
 
-        if (connection == null) {
-            ActiveMQConnection delegate = createConnection(key);
-            connection = createConnectionPool(delegate);
-
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Created new connection: {}", connection);
-            }
-        }
-        pools.add(connection);
         return new PooledConnection(connection);
     }
 
-    protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
-        ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
-        result.setIdleTimeout(getIdleTimeout());
-        result.setExpiryTimeout(getExpiryTimeout());
-        return result;
+    /**
+     * @deprecated
+     */
+    public ObjectPoolFactory<?> getPoolFactory() {
+        return null;
     }
 
     protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
@@ -150,8 +250,9 @@ public class PooledConnectionFactory imp
         }
     }
 
+    @Override
     public void start() {
-        LOG.debug("Staring the PooledConnectionFactory");
+        LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
         stopped.set(false);
         if (isCreateConnectionOnStartup()) {
             try {
@@ -163,34 +264,32 @@ public class PooledConnectionFactory imp
         }
     }
 
+    @Override
     public void stop() {
-        LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}", cache.size());
-        stopped.set(true);
-        for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
-            for (ConnectionPool connection : iter.next()) {
-                try {
-                    connection.close();
-                } catch (Exception e) {
-                    LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
-                }
-            }
-        }
-        cache.clear();
-    }
+        LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
+                  connectionsPool.getNumActive());
 
-    public ObjectPoolFactory getPoolFactory() {
-        if (poolFactory == null) {
-            poolFactory = createPoolFactory();
+        if (stopped.compareAndSet(false, true)) {
+            try {
+                connectionsPool.close();
+            } catch (Exception e) {
+            }
         }
-        return poolFactory;
     }
 
     /**
-     * Sets the object pool factory used to create individual session pools for
-     * each connection
+     * Clears all connections from the pool.  Each connection that is currently in the pool is
+     * closed and removed from the pool.  A new connection will be created on the next call to
+     * {@link #createConnection()}.  Care should be taken when using this method as Connections that
+     * are in use by clients will be closed.
      */
-    public void setPoolFactory(ObjectPoolFactory poolFactory) {
-        this.poolFactory = poolFactory;
+    public void clear() {
+
+        if (stopped.get()) {
+            return;
+        }
+
+        this.connectionsPool.clear();
     }
 
     /**
@@ -209,12 +308,22 @@ public class PooledConnectionFactory imp
         setMaximumActiveSessionPerConnection(maximumActive);
     }
 
+    /**
+     * Returns the currently configured maximum number of sessions a pooled Connection will
+     * create before it either blocks or throws an exception when a new session is requested,
+     * depending on configuration.
+     *
+     * @return the number of session instances that can be taken from a pooled connection.
+     */
     public int getMaximumActiveSessionPerConnection() {
         return maximumActiveSessionPerConnection;
     }
 
     /**
      * Sets the maximum number of active sessions per connection
+     *
+     * @param maximumActiveSessionPerConnection
+     *      The maximum number of active session per connection in the pool.
      */
     public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
         this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
@@ -237,40 +346,61 @@ public class PooledConnectionFactory imp
     }
 
     /**
-     * @return the maxConnections
+     * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
+     * once the maximum number of sessions has been borrowed from the Session Pool.
+     *
+     * @return true if the pooled Connection createSession method will block when the limit is hit.
+     * @see #setBlockIfSessionPoolIsFull
+     */
+    public boolean isBlockIfSessionPoolIsFull() {
+        return this.blockIfSessionPoolIsFull;
+    }
+
+    /**
+     * Returns the maximum number of pooled Connections that this factory will allow before it
+     * begins to return connections from the pool on calls to {@link #createConnection()}.
+     *
+     * @return the maxConnections that will be created for this pool.
      */
     public int getMaxConnections() {
-        return maxConnections;
+        return connectionsPool.getMaxIdle();
     }
 
     /**
+     * Sets the maximum number of pooled Connections (defaults to one).  Each call to
+     * {@link #createConnection()} will result in a new Connection being created up to the max
+     * connections value.
+     *
      * @param maxConnections the maxConnections to set
      */
     public void setMaxConnections(int maxConnections) {
-        this.maxConnections = maxConnections;
+        this.connectionsPool.setMaxIdle(maxConnections);
     }
 
     /**
-     * Creates an ObjectPoolFactory. Its behavior is controlled by the two
-     * properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
+     * Gets the idle timeout value applied to new Connections that are created by this pool.
+     * <p/>
+     * The idle timeout is used to determine if a Connection instance has sat too long in the pool unused
+     * and if so is closed and removed from the pool.  The default value is 30 seconds.
      *
-     * @return the newly created but empty ObjectPoolFactory
+     * @return the idle timeout, in milliseconds, applied to Connections created by this pool.
      */
-    protected ObjectPoolFactory createPoolFactory() {
-         if (blockIfSessionPoolIsFull) {
-            return new GenericObjectPoolFactory(null, maximumActiveSessionPerConnection);
-        } else {
-            return new GenericObjectPoolFactory(null,
-                maximumActiveSessionPerConnection,
-                GenericObjectPool.WHEN_EXHAUSTED_FAIL,
-                GenericObjectPool.DEFAULT_MAX_WAIT);
-        }
-    }
-
     public int getIdleTimeout() {
         return idleTimeout;
     }
 
+    /**
+     * Sets the idle timeout value for Connections that are created by this pool, defaults to 30 seconds.
+     * <p/>
+     * For a Connection that is in the pool but has no current users the idle timeout determines how
+     * long the Connection can live before it is eligible for removal from the pool.  Normally the
+     * connections are tested when an attempt to check one out occurs so a Connection instance can sit
+     * in the pool much longer than its idle timeout if connections are used infrequently.
+     *
+     *
+     * @param idleTimeout
+     *      The maximum time a pooled Connection can sit unused before it is eligible for removal.
+     */
     public void setIdleTimeout(int idleTimeout) {
         this.idleTimeout = idleTimeout;
     }
@@ -285,10 +415,16 @@ public class PooledConnectionFactory imp
         this.expiryTimeout = expiryTimeout;
     }
 
+    /**
+     * @return the configured expiration timeout for connections in the pool.
+     */
     public long getExpiryTimeout() {
         return expiryTimeout;
     }
 
+    /**
+     * @return true if a Connection is created immediately on a call to {@link #start}.
+     */
     public boolean isCreateConnectionOnStartup() {
         return createConnectionOnStartup;
     }
@@ -296,7 +432,7 @@ public class PooledConnectionFactory imp
     /**
      * Whether to create a connection on starting this {@link PooledConnectionFactory}.
      * <p/>
-     * This can be used to warmup the pool on startup. Notice that any kind of exception
+     * This can be used to warm up the pool on startup. Notice that any kind of exception that
      * happens during startup is logged at WARN level and ignored.
      *
      * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
@@ -304,4 +440,48 @@ public class PooledConnectionFactory imp
     public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
         this.createConnectionOnStartup = createConnectionOnStartup;
     }
+
+    /**
+     * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
+     *
+     * @return this factory's pool of ConnectionPool instances.
+     */
+    KeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
+        return this.connectionsPool;
+    }
+
+    /**
+     * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
+     * When non-positive, no idle object eviction thread will be run, and Connections will only be
+     * checked on borrow to determine if they have sat idle for too long or have failed for some
+     * other reason.
+     * <p/>
+     * By default this value is set to -1 and no expiration thread ever runs.
+     *
+     * @param timeBetweenExpirationCheckMillis
+     *      The time to wait between runs of the idle Connection eviction thread.
+     */
+    public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
+        this.connectionsPool.setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
+    }
+
+    /**
+     * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
+     */
+    public long setTimeBetweenExpirationCheckMillis() {
+        return this.connectionsPool.getTimeBetweenEvictionRunsMillis();
+    }
+
+    /**
+     * @return the number of Connections currently in the Pool
+     */
+    public int getNumConnections() {
+        return this.connectionsPool.getNumIdle();
+    }
+
+    /**
+     * @deprecated
+     */
+    public void setPoolFactory(ObjectPoolFactory<?> factory) {
+    }
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java Wed Sep 12 03:44:48 2012
@@ -79,6 +79,6 @@ public class PooledMessageConsumer imple
 
     @Override
     public String toString() {
-        return delegate.toString();
+        return "PooledMessageConsumer { " + delegate + " }";
     }
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java Wed Sep 12 03:44:48 2012
@@ -25,12 +25,12 @@ import org.apache.activemq.ActiveMQMessa
 
 /**
  * A pooled {@link MessageProducer}
- * 
- * 
  */
 public class PooledProducer implements MessageProducer {
-    private ActiveMQMessageProducer messageProducer;
-    private Destination destination;
+
+    private final ActiveMQMessageProducer messageProducer;
+    private final Destination destination;
+
     private int deliveryMode;
     private boolean disableMessageID;
     private boolean disableMessageTimestamp;
@@ -48,21 +48,26 @@ public class PooledProducer implements M
         this.timeToLive = messageProducer.getTimeToLive();
     }
 
+    @Override
     public void close() throws JMSException {
     }
 
+    @Override
     public void send(Destination destination, Message message) throws JMSException {
         send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
     }
 
+    @Override
     public void send(Message message) throws JMSException {
         send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
     }
 
+    @Override
     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
         send(destination, message, deliveryMode, priority, timeToLive);
     }
 
+    @Override
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
         if (destination == null) {
             destination = this.destination;
@@ -75,46 +80,57 @@ public class PooledProducer implements M
         }
     }
 
+    @Override
     public Destination getDestination() {
         return destination;
     }
 
+    @Override
     public int getDeliveryMode() {
         return deliveryMode;
     }
 
+    @Override
     public void setDeliveryMode(int deliveryMode) {
         this.deliveryMode = deliveryMode;
     }
 
+    @Override
     public boolean getDisableMessageID() {
         return disableMessageID;
     }
 
+    @Override
     public void setDisableMessageID(boolean disableMessageID) {
         this.disableMessageID = disableMessageID;
     }
 
+    @Override
     public boolean getDisableMessageTimestamp() {
         return disableMessageTimestamp;
     }
 
+    @Override
     public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
         this.disableMessageTimestamp = disableMessageTimestamp;
     }
 
+    @Override
     public int getPriority() {
         return priority;
     }
 
+    @Override
     public void setPriority(int priority) {
         this.priority = priority;
     }
 
+    @Override
     public long getTimeToLive() {
         return timeToLive;
     }
 
+    @Override
     public void setTimeToLive(long timeToLive) {
         this.timeToLive = timeToLive;
     }
@@ -125,8 +141,8 @@ public class PooledProducer implements M
         return messageProducer;
     }
 
+    @Override
     public String toString() {
         return "PooledProducer { " + messageProducer + " }";
     }
-
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java Wed Sep 12 03:44:48 2012
@@ -25,7 +25,7 @@ import javax.jms.QueueSender;
 import org.apache.activemq.ActiveMQQueueSender;
 
 /**
- * 
+ * {@link QueueSender} instance that is created and managed by the PooledConnection.
  */
 public class PooledQueueSender extends PooledProducer implements QueueSender {
 
@@ -33,21 +33,22 @@ public class PooledQueueSender extends P
         super(messageProducer, destination);
     }
 
+    @Override
     public void send(Queue queue, Message message, int i, int i1, long l) throws JMSException {
         getQueueSender().send(queue, message, i, i1, l);
     }
 
+    @Override
     public void send(Queue queue, Message message) throws JMSException {
         getQueueSender().send(queue, message);
     }
 
+    @Override
     public Queue getQueue() throws JMSException {
         return getQueueSender().getQueue();
     }
 
-
     protected ActiveMQQueueSender getQueueSender() {
         return (ActiveMQQueueSender) getMessageProducer();
     }
-
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java Wed Sep 12 03:44:48 2012
@@ -51,28 +51,32 @@ import org.apache.activemq.ActiveMQQueue
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.ActiveMQTopicPublisher;
 import org.apache.activemq.AlreadyClosedException;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.commons.pool.KeyedObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PooledSession implements Session, TopicSession, QueueSession, XASession {
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
+    private final SessionKey key;
+    private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
+        new CopyOnWriteArrayList<PooledSessionEventListener>();
+
     private ActiveMQSession session;
-    private SessionPool sessionPool;
     private ActiveMQMessageProducer messageProducer;
     private ActiveMQQueueSender queueSender;
     private ActiveMQTopicPublisher topicPublisher;
     private boolean transactional = true;
     private boolean ignoreClose;
-
-    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
-    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
-    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
-        new CopyOnWriteArrayList<PooledSessionEventListener>();
     private boolean isXa;
 
-    public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
-        this.session = aSession;
+    public PooledSession(SessionKey key, ActiveMQSession session, KeyedObjectPool<SessionKey, PooledSession> sessionPool) {
+        this.key = key;
+        this.session = session;
         this.sessionPool = sessionPool;
         this.transactional = session.isTransacted();
     }
@@ -92,10 +96,9 @@ public class PooledSession implements Se
         this.ignoreClose = ignoreClose;
     }
 
+    @Override
     public void close() throws JMSException {
         if (!ignoreClose) {
-            // TODO a cleaner way to reset??
-
             boolean invalidate = false;
             try {
                 // lets reset the session
@@ -130,8 +133,8 @@ public class PooledSession implements Se
             }
 
             if (invalidate) {
-                // lets close the session and not put the session back into
-                // the pool
+                // lets close the session and not put the session back into the pool
+                // instead invalidate it so the pool can create a new one on demand.
                 if (session != null) {
                     try {
                         session.close();
@@ -140,45 +143,62 @@ public class PooledSession implements Se
                     }
                     session = null;
                 }
-                sessionPool.invalidateSession(this);
+                try {
+                    sessionPool.invalidateObject(key, this);
+                } catch (Exception e) {
+                    throw JMSExceptionSupport.create(e);
+                }
             } else {
-                sessionPool.returnSession(this);
+                try {
+                    sessionPool.returnObject(key, this);
+                } catch (Exception e) {
+                    throw JMSExceptionSupport.create(e);
+                }
             }
         }
     }
 
+    @Override
     public void commit() throws JMSException {
         getInternalSession().commit();
     }
 
+    @Override
     public BytesMessage createBytesMessage() throws JMSException {
         return getInternalSession().createBytesMessage();
     }
 
+    @Override
     public MapMessage createMapMessage() throws JMSException {
         return getInternalSession().createMapMessage();
     }
 
+    @Override
     public Message createMessage() throws JMSException {
         return getInternalSession().createMessage();
     }
 
+    @Override
     public ObjectMessage createObjectMessage() throws JMSException {
         return getInternalSession().createObjectMessage();
     }
 
+    @Override
     public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
         return getInternalSession().createObjectMessage(serializable);
     }
 
+    @Override
     public Queue createQueue(String s) throws JMSException {
         return getInternalSession().createQueue(s);
     }
 
+    @Override
     public StreamMessage createStreamMessage() throws JMSException {
         return getInternalSession().createStreamMessage();
     }
 
+    @Override
     public TemporaryQueue createTemporaryQueue() throws JMSException {
         TemporaryQueue result;
 
@@ -192,6 +212,7 @@ public class PooledSession implements Se
         return result;
     }
 
+    @Override
     public TemporaryTopic createTemporaryTopic() throws JMSException {
         TemporaryTopic result;
 
@@ -205,38 +226,47 @@ public class PooledSession implements Se
         return result;
     }
 
+    @Override
     public void unsubscribe(String s) throws JMSException {
         getInternalSession().unsubscribe(s);
     }
 
+    @Override
     public TextMessage createTextMessage() throws JMSException {
         return getInternalSession().createTextMessage();
     }
 
+    @Override
     public TextMessage createTextMessage(String s) throws JMSException {
         return getInternalSession().createTextMessage(s);
     }
 
+    @Override
     public Topic createTopic(String s) throws JMSException {
         return getInternalSession().createTopic(s);
     }
 
+    @Override
     public int getAcknowledgeMode() throws JMSException {
         return getInternalSession().getAcknowledgeMode();
     }
 
+    @Override
     public boolean getTransacted() throws JMSException {
         return getInternalSession().getTransacted();
     }
 
+    @Override
     public void recover() throws JMSException {
         getInternalSession().recover();
     }
 
+    @Override
     public void rollback() throws JMSException {
         getInternalSession().rollback();
     }
 
+    @Override
     public XAResource getXAResource() {
         if (session == null) {
             throw new IllegalStateException("Session is closed");
@@ -244,10 +274,12 @@ public class PooledSession implements Se
         return session.getTransactionContext();
     }
 
+    @Override
     public Session getSession() {
         return this;
     }
 
+    @Override
     public void run() {
         if (session != null) {
             session.run();
@@ -256,68 +288,84 @@ public class PooledSession implements Se
 
     // Consumer related methods
     // -------------------------------------------------------------------------
+    @Override
     public QueueBrowser createBrowser(Queue queue) throws JMSException {
         return addQueueBrowser(getInternalSession().createBrowser(queue));
     }
 
+    @Override
     public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
         return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination, selector));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
     }
 
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
         return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
     }
 
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
         return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
     }
 
+    @Override
     public MessageListener getMessageListener() throws JMSException {
         return getInternalSession().getMessageListener();
     }
 
+    @Override
     public void setMessageListener(MessageListener messageListener) throws JMSException {
         getInternalSession().setMessageListener(messageListener);
     }
 
+    @Override
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
         return addTopicSubscriber(getInternalSession().createSubscriber(topic));
     }
 
+    @Override
     public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
         return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
     }
 
+    @Override
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
         return addQueueReceiver(getInternalSession().createReceiver(queue));
     }
 
+    @Override
     public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
         return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
     }
 
     // Producer related methods
     // -------------------------------------------------------------------------
+    @Override
     public MessageProducer createProducer(Destination destination) throws JMSException {
         return new PooledProducer(getMessageProducer(), destination);
     }
 
+    @Override
     public QueueSender createSender(Queue queue) throws JMSException {
         return new PooledQueueSender(getQueueSender(), queue);
     }
 
+    @Override
     public TopicPublisher createPublisher(Topic topic) throws JMSException {
         return new PooledTopicPublisher(getTopicPublisher(), topic);
     }
@@ -371,11 +419,9 @@ public class PooledSession implements Se
 
     private MessageConsumer addConsumer(MessageConsumer consumer) {
         consumers.add(consumer);
-        // must wrap in PooledMessageConsumer to ensure the onConsumerClose
-        // method is invoked
-        // when the returned consumer is closed, to avoid memory leak in this
-        // session class
-        // in case many consumers is created
+        // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
+        // invoked when the returned consumer is closed, to avoid a memory leak in this
+        // session class in case many consumers are created
         return new PooledMessageConsumer(this, consumer);
     }
 
@@ -393,6 +439,7 @@ public class PooledSession implements Se
         this.isXa = isXa;
     }
 
+    @Override
     public String toString() {
         return "PooledSession { " + session + " }";
     }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java Wed Sep 12 03:44:48 2012
@@ -25,7 +25,7 @@ import javax.jms.TopicPublisher;
 import org.apache.activemq.ActiveMQTopicPublisher;
 
 /**
- * 
+ * A {@link TopicPublisher} instance that is created and managed by a PooledConnection.
  */
 public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
 
@@ -33,22 +33,27 @@ public class PooledTopicPublisher extend
         super(messageProducer, destination);
     }
 
+    @Override
     public Topic getTopic() throws JMSException {
         return getTopicPublisher().getTopic();
     }
 
+    @Override
     public void publish(Message message) throws JMSException {
         getTopicPublisher().publish((Topic) getDestination(), message);
     }
 
+    @Override
     public void publish(Message message, int i, int i1, long l) throws JMSException {
         getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l);
     }
 
+    @Override
     public void publish(Topic topic, Message message) throws JMSException {
         getTopicPublisher().publish(topic, message);
     }
 
+    @Override
     public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException {
         getTopicPublisher().publish(topic, message, i, i1, l);
     }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java Wed Sep 12 03:44:48 2012
@@ -17,13 +17,13 @@
 package org.apache.activemq.pool;
 
 /**
- * A cache key for the session details
- *
- * 
+ * A cache key for the session details used to locate PooledSession instances.
  */
 public class SessionKey {
-    private boolean transacted;
-    private int ackMode;
+
+    private final boolean transacted;
+    private final int ackMode;
+
     private int hash;
 
     public SessionKey(boolean transacted, int ackMode) {

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java Wed Sep 12 03:44:48 2012
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.pool;
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.commons.pool.ObjectPoolFactory;
-
 import javax.jms.JMSException;
 import javax.jms.Session;
 import javax.transaction.RollbackException;
@@ -27,21 +24,32 @@ import javax.transaction.SystemException
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
 
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.commons.pool.ObjectPoolFactory;
+
 /**
  * An XA-aware connection pool.  When a session is created and an xa transaction is active,
  * the session will automatically be enlisted in the current transaction.
- * 
+ *
  * @author gnodet
  */
 public class XaConnectionPool extends ConnectionPool {
 
     private TransactionManager transactionManager;
 
-    public XaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
-        super(connection, poolFactory);
+    public XaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager) {
+        super(connection);
         this.transactionManager = transactionManager;
     }
 
+    /**
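+     * @deprecated the poolFactory argument is ignored; use {@link #XaConnectionPool(ActiveMQConnection, TransactionManager)} instead.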
+     */
+    public XaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
+        this(connection, transactionManager);
+    }
+
+    @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
         try {
             boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
@@ -74,8 +82,7 @@ public class XaConnectionPool extends Co
     protected XAResource createXaResource(PooledSession session) throws JMSException {
         return session.getXAResource();
     }
-    
-    
+
     protected class Synchronization implements javax.transaction.Synchronization {
         private final PooledSession session;
 
@@ -85,7 +92,7 @@ public class XaConnectionPool extends Co
 
         public void beforeCompletion() {
         }
-        
+
         public void afterCompletion(int status) {
             try {
                 // This will return session to the pool.
@@ -99,5 +106,4 @@ public class XaConnectionPool extends Co
             }
         }
     }
-    
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java Wed Sep 12 03:44:48 2012
@@ -28,7 +28,7 @@ import org.apache.activemq.ActiveMQConne
 public class XaPooledConnectionFactory extends PooledConnectionFactory {
 
     private TransactionManager transactionManager;
-    
+
     public XaPooledConnectionFactory() {
         super();
     }
@@ -50,7 +50,6 @@ public class XaPooledConnectionFactory e
     }
 
     protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
-        return new XaConnectionPool(connection, getPoolFactory(), getTransactionManager());
+        return new XaConnectionPool(connection, getTransactionManager());
     }
-
 }

Modified: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java?rev=1383746&r1=1383745&r2=1383746&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java (original)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java Wed Sep 12 03:44:48 2012
@@ -16,60 +16,188 @@
  */
 package org.apache.activemq.pool;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
 import junit.framework.Assert;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.util.Wait;
 import org.apache.log4j.Logger;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-
 /**
- * Checks the behavior of the PooledConnectionFactory when the maximum amount
- * of sessions is being reached.
- * Older versions simply block in the call to Connection.getSession(), which isn't good.
- * An exception being returned is the better option, so JMS clients don't block.
- * This test succeeds if an exception is returned and fails if the call to getSession()
- * blocks.
+ * Checks the behavior of the PooledConnectionFactory when the maximum amount of
+ * sessions is being reached.
  *
+ * Older versions simply block in the call to Connection.createSession(), which
+ * isn't good. An exception being returned is the better option, so JMS clients
+ * don't block. This test succeeds if an exception is returned and fails if the
+ * call to createSession() blocks.
  */
-public class PooledConnectionFactoryTest extends TestCase
-{
-    public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
+public class PooledConnectionFactoryTest extends TestCase {
 
+    public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
 
     /**
      * Create the test case
      *
-     * @param testName name of the test case
+     * @param testName
+     *            name of the test case
      */
-    public PooledConnectionFactoryTest( String testName )
-    {
-        super( testName );
+    public PooledConnectionFactoryTest(String testName) {
+        super(testName);
     }
 
     /**
      * @return the suite of tests being tested
      */
-    public static Test suite()
-    {
-        return new TestSuite( PooledConnectionFactoryTest.class );
+    public static Test suite() {
+        return new TestSuite(PooledConnectionFactoryTest.class);
+    }
+
+    public void testClearAllConnections() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+        cf.setMaxConnections(3);
+
+        PooledConnection conn1 = (PooledConnection) cf.createConnection();
+        PooledConnection conn2 = (PooledConnection) cf.createConnection();
+        PooledConnection conn3 = (PooledConnection) cf.createConnection();
+
+        assertNotSame(conn1.getConnection(), conn2.getConnection());
+        assertNotSame(conn1.getConnection(), conn3.getConnection());
+        assertNotSame(conn2.getConnection(), conn3.getConnection());
+
+        assertEquals(3, cf.getNumConnections());
+
+        cf.clear();
+
+        assertEquals(0, cf.getNumConnections());
+
+        conn1 = (PooledConnection) cf.createConnection();
+        conn2 = (PooledConnection) cf.createConnection();
+        conn3 = (PooledConnection) cf.createConnection();
+
+        assertNotSame(conn1.getConnection(), conn2.getConnection());
+        assertNotSame(conn1.getConnection(), conn3.getConnection());
+        assertNotSame(conn2.getConnection(), conn3.getConnection());
+    }
+
+    public void testMaxConnectionsAreCreated() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+        cf.setMaxConnections(3);
+
+        PooledConnection conn1 = (PooledConnection) cf.createConnection();
+        PooledConnection conn2 = (PooledConnection) cf.createConnection();
+        PooledConnection conn3 = (PooledConnection) cf.createConnection();
+
+        assertNotSame(conn1.getConnection(), conn2.getConnection());
+        assertNotSame(conn1.getConnection(), conn3.getConnection());
+        assertNotSame(conn2.getConnection(), conn3.getConnection());
+
+        assertEquals(3, cf.getNumConnections());
+    }
+
+    public void testConnectionsAreRotated() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+        cf.setMaxConnections(10);
+
+        ActiveMQConnection previous = null;
+
+        // Front load the pool.
+        for (int i = 0; i < 10; ++i) {
+            cf.createConnection();
+        }
+
+        for (int i = 0; i < 100; ++i) {
+            ActiveMQConnection current = ((PooledConnection) cf.createConnection()).getConnection();
+            assertNotSame(previous, current);
+            previous = current;
+        }
+    }
+
+    public void testConnectionsArePooled() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+        cf.setMaxConnections(1);
+
+        PooledConnection conn1 = (PooledConnection) cf.createConnection();
+        PooledConnection conn2 = (PooledConnection) cf.createConnection();
+        PooledConnection conn3 = (PooledConnection) cf.createConnection();
+
+        assertSame(conn1.getConnection(), conn2.getConnection());
+        assertSame(conn1.getConnection(), conn3.getConnection());
+        assertSame(conn2.getConnection(), conn3.getConnection());
+
+        assertEquals(1, cf.getNumConnections());
+    }
+
+    public void testConnectionsArePooledAsyncCreate() throws Exception {
+
+        final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        final PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+        cf.setMaxConnections(1);
+
+        final ConcurrentLinkedQueue<PooledConnection> connections = new ConcurrentLinkedQueue<PooledConnection>();
+
+        final PooledConnection primary = (PooledConnection) cf.createConnection();
+        final ExecutorService executor = Executors.newFixedThreadPool(10);
+        final int numConnections = 100;
+
+        for (int i = 0; i < numConnections; ++i) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        connections.add((PooledConnection) cf.createConnection());
+                    } catch (JMSException e) {
+                    }
+                }
+            });
+        }
+
+        assertTrue("", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return connections.size() == numConnections;
+            }
+        }));
+
+        executor.shutdown();
+        assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+        for(PooledConnection connection : connections) {
+            assertSame(primary.getConnection(), connection.getConnection());
+        }
+
+        connections.clear();
     }
 
     /**
-     * Tests the behavior of the sessionPool of the PooledConnectionFactory
-     * when maximum number of sessions are reached.
+     * Tests the behavior of the sessionPool of the PooledConnectionFactory when
+     * maximum number of sessions are reached.
      */
-    public void testApp() throws Exception
-    {
+    public void testApp() throws Exception {
         // using separate thread for testing so that we can interrupt the test
         // if the call to get a new session blocks.
 
@@ -78,74 +206,74 @@ public class PooledConnectionFactoryTest
         Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
 
         // test should not take > 5secs, so test fails i
-        Thread.sleep(5*1000);
+        Thread.sleep(5 * 1000);
 
         if (!result.isDone() || !result.get().booleanValue()) {
-            PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" +
-            " is blocking but should have returned an error instead.");
+            PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" + " is blocking but should have returned an error instead.");
 
             executor.shutdownNow();
 
-            Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " +
-            "limit is exceeded but should return an exception instead.");
+            Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " + "limit is exceeded but should return an exception instead.");
         }
     }
-}
-
-class TestRunner implements Callable<Boolean> {
 
-    public final static Logger LOG = Logger.getLogger(TestRunner.class);
+    static class TestRunner implements Callable<Boolean> {
 
-    /**
-     * @return true if test succeeded, false otherwise
-     */
-    public Boolean call() {
-
-        Connection conn = null;
-        Session one = null;
+        public final static Logger LOG = Logger.getLogger(TestRunner.class);
 
-        // wait at most 5 seconds for the call to createSession
-        try {
-            ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
-            PooledConnectionFactory cf = new PooledConnectionFactory(amq);
-            cf.setMaxConnections(3);
-            cf.setMaximumActiveSessionPerConnection(1);
-            cf.setBlockIfSessionPoolIsFull(false);
+        /**
+         * @return true if test succeeded, false otherwise
+         */
+        public Boolean call() {
 
-            conn = cf.createConnection();
-            one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Connection conn = null;
+            Session one = null;
 
-            Session two = null;
+            // wait at most 5 seconds for the call to createSession
             try {
-                // this should raise an exception as we called setMaximumActive(1)
-                two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                two.close();
-
-                LOG.error("Expected JMSException wasn't thrown.");
-                Assert.fail("seconds call to Connection.createSession() was supposed" +
-                        "to raise an JMSException as internal session pool" +
-                        "is exhausted. This did not happen and indiates a problem");
-                return new Boolean(false);
-            } catch (JMSException ex) {
-                if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
-                    //expected, ignore but log
-                    LOG.info("Caught expected " + ex);
-                } else {
-                    LOG.error(ex);
+                ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+                PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+                cf.setMaxConnections(3);
+                cf.setMaximumActiveSessionPerConnection(1);
+                cf.setBlockIfSessionPoolIsFull(false);
+
+                conn = cf.createConnection();
+                one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Session two = null;
+                try {
+                    // this should raise an exception as we called
+                    // setMaximumActiveSessionPerConnection(1)
+                    two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    two.close();
+
+                    LOG.error("Expected JMSException wasn't thrown.");
+                    Assert.fail("seconds call to Connection.createSession() was supposed" + "to raise an JMSException as internal session pool"
+                            + "is exhausted. This did not happen and indiates a problem");
                     return new Boolean(false);
+                } catch (JMSException ex) {
+                    if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
+                        // expected, ignore but log
+                        LOG.info("Caught expected " + ex);
+                    } else {
+                        LOG.error(ex);
+                        return new Boolean(false);
+                    }
+                } finally {
+                    if (one != null)
+                        one.close();
+                    if (conn != null)
+                        conn.close();
                 }
-            } finally {
-                if (one != null)
-                    one.close();
-                if (conn != null)
-                    conn.close();
+            } catch (Exception ex) {
+                LOG.error(ex.getMessage());
+                return new Boolean(false);
             }
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage());
-            return new Boolean(false);
-        }
 
-        // all good, test succeeded
-        return new Boolean(true);
+            // all good, test succeeded
+            return new Boolean(true);
+        }
     }
 }
+
+