You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by tj...@apache.org on 2014/06/30 18:55:06 UTC

svn commit: r1606837 [25/27] - in /aries/branches/subsystemsR6: ./ application/ application/application-api/ application/application-bundle/ application/application-converters/ application/application-default-local-platform/ application/application-dep...

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java Mon Jun 30 16:54:57 2014
@@ -24,11 +24,14 @@ import java.util.concurrent.atomic.Atomi
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
 
 import org.apache.aries.transaction.jms.internal.ConnectionKey;
 import org.apache.aries.transaction.jms.internal.ConnectionPool;
-import org.apache.aries.transaction.jms.internal.IOExceptionSupport;
 import org.apache.aries.transaction.jms.internal.PooledConnection;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.pool.ObjectPoolFactory;
@@ -37,8 +40,8 @@ import org.apache.commons.pool.impl.Gene
 
 /**
  * A JMS provider which pools Connection, Session and MessageProducer instances
- * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
- * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
+ * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
+ * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessageListenerContainer</a>.
  * Connections, sessions and producers are returned to a pool after use so that they can be reused later
  * without having to undergo the cost of creating them again.
  *
@@ -55,77 +58,198 @@ import org.apache.commons.pool.impl.Gene
  * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
  * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
  *
+ * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
+ * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
+ * be used when configuring this optional feature. Eviction runs contend with client threads for access
+ * to objects in the pool, so if they run too frequently performance issues may result. The idle object
+ * eviction thread may be configured using the {@link PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method.  By
+ * default the value is -1 which means no eviction thread will be run.  Set to a positive value to
+ * configure the idle eviction thread to run.
+ *
  * @org.apache.xbean.XBean element="pooledConnectionFactory"
  */
 public class PooledConnectionFactory implements ConnectionFactory {
-
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
+
+    protected final AtomicBoolean stopped = new AtomicBoolean(false);
+    private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
+
     private ConnectionFactory connectionFactory;
-    private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
-    private ObjectPoolFactory poolFactory;
-    private int maximumActive = 500;
-    private int maxConnections = 1;
+
+    private int maximumActiveSessionPerConnection = 500;
     private int idleTimeout = 30 * 1000;
     private boolean blockIfSessionPoolIsFull = true;
-    private AtomicBoolean stopped = new AtomicBoolean(false);
+    private long blockIfSessionPoolIsFullTimeout = -1L;
     private long expiryTimeout = 0l;
+    private boolean createConnectionOnStartup = true;
+    private boolean useAnonymousProducers = true;
 
-    public PooledConnectionFactory() {
+    public void initConnectionsPool() {
+        if (this.connectionsPool == null) {
+            this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
+                    new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
+
+                        @Override
+                        public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+                        }
+
+                        @Override
+                        public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+                            try {
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("Destroying connection: {}", connection);
+                                }
+                                connection.close();
+                            } catch (Exception e) {
+                                LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
+                            }
+                        }
+
+                        @Override
+                        public ConnectionPool makeObject(ConnectionKey key) throws Exception {
+                            Connection delegate = createConnection(key);
+
+                            ConnectionPool connection = createConnectionPool(delegate);
+                            connection.setIdleTimeout(getIdleTimeout());
+                            connection.setExpiryTimeout(getExpiryTimeout());
+                            connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
+                            connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
+                            if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) {
+                                connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
+                            }
+                            connection.setUseAnonymousProducers(isUseAnonymousProducers());
+
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Created new connection: {}", connection);
+                            }
+
+                            return connection;
+                        }
+
+                        @Override
+                        public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+                        }
+
+                        @Override
+                        public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
+                            if (connection != null && connection.expiredCheck()) {
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("Connection has expired: {} and will be destroyed", connection);
+                                }
+
+                                return false;
+                            }
+
+                            return true;
+                        }
+                    });
+
+            // Set max idle (not max active) since our connections always idle in the pool.
+            this.connectionsPool.setMaxIdle(1);
+
+            // We always want our validate method to control when idle objects are evicted.
+            this.connectionsPool.setTestOnBorrow(true);
+            this.connectionsPool.setTestWhileIdle(true);
+        }
     }
 
+    /**
+     * @return the currently configured ConnectionFactory used to create the pooled Connections.
+     */
     public ConnectionFactory getConnectionFactory() {
         return connectionFactory;
     }
 
     /**
-     * The actual JMS ConnectionFactory that will be pooled.
+     * Sets the ConnectionFactory used to create new pooled Connections.
+     * <p/>
+     * Updates to this value do not affect Connections that were previously created and placed
+     * into the pool.  In order to allocate new Connections based off this new ConnectionFactory
+     * it is first necessary to {@link #clear()} the pooled Connections.
+     *
+     * @param toUse
+     *      The factory to use to create pooled Connections.
      */
-    public void setConnectionFactory(ConnectionFactory connectionFactory) {
-        this.connectionFactory = connectionFactory;
+    public void setConnectionFactory(final ConnectionFactory toUse) {
+        if (toUse instanceof XAConnectionFactory) {
+            this.connectionFactory = new XAConnectionFactoryWrapper((XAConnectionFactory) toUse);
+        } else {
+            this.connectionFactory = toUse;
+        }
     }
 
+    @Override
     public Connection createConnection() throws JMSException {
         return createConnection(null, null);
     }
 
+    @Override
     public synchronized Connection createConnection(String userName, String password) throws JMSException {
         if (stopped.get()) {
             LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
             return null;
         }
 
+        ConnectionPool connection = null;
         ConnectionKey key = new ConnectionKey(userName, password);
-        LinkedList<ConnectionPool> pools = cache.get(key);
 
-        if (pools == null) {
-            pools = new LinkedList<ConnectionPool>();
-            cache.put(key, pools);
+        // This will either return an existing non-expired ConnectionPool or it
+        // will create a new one to meet the demand.
+        if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
+            try {
+                // we want borrowObject to return the one we added.
+                connectionsPool.setLifo(true);
+                connectionsPool.addObject(key);
+            } catch (Exception e) {
+                throw createJmsException("Error while attempting to add new Connection to the pool", e);
+            }
+        } else {
+            // now we want the oldest one in the pool.
+            connectionsPool.setLifo(false);
         }
 
-        ConnectionPool connection = null;
-        if (pools.size() == maxConnections) {
-            connection = pools.removeFirst();
-        }
+        try {
 
-        // Now.. we might get a connection, but it might be that we need to
-        // dump it..
-        if (connection != null && connection.expiredCheck()) {
-            connection = null;
+            // We can race against other threads returning the connection when there is an
+            // expiration or idle timeout.  We keep pulling out ConnectionPool instances until
+            // we win and get a non-closed instance and then increment the reference count
+            // under lock to prevent another thread from triggering an expiration check and
+            // pulling the rug out from under us.
+            while (connection == null) {
+                connection = connectionsPool.borrowObject(key);
+                synchronized (connection) {
+                    if (connection.getConnection() != null) {
+                        connection.incrementReferenceCount();
+                        break;
+                    }
+
+                    // Return the bad one to the pool and let it get destroyed as normal.
+                    connectionsPool.returnObject(key, connection);
+                    connection = null;
+                }
+            }
+        } catch (Exception e) {
+            throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
         }
 
-        if (connection == null) {
-            Connection delegate = createConnection(key);
-            connection = createConnectionPool(delegate);
+        try {
+            connectionsPool.returnObject(key, connection);
+        } catch (Exception e) {
+            throw createJmsException("Error when returning connection to the pool", e);
         }
-        pools.add(connection);
+
+        return newPooledConnection(connection);
+    }
+
+    protected Connection newPooledConnection(ConnectionPool connection) {
         return new PooledConnection(connection);
     }
 
-    protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
-        ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
-        result.setIdleTimeout(getIdleTimeout());
-        result.setExpiryTimeout(getExpiryTimeout());
-        return result;
+    private JMSException createJmsException(String msg, Exception cause) {
+        JMSException exception = new JMSException(msg);
+        exception.setLinkedException(cause);
+        exception.initCause(cause);
+        return exception;
     }
 
     protected Connection createConnection(ConnectionKey key) throws JMSException {
@@ -137,54 +261,65 @@ public class PooledConnectionFactory imp
     }
 
     public void start() {
-        try {
-            stopped.set(false);
-            createConnection();
-        } catch (JMSException e) {
-            LOG.warn("Create pooled connection during start failed.", e);
-            IOExceptionSupport.create(e);
+        LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
+        stopped.set(false);
+        if (isCreateConnectionOnStartup()) {
+            try {
+                // warm the pool by creating a connection during startup
+                createConnection();
+            } catch (JMSException e) {
+                LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
+            }
         }
     }
 
     public void stop() {
-        LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
-        stopped.set(true);
-        for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
-            for (ConnectionPool connection : iter.next()) {
-                try {
-                    connection.close();
-                }catch(Exception e) {
-                    LOG.warn("Close connection failed",e);
+        if (stopped.compareAndSet(false, true)) {
+            LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
+                    connectionsPool != null ? connectionsPool.getNumActive() : 0);
+            try {
+                if (connectionsPool != null) {
+                    connectionsPool.close();
                 }
+            } catch (Exception e) {
             }
         }
-        cache.clear();
     }
 
-    public ObjectPoolFactory getPoolFactory() {
-        if (poolFactory == null) {
-            poolFactory = createPoolFactory();
+    /**
+     * Clears all connections from the pool.  Each connection that is currently in the pool is
+     * closed and removed from the pool.  A new connection will be created on the next call to
+     * {@link #createConnection()}.  Care should be taken when using this method as Connections that
+     * are in use by clients will be closed.
+     */
+    public void clear() {
+
+        if (stopped.get()) {
+            return;
         }
-        return poolFactory;
+
+        getConnectionsPool().clear();
     }
 
     /**
-     * Sets the object pool factory used to create individual session pools for
-     * each connection
+     * Returns the currently configured maximum number of sessions a pooled Connection will
+     * create before it either blocks or throws an exception when a new session is requested,
+     * depending on configuration.
+     *
+     * @return the number of session instances that can be taken from a pooled connection.
      */
-    public void setPoolFactory(ObjectPoolFactory poolFactory) {
-        this.poolFactory = poolFactory;
-    }
-
-    public int getMaximumActive() {
-        return maximumActive;
+    public int getMaximumActiveSessionPerConnection() {
+        return maximumActiveSessionPerConnection;
     }
 
     /**
      * Sets the maximum number of active sessions per connection
+     *
+     * @param maximumActiveSessionPerConnection
+     *      The maximum number of active session per connection in the pool.
      */
-    public void setMaximumActive(int maximumActive) {
-        this.maximumActive = maximumActive;
+    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
+        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
     }
 
     /**
@@ -204,57 +339,68 @@ public class PooledConnectionFactory imp
     }
 
     /**
-     * @return the maxConnections
+     * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
+     * once the maximum number of sessions has been borrowed from the Session Pool.
+     *
+     * @return true if the pooled Connection createSession method will block when the limit is hit.
+     * @see #setBlockIfSessionPoolIsFull(boolean)
+     */
+    public boolean isBlockIfSessionPoolIsFull() {
+        return this.blockIfSessionPoolIsFull;
+    }
+
+    /**
+     * Returns the maximum number of pooled Connections that this factory will allow before it
+     * begins to return connections from the pool on calls to {@link #createConnection()}.
+     *
+     * @return the maxConnections that will be created for this pool.
      */
     public int getMaxConnections() {
-        return maxConnections;
+        return getConnectionsPool().getMaxIdle();
     }
 
     /**
-     * Number of JMS connections to use.  The default is 1 to use a single connection
-     * to the broker.  For high throughput, it may be interesting to raise this number
-     * a bit.
+     * Sets the maximum number of pooled Connections (defaults to one).  Each call to
+     * {@link #createConnection()} will result in a new Connection being created up to the max
+     * connections value.
      *
      * @param maxConnections the maxConnections to set
      */
     public void setMaxConnections(int maxConnections) {
-        this.maxConnections = maxConnections;
+        getConnectionsPool().setMaxIdle(maxConnections);
     }
 
     /**
-     * Creates an ObjectPoolFactory. Its behavior is controlled by the two
-     * properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
+     * Gets the idle timeout value applied to new Connections that are created by this pool.
+     * <p/>
+     * The idle timeout is used to determine if a Connection instance has sat too long in the pool unused
+     * and, if so, it is closed and removed from the pool.  The default value is 30 seconds.
      *
-     * @return the newly created but empty ObjectPoolFactory
+     * @return idle timeout value (milliseconds)
      */
-    protected ObjectPoolFactory createPoolFactory() {
-         if (blockIfSessionPoolIsFull) {
-            return new GenericObjectPoolFactory(null, maximumActive);
-        } else {
-            return new GenericObjectPoolFactory(null,
-                maximumActive,
-                GenericObjectPool.WHEN_EXHAUSTED_FAIL,
-                GenericObjectPool.DEFAULT_MAX_WAIT);
-        }
-    }
-
     public int getIdleTimeout() {
         return idleTimeout;
     }
 
     /**
-     * Specifies the amount of milliseconds after which an idle connection is discarded.
-     * Defaults to 30 seconds.
+     * Sets the idle timeout value, in milliseconds, for Connections that are created by this pool;
+     * defaults to 30 seconds.
+     * <p/>
+     * For a Connection that is in the pool but has no current users the idle timeout determines how
+     * long the Connection can live before it is eligible for removal from the pool.  Normally the
+     * connections are tested when an attempt to check one out occurs so a Connection instance can sit
+     * in the pool much longer than its idle timeout if connections are used infrequently.
      *
-     * @param idleTimeout non zero in milliseconds
+     * @param idleTimeout
+     *      The maximum time a pooled Connection can sit unused before it is eligible for removal.
      */
     public void setIdleTimeout(int idleTimeout) {
         this.idleTimeout = idleTimeout;
     }
 
     /**
-     * Allow connections to expire, irrespective of load or idle time. This is useful with failover
-     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery.
+     * Allow connections to expire, irrespective of load or idle time. This is useful with failover
+     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery.
      *
      * @param expiryTimeout non zero in milliseconds
      */
@@ -262,7 +408,162 @@ public class PooledConnectionFactory imp
         this.expiryTimeout = expiryTimeout;
     }
 
+    /**
+     * @return the configured expiration timeout for connections in the pool.
+     */
     public long getExpiryTimeout() {
         return expiryTimeout;
     }
+
+    /**
+     * @return true if a Connection is created immediately on a call to {@link #start()}.
+     */
+    public boolean isCreateConnectionOnStartup() {
+        return createConnectionOnStartup;
+    }
+
+    /**
+     * Whether to create a connection on starting this {@link PooledConnectionFactory}.
+     * <p/>
+     * This can be used to warm-up the pool on startup. Notice that any kind of exception that
+     * happens during startup is logged at WARN level and ignored.
+     *
+     * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
+     */
+    public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
+        this.createConnectionOnStartup = createConnectionOnStartup;
+    }
+
+    /**
+     * Should Sessions use one anonymous producer for all producer requests or should a new
+     * MessageProducer be created for each request to create a producer object, default is true.
+     *
+     * When enabled the session only needs to allocate one MessageProducer for all requests and
+     * the MessageProducer#send(destination, message) method can be used.  Normally this is the
+     * right thing to do however it does result in the Broker not showing the producers per
+     * destination.
+     *
+     * @return true if a PooledSession will use only a single anonymous message producer instance.
+     */
+    public boolean isUseAnonymousProducers() {
+        return this.useAnonymousProducers;
+    }
+
+    /**
+     * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates
+     * a new MessageProducer for each call to create a MessageProducer.
+     *
+     * @param value
+     *      Boolean value that configures whether anonymous producers are used.
+     */
+    public void setUseAnonymousProducers(boolean value) {
+        this.useAnonymousProducers = value;
+    }
+
+    /**
+     * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
+     *
+     * @return this factory's pool of ConnectionPool instances.
+     */
+    protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
+        initConnectionsPool();
+        return this.connectionsPool;
+    }
+
+    /**
+     * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
+     * When non-positive, no idle object eviction thread will be run, and Connections will only be
+     * checked on borrow to determine if they have sat idle for too long or have failed for some
+     * other reason.
+     * <p/>
+     * By default this value is set to -1 and no expiration thread ever runs.
+     *
+     * @param timeBetweenExpirationCheckMillis
+     *      The time to wait between runs of the idle Connection eviction thread.
+     */
+    public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
+        getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
+    }
+
+    /**
+     * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
+     */
+    public long getTimeBetweenExpirationCheckMillis() {
+        return getConnectionsPool().getTimeBetweenEvictionRunsMillis();
+    }
+
+    /**
+     * @return the number of Connections currently in the Pool
+     */
+    public int getNumConnections() {
+        return getConnectionsPool().getNumIdle();
+    }
+
+    /**
+     * Delegate that creates each instance of a ConnectionPool object.  Subclasses can override
+     * this method to customize the type of connection pool returned.
+     *
+     * @param connection
+     *
+     * @return instance of a new ConnectionPool.
+     */
+    protected ConnectionPool createConnectionPool(Connection connection) {
+        return new ConnectionPool(connection);
+    }
+
+    /**
+     * Returns the timeout used when blocking on the creation of new sessions.
+     *
+     * @return the configured timeout applied when blocking because the session pool is full.
+     * @see #setBlockIfSessionPoolIsFull(boolean)
+     */
+    public long getBlockIfSessionPoolIsFullTimeout() {
+        return blockIfSessionPoolIsFullTimeout;
+    }
+
+    /**
+     * Controls the behavior of the internal session pool. By default the call to
+     * Connection.createSession() will block if the session pool is full.  This setting
+     * controls how long that call blocks before an exception is thrown.
+     *
+     * The size of the session pool is controlled by the maximumActiveSessionPerConnection
+     * property.
+     *
+     * Whether or not the call to create a session blocks is controlled by the
+     * blockIfSessionPoolIsFull property.
+     *
+     * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFull is true,
+     *                                        then use this setting to configure how long to block
+     */
+    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
+        this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
+    }
+
+    static class XAConnectionFactoryWrapper implements XAConnectionFactory, ConnectionFactory {
+        private final XAConnectionFactory delegate;
+
+        XAConnectionFactoryWrapper(XAConnectionFactory delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Connection createConnection() throws JMSException {
+            return createXAConnection();
+        }
+
+        @Override
+        public Connection createConnection(String userName, String password) throws JMSException {
+            return createXAConnection(userName, password);
+        }
+
+        @Override
+        public XAConnection createXAConnection() throws JMSException {
+            return delegate.createXAConnection();
+        }
+
+        @Override
+        public XAConnection createXAConnection(String userName, String password) throws JMSException {
+            return delegate.createXAConnection(userName, password);
+        }
+    }
 }

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java Mon Jun 30 16:54:57 2014
@@ -54,11 +54,8 @@ public class RecoverablePooledConnection
         this.name = name;
     }
 
-    protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
-    	if (!(connection instanceof XAConnection)) {
-    		throw new JMSException("Require an instance of javax.jms.XAConnection for creating the ConnectionPool");
-    	}
-        return new RecoverableConnectionPool((XAConnection)connection, getPoolFactory(), getTransactionManager(), getName());
+    protected ConnectionPool createConnectionPool(Connection connection) {
+        return new RecoverableConnectionPool(connection, getTransactionManager(), getName());
     }
 
     /**
@@ -67,7 +64,7 @@ public class RecoverablePooledConnection
     @Override
     public void start() {
         if (getConnectionFactory() == null) {
-            throw new IllegalArgumentException("connectionFactory or xaConnectionFactory must be set");
+            throw new IllegalArgumentException("connectionFactory must be set");
         }
         if (getTransactionManager() == null) {
             throw new IllegalArgumentException("transactionManager must be set");

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java Mon Jun 30 16:54:57 2014
@@ -17,78 +17,99 @@
 package org.apache.aries.transaction.jms.internal;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Session;
 
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.ObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
- *
- *
+ * <p/>
+ * Instances of this class are shared amongst one or more PooledConnection objects and must
+ * track the session objects that are loaned out for cleanup on close as well as ensuring
+ * that the temporary destinations of the managed Connection are purged when all references
+ * to this ConnectionPool are released.
  */
 public class ConnectionPool {
 
-    private Connection connection;
-    private ConcurrentHashMap<SessionKey, SessionPool> cache;
-    private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>();
-    private AtomicBoolean started = new AtomicBoolean(false);
+    private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
+
+    protected Connection connection;
     private int referenceCount;
-    private ObjectPoolFactory poolFactory;
     private long lastUsed = System.currentTimeMillis();
-    private long firstUsed = lastUsed;
-    private boolean hasFailed;
+    private final long firstUsed = lastUsed;
     private boolean hasExpired;
     private int idleTimeout = 30 * 1000;
     private long expiryTimeout = 0l;
+    private boolean useAnonymousProducers = true;
 
-    public ConnectionPool(Connection connection, ObjectPoolFactory poolFactory) throws JMSException {
-        this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
-        /*
-        TODO: activemq specific
-        // Add a transport Listener so that we can notice if this connection
-        // should be expired due to a connection failure.
-        connection.addTransportListener(new TransportListener() {
-            public void onCommand(Object command) {
-            }
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
 
-            public void onException(IOException error) {
-                synchronized (ConnectionPool.this) {
-                    hasFailed = true;
-                }
-            }
+    public ConnectionPool(Connection connection) {
 
-            public void transportInterupted() {
-            }
+        this.connection = wrap(connection);
 
-            public void transportResumed() {
-            }
-        });
+        // Create our internal Pool of session instances.
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
+                new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
+
+                    @Override
+                    public void activateObject(SessionKey key, PooledSession session) throws Exception {
+                        ConnectionPool.this.loanedSessions.add(session);
+                    }
+
+                    @Override
+                    public void destroyObject(SessionKey key, PooledSession session) throws Exception {
+                        ConnectionPool.this.loanedSessions.remove(session);
+                        session.getInternalSession().close();
+                    }
 
-        // make sure that we set the hasFailed flag, in case the transport already failed
-        // prior to the addition of our new TransportListener
-        if(connection.isTransportFailed()) {
-            hasFailed = true;
-        }
-        */
-        connection.setExceptionListener(new ExceptionListener() {
-            public void onException(JMSException exception) {
-                synchronized (ConnectionPool.this) {
-                    hasFailed = true;
+                    @Override
+                    public PooledSession makeObject(SessionKey key) throws Exception {
+                        Session session = makeSession(key);
+                        return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers);
+                    }
+
+                    @Override
+                    public void passivateObject(SessionKey key, PooledSession session) throws Exception {
+                        ConnectionPool.this.loanedSessions.remove(session);
+                    }
+
+                    @Override
+                    public boolean validateObject(SessionKey key, PooledSession session) {
+                        return true;
+                    }
                 }
-            }
-        });
+        );
+    }
+
+    // useful when external failure needs to force expiry
+    public void setHasExpired(boolean val) {
+        hasExpired = val;
+    }
+
+    protected Session makeSession(SessionKey key) throws JMSException {
+        return connection.createSession(key.isTransacted(), key.getAckMode());
+    }
+
+    protected Connection wrap(Connection connection) {
+        return connection;
     }
 
-    public ConnectionPool(Connection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
-        this.connection = connection;
-        this.cache = cache;
-        this.poolFactory = poolFactory;
+    protected void unWrap(Connection connection) {
     }
 
     public void start() throws JMSException {
@@ -102,70 +123,28 @@ public class ConnectionPool {
         }
     }
 
-    public synchronized javax.jms.Connection getConnection() {
+    public synchronized Connection getConnection() {
         return connection;
     }
 
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
         SessionKey key = new SessionKey(transacted, ackMode);
-        SessionPool pool = null;
-        pool = cache.get(key);
-        if (pool == null) {
-            SessionPool newPool = createSessionPool(key);
-            SessionPool prevPool = cache.putIfAbsent(key, newPool);
-            if (prevPool != null && prevPool != newPool) {
-                // newPool was not the first one to be associated with this
-                // key... close created session pool
-                try {
-                    newPool.close();
-                } catch (Exception e) {
-                    throw new JMSException(e.getMessage());
-                }
-            }
-            pool = cache.get(key); // this will return a non-null value...
-        }
-        PooledSession session = pool.borrowSession();
-        this.loanedSessions.add(session);
-        return session;
-    }
-    
-    
-    public Session createXaSession(boolean transacted, int ackMode) throws JMSException {
-        SessionKey key = new SessionKey(transacted, ackMode);
-        SessionPool pool = null;
-        pool = cache.get(key);
-        if (pool == null) {
-            SessionPool newPool = createSessionPool(key);
-            SessionPool prevPool = cache.putIfAbsent(key, newPool);
-            if (prevPool != null && prevPool != newPool) {
-                // newPool was not the first one to be associated with this
-                // key... close created session pool
-                try {
-                    newPool.close();
-                } catch (Exception e) {
-                    throw new JMSException(e.getMessage());
-                }
-            }
-            pool = cache.get(key); // this will return a non-null value...
+        PooledSession session;
+        try {
+            session = sessionPool.borrowObject(key);
+        } catch (Exception e) {
+            javax.jms.IllegalStateException illegalStateException = new IllegalStateException(e.toString());
+            illegalStateException.initCause(e);
+            throw illegalStateException;
         }
-        PooledSession session = pool.borrowSession();
-        this.loanedSessions.add(session);
         return session;
     }
-    
 
     public synchronized void close() {
         if (connection != null) {
             try {
-                Iterator<SessionPool> i = cache.values().iterator();
-                while (i.hasNext()) {
-                    SessionPool pool = i.next();
-                    i.remove();
-                    try {
-                        pool.close();
-                    } catch (Exception e) {
-                    }
-                }
+                sessionPool.close();
+            } catch (Exception e) {
             } finally {
                 try {
                     connection.close();
@@ -186,8 +165,10 @@ public class ConnectionPool {
         referenceCount--;
         lastUsed = System.currentTimeMillis();
         if (referenceCount == 0) {
-            expiredCheck();
-
+            // Loaned sessions are those that are active in the sessionPool and
+            // have not been closed by the client before closing the connection.
+            // These need to be closed so that all sessions reflect the fact
+            // that the parent Connection is closed.
             for (PooledSession session : this.loanedSessions) {
                 try {
                     session.close();
@@ -196,40 +177,53 @@ public class ConnectionPool {
             }
             this.loanedSessions.clear();
 
-            // only clean up temp destinations when all users
-            // of this connection have called close
-            if (getConnection() != null) {
-                /*
-                TODO: activemq specific
-                getConnection().cleanUpTempDestinations();
-                */
-            }
+            unWrap(getConnection());
+
+            expiredCheck();
         }
     }
 
     /**
+     * Determines if this Connection has expired.
+     * <p/>
+     * A ConnectionPool is considered expired when all references to it are released AND either
+     * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
+     * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
+     *
      * @return true if this connection has expired.
      */
     public synchronized boolean expiredCheck() {
+
+        boolean expired = false;
+
         if (connection == null) {
             return true;
         }
+
         if (hasExpired) {
             if (referenceCount == 0) {
                 close();
+                expired = true;
             }
-            return true;
         }
-        if (hasFailed
-                || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
-                || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
+
+        if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
             hasExpired = true;
             if (referenceCount == 0) {
                 close();
+                expired = true;
             }
-            return true;
         }
-        return false;
+
+        // Only set hasExpired here if there are no references, as a Connection with references is by
+        // definition not idle at this time.
+        if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
+            hasExpired = true;
+            close();
+            expired = true;
+        }
+
+        return expired;
     }
 
     public int getIdleTimeout() {
@@ -240,23 +234,99 @@ public class ConnectionPool {
         this.idleTimeout = idleTimeout;
     }
 
-    protected SessionPool createSessionPool(SessionKey key) {
-        return new SessionPool(this, key, poolFactory.createPool());
-    }
-
     public void setExpiryTimeout(long expiryTimeout) {
-        this.expiryTimeout  = expiryTimeout;
+        this.expiryTimeout = expiryTimeout;
     }
 
     public long getExpiryTimeout() {
         return expiryTimeout;
     }
 
-    void onSessionReturned(PooledSession session) {
-        this.loanedSessions.remove(session);
+    public int getMaximumActiveSessionPerConnection() {
+        return this.sessionPool.getMaxActive();
+    }
+
+    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
+        this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
+    }
+
+    public boolean isUseAnonymousProducers() {
+        return this.useAnonymousProducers;
+    }
+
+    public void setUseAnonymousProducers(boolean value) {
+        this.useAnonymousProducers = value;
+    }
+
+    /**
+     * @return the total number of Pooled session including idle sessions that are not
+     *          currently loaned out to any client.
+     */
+    public int getNumSessions() {
+        return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
+    }
+
+    /**
+     * @return the total number of Sessions that are in the Session pool but not loaned out.
+     */
+    public int getNumIdleSessions() {
+        return this.sessionPool.getNumIdle();
+    }
+
+    /**
+     * @return the total number of Sessions that have been loaned to PooledConnection instances.
+     */
+    public int getNumActiveSessions() {
+        return this.sessionPool.getNumActive();
+    }
+
+    /**
+     * Configure whether the createSession method should block when there are no more idle sessions and the
+     * pool already contains the maximum number of active sessions.  If false the create method will fail
+     * and throw an exception.
+     *
+     * @param block
+     * 		Indicates whether blocking should be used to wait for more space to create a session.
+     */
+    public void setBlockIfSessionPoolIsFull(boolean block) {
+        this.sessionPool.setWhenExhaustedAction(
+                (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
+    }
+
+    public boolean isBlockIfSessionPoolIsFull() {
+        return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
+    }
+
+    /**
+     * Returns the timeout to use when blocking to create new sessions.
+     *
+     * @return the session pool's maximum wait time, i.e. how long the pooled Connection createSession method will block when the limit is hit.
+     * @see #setBlockIfSessionPoolIsFull(boolean)
+     */
+    public long getBlockIfSessionPoolIsFullTimeout() {
+        return this.sessionPool.getMaxWait();
+    }
+
+    /**
+     * Controls the behavior of the internal session pool. By default the call to
+     * Connection.createSession() will block if the session pool is full.  This setting
+     * will affect how long it blocks before throwing an exception after the timeout.
+     *
+     * The size of the session pool is controlled by
+     * {@link #setMaximumActiveSessionPerConnection(int)}.
+     *
+     * Whether or not the call to create session blocks is controlled by
+     * {@link #setBlockIfSessionPoolIsFull(boolean)}.
+     *
+     * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFull is true,
+     *                                        then use this setting to configure how long to block before retry
+     */
+    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
+        this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
     }
 
-    void onSessionInvalidated(PooledSession session) {
-        this.loanedSessions.remove(session);
+    @Override
+    public String toString() {
+        return "ConnectionPool[" + connection + "]";
     }
 }

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java Mon Jun 30 16:54:57 2014
@@ -39,6 +39,9 @@ public class GenericResourceManager {
 
     private String resourceName;
 
+    private String userName;
+    private String password;
+
     private TransactionManager transactionManager;
 
     private ConnectionFactory connectionFactory;
@@ -64,6 +67,22 @@ public class GenericResourceManager {
         }
     }
 
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
     public String getResourceName() {
         return resourceName;
     }
@@ -113,12 +132,17 @@ public class GenericResourceManager {
 
                     public NamedXAResource getNamedXAResource() throws SystemException {
                         try {
-                            final XAConnection activeConn = (XAConnection)connFactory.createXAConnection();
-                            final XASession session = (XASession)activeConn.createXASession();
-                            activeConn.start();
-                            LOGGER.debug("new namedXAResource's connection: " + activeConn);
+                            final XAConnection xaConnection;
+                            if (rm.getUserName() != null && rm.getPassword() != null) {
+                                xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
+                            } else {
+                                xaConnection = connFactory.createXAConnection();
+                            }
+                            final XASession session = xaConnection.createXASession();
+                            xaConnection.start();
+                            LOGGER.debug("new namedXAResource's connection: " + xaConnection);
 
-                            return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), activeConn);
+                            return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), xaConnection);
                         } catch (Exception e) {
                             SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
                             se.initCause(e);

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java Mon Jun 30 16:54:57 2014
@@ -16,24 +16,10 @@
  */
 package org.apache.aries.transaction.jms.internal;
 
+import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
+import javax.jms.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,17 +35,25 @@ import org.slf4j.LoggerFactory;
  * href="http://jencks.org/Message+Driven+POJOs">this example</a>
  *
  */
-public class PooledConnection implements TopicConnection, QueueConnection {
+public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener {
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
 
-    private ConnectionPool pool;
-    private boolean stopped;
-    private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
-    private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
+    protected ConnectionPool pool;
+    private volatile boolean stopped;
+    private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
+    private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
+    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
 
+    /**
+     * Creates a new PooledConnection instance that uses the given ConnectionPool to create
+     * and manage its resources.  The ConnectionPool instance can be shared amongst many
+     * PooledConnection instances.
+     *
+     * @param pool
+     *      The connection and pool manager backing this proxy connection object.
+     */
     public PooledConnection(ConnectionPool pool) {
         this.pool = pool;
-        this.pool.incrementReferenceCount();
     }
 
     /**
@@ -69,55 +63,64 @@ public class PooledConnection implements
         return new PooledConnection(pool);
     }
 
+    @Override
     public void close() throws JMSException {
         this.cleanupConnectionTemporaryDestinations();
+        this.cleanupAllLoanedSessions();
         if (this.pool != null) {
             this.pool.decrementReferenceCount();
             this.pool = null;
         }
     }
 
+    @Override
     public void start() throws JMSException {
         assertNotClosed();
         pool.start();
     }
 
+    @Override
     public void stop() throws JMSException {
         stopped = true;
     }
 
-    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
-            throws JMSException {
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
     }
 
+    @Override
     public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
     }
 
-    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
-            throws JMSException {
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
         return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
     }
 
+    @Override
     public String getClientID() throws JMSException {
         return getConnection().getClientID();
     }
 
+    @Override
     public ExceptionListener getExceptionListener() throws JMSException {
         return getConnection().getExceptionListener();
     }
 
+    @Override
     public ConnectionMetaData getMetaData() throws JMSException {
         return getConnection().getMetaData();
     }
 
+    @Override
     public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
         getConnection().setExceptionListener(exceptionListener);
     }
 
+    @Override
     public void setClientID(String clientID) throws JMSException {
-
         // ignore repeated calls to setClientID() with the same client id
         // this could happen when a JMS component such as Spring that uses a
         // PooledConnectionFactory shuts down and reinitializes.
@@ -126,51 +129,66 @@ public class PooledConnection implements
         }
     }
 
+    @Override
     public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
         return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
     }
 
     // Session factory methods
     // -------------------------------------------------------------------------
+    @Override
     public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
         return (QueueSession) createSession(transacted, ackMode);
     }
 
+    @Override
     public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
         return (TopicSession) createSession(transacted, ackMode);
     }
 
+    @Override
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
         PooledSession result;
         result = (PooledSession) pool.createSession(transacted, ackMode);
 
-        // Add a temporary destination event listener to the session that notifies us when
-        // the session creates temporary destinations.
-        result.addTempDestEventListener(new PooledSessionEventListener() {
+        // Store the session so we can close the sessions that this PooledConnection
+        // created in order to ensure that consumers etc are closed per the JMS contract.
+        loanedSessions.add(result);
+
+        // Add an event listener to the session that notifies us when the session
+        // creates / destroys temporary destinations and closes etc.
+        result.addSessionEventListener(this);
+        return result;
+    }
 
-            public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
-                connTempQueues.add(tempQueue);
-            }
+    // Implementation methods
+    // -------------------------------------------------------------------------
 
-            public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
-                connTempTopics.add(tempTopic);
-            }
-        });
+    @Override
+    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+        connTempQueues.add(tempQueue);
+    }
 
-        return (Session) result;
+    @Override
+    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+        connTempTopics.add(tempTopic);
     }
 
-    // Implementation methods
-    // -------------------------------------------------------------------------
+    @Override
+    public void onSessionClosed(PooledSession session) {
+        if (session != null) {
+            this.loanedSessions.remove(session);
+        }
+    }
 
     public Connection getConnection() throws JMSException {
         assertNotClosed();
         return pool.getConnection();
     }
 
-    protected void assertNotClosed() throws JMSException {
+    protected void assertNotClosed() throws javax.jms.IllegalStateException {
         if (stopped || pool == null) {
-            throw new JMSException("Already closed");
+            throw new javax.jms.IllegalStateException("Connection closed");
         }
     }
 
@@ -178,6 +196,7 @@ public class PooledConnection implements
         return getConnection().createSession(key.isTransacted(), key.getAckMode());
     }
 
+    @Override
     public String toString() {
         return "PooledConnection { " + pool + " }";
     }
@@ -210,4 +229,43 @@ public class PooledConnection implements
         }
         connTempTopics.clear();
     }
+
+    /**
+     * The PooledConnection tracks all Sessions that it created and now we close them.  Closing the
+     * PooledSession will return the internal Session to the Pool of Session after cleaning up
+     * all the resources that the Session had allocated for this PooledConnection.
+     */
+    protected void cleanupAllLoanedSessions() {
+
+        for (PooledSession session : loanedSessions) {
+            try {
+                session.close();
+            } catch (JMSException ex) {
+                LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        loanedSessions.clear();
+    }
+
+    /**
+     * @return the total number of Pooled session including idle sessions that are not
+     *          currently loaned out to any client.
+     */
+    public int getNumSessions() {
+        return this.pool.getNumSessions();
+    }
+
+    /**
+     * @return the number of Sessions that are currently checked out of this Connection's session pool.
+     */
+    public int getNumActiveSessions() {
+        return this.pool.getNumActiveSessions();
+    }
+
+    /**
+     * @return the number of Sessions that are idle in this Connection's sessions pool.
+     */
+    public int getNumtIdleSessions() {
+        return this.pool.getNumIdleSessions();
+    }
 }

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java Mon Jun 30 16:54:57 2014
@@ -17,27 +17,30 @@
 package org.apache.aries.transaction.jms.internal;
 
 import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 
 /**
  * A pooled {@link MessageProducer}
- * 
- * 
  */
 public class PooledProducer implements MessageProducer {
-    private MessageProducer messageProducer;
-    private Destination destination;
+
+    private final MessageProducer messageProducer;
+    private final Destination destination;
+
     private int deliveryMode;
     private boolean disableMessageID;
     private boolean disableMessageTimestamp;
     private int priority;
     private long timeToLive;
+    private boolean anonymous = true;
 
     public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException {
         this.messageProducer = messageProducer;
         this.destination = destination;
+        this.anonymous = messageProducer.getDestination() == null;
 
         this.deliveryMode = messageProducer.getDeliveryMode();
         this.disableMessageID = messageProducer.getDisableMessageID();
@@ -46,73 +49,104 @@ public class PooledProducer implements M
         this.timeToLive = messageProducer.getTimeToLive();
     }
 
+    @Override
     public void close() throws JMSException {
+        if (!anonymous) {
+            this.messageProducer.close();
+        }
     }
 
+    @Override
     public void send(Destination destination, Message message) throws JMSException {
         send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
     }
 
+    @Override
     public void send(Message message) throws JMSException {
         send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
     }
 
+    @Override
     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
         send(destination, message, deliveryMode, priority, timeToLive);
     }
 
+    @Override
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+
         if (destination == null) {
-            destination = this.destination;
+            if (messageProducer.getDestination() == null) {
+                throw new UnsupportedOperationException("A destination must be specified.");
+            }
+            throw new InvalidDestinationException("Don't understand null destinations");
         }
+
         MessageProducer messageProducer = getMessageProducer();
 
         // just in case let only one thread send at once
         synchronized (messageProducer) {
+
+            if (anonymous && this.destination != null && !this.destination.equals(destination)) {
+                throw new UnsupportedOperationException("This producer can only send messages to: " + this.destination);
+            }
+
+            // The producer does its own Destination validation, so always use the
+            // destination-based send method; otherwise we might violate a JMS rule.
             messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
         }
     }
 
+    @Override
     public Destination getDestination() {
         return destination;
     }
 
+    @Override
     public int getDeliveryMode() {
         return deliveryMode;
     }
 
+    @Override
     public void setDeliveryMode(int deliveryMode) {
         this.deliveryMode = deliveryMode;
     }
 
+    @Override
     public boolean getDisableMessageID() {
         return disableMessageID;
     }
 
+    @Override
     public void setDisableMessageID(boolean disableMessageID) {
         this.disableMessageID = disableMessageID;
     }
 
+    @Override
     public boolean getDisableMessageTimestamp() {
         return disableMessageTimestamp;
     }
 
+    @Override
     public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
         this.disableMessageTimestamp = disableMessageTimestamp;
     }
 
+    @Override
     public int getPriority() {
         return priority;
     }
 
+    @Override
     public void setPriority(int priority) {
         this.priority = priority;
     }
 
+    @Override
     public long getTimeToLive() {
         return timeToLive;
     }
 
+    @Override
     public void setTimeToLive(long timeToLive) {
         this.timeToLive = timeToLive;
     }
@@ -123,8 +157,12 @@ public class PooledProducer implements M
         return messageProducer;
     }
 
+    protected boolean isAnonymous() {
+        return anonymous;
+    }
+
+    @Override
     public String toString() {
         return "PooledProducer { " + messageProducer + " }";
     }
-
 }

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java Mon Jun 30 16:54:57 2014
@@ -23,34 +23,42 @@ import java.util.concurrent.CopyOnWriteA
 import javax.jms.*;
 import javax.transaction.xa.XAResource;
 
+import org.apache.commons.pool.KeyedObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PooledSession implements Session, TopicSession, QueueSession {
+public class PooledSession implements Session, TopicSession, QueueSession, XASession {
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
+    private final SessionKey key;
+    private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
+
+    private MessageProducer producer;
+    private TopicPublisher publisher;
+    private QueueSender sender;
+
     private Session session;
-    private SessionPool sessionPool;
-    private MessageProducer messageProducer;
-    private QueueSender queueSender;
-    private TopicPublisher topicPublisher;
     private boolean transactional = true;
     private boolean ignoreClose;
-
-    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
-    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
-    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
-        new CopyOnWriteArrayList<PooledSessionEventListener>();
     private boolean isXa;
+    private boolean useAnonymousProducers = true;
 
-    public PooledSession(Session session, SessionPool sessionPool, boolean transactional) {
+    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
+        this.key = key;
         this.session = session;
         this.sessionPool = sessionPool;
         this.transactional = transactional;
+        this.useAnonymousProducers = anonymous;
     }
 
-    public void addTempDestEventListener(PooledSessionEventListener listener) {
-        this.tempDestEventListeners.add(listener);
+    public void addSessionEventListener(PooledSessionEventListener listener) {
+        // only add if really needed
+        if (!sessionEventListeners.contains(listener)) {
+            this.sessionEventListeners.add(listener);
+        }
     }
 
     protected boolean isIgnoreClose() {
@@ -61,10 +69,9 @@ public class PooledSession implements Se
         this.ignoreClose = ignoreClose;
     }
 
+    @Override
     public void close() throws JMSException {
         if (!ignoreClose) {
-            // TODO a cleaner way to reset??
-
             boolean invalidate = false;
             try {
                 // lets reset the session
@@ -95,11 +102,15 @@ public class PooledSession implements Se
             } finally {
                 consumers.clear();
                 browsers.clear();
+                for (PooledSessionEventListener listener : this.sessionEventListeners) {
+                    listener.onSessionClosed(this);
+                }
+                sessionEventListeners.clear();
             }
 
             if (invalidate) {
-                // lets close the session and not put the session back into
-                // the pool
+                // let's close the session and not put the session back into the pool;
+                // instead, invalidate it so the pool can create a new one on demand.
                 if (session != null) {
                     try {
                         session.close();
@@ -108,114 +119,145 @@ public class PooledSession implements Se
                     }
                     session = null;
                 }
-                sessionPool.invalidateSession(this);
+                try {
+                    sessionPool.invalidateObject(key, this);
+                } catch (Exception e) {
+                    LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
+                }
             } else {
-                sessionPool.returnSession(this);
+                try {
+                    sessionPool.returnObject(key, this);
+                } catch (Exception e) {
+                    javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
+                    illegalStateException.initCause(e);
+                    throw illegalStateException;
+                }
             }
         }
     }
 
+    @Override
     public void commit() throws JMSException {
         getInternalSession().commit();
     }
 
+    @Override
     public BytesMessage createBytesMessage() throws JMSException {
         return getInternalSession().createBytesMessage();
     }
 
+    @Override
     public MapMessage createMapMessage() throws JMSException {
         return getInternalSession().createMapMessage();
     }
 
+    @Override
     public Message createMessage() throws JMSException {
         return getInternalSession().createMessage();
     }
 
+    @Override
     public ObjectMessage createObjectMessage() throws JMSException {
         return getInternalSession().createObjectMessage();
     }
 
+    @Override
     public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
         return getInternalSession().createObjectMessage(serializable);
     }
 
+    @Override
     public Queue createQueue(String s) throws JMSException {
         return getInternalSession().createQueue(s);
     }
 
+    @Override
     public StreamMessage createStreamMessage() throws JMSException {
         return getInternalSession().createStreamMessage();
     }
 
+    @Override
     public TemporaryQueue createTemporaryQueue() throws JMSException {
         TemporaryQueue result;
 
         result = getInternalSession().createTemporaryQueue();
 
         // Notify all of the listeners of the created temporary Queue.
-        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
             listener.onTemporaryQueueCreate(result);
         }
 
         return result;
     }
 
+    @Override
     public TemporaryTopic createTemporaryTopic() throws JMSException {
         TemporaryTopic result;
 
         result = getInternalSession().createTemporaryTopic();
 
         // Notify all of the listeners of the created temporary Topic.
-        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
             listener.onTemporaryTopicCreate(result);
         }
 
         return result;
     }
 
+    @Override
     public void unsubscribe(String s) throws JMSException {
         getInternalSession().unsubscribe(s);
     }
 
+    @Override
     public TextMessage createTextMessage() throws JMSException {
         return getInternalSession().createTextMessage();
     }
 
+    @Override
     public TextMessage createTextMessage(String s) throws JMSException {
         return getInternalSession().createTextMessage(s);
     }
 
+    @Override
     public Topic createTopic(String s) throws JMSException {
         return getInternalSession().createTopic(s);
     }
 
+    @Override
     public int getAcknowledgeMode() throws JMSException {
         return getInternalSession().getAcknowledgeMode();
     }
 
+    @Override
     public boolean getTransacted() throws JMSException {
         return getInternalSession().getTransacted();
     }
 
+    @Override
     public void recover() throws JMSException {
         getInternalSession().recover();
     }
 
+    @Override
     public void rollback() throws JMSException {
         getInternalSession().rollback();
     }
 
+    @Override
     public XAResource getXAResource() {
-        if (session == null) {
-            throw new IllegalStateException("Session is closed");
+        if (session instanceof XASession) {
+            return ((XASession) session).getXAResource();
         }
-        return ((XASession) session).getXAResource();
+        return null;
     }
 
+    @Override
     public Session getSession() {
         return this;
     }
 
+    @Override
     public void run() {
         if (session != null) {
             session.run();
@@ -224,112 +266,168 @@ public class PooledSession implements Se
 
     // Consumer related methods
     // -------------------------------------------------------------------------
+    @Override
     public QueueBrowser createBrowser(Queue queue) throws JMSException {
         return addQueueBrowser(getInternalSession().createBrowser(queue));
     }
 
+    @Override
     public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
         return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination, selector));
     }
 
+    @Override
     public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
         return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
     }
 
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
         return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
     }
 
+    @Override
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
         return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
     }
 
+    @Override
     public MessageListener getMessageListener() throws JMSException {
         return getInternalSession().getMessageListener();
     }
 
+    @Override
     public void setMessageListener(MessageListener messageListener) throws JMSException {
         getInternalSession().setMessageListener(messageListener);
     }
 
+    @Override
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
         return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
     }
 
+    @Override
     public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
         return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
     }
 
+    @Override
     public QueueReceiver createReceiver(Queue queue) throws JMSException {
         return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
     }
 
+    @Override
     public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
         return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
     }
 
     // Producer related methods
     // -------------------------------------------------------------------------
+    @Override
     public MessageProducer createProducer(Destination destination) throws JMSException {
-        return new PooledProducer(getMessageProducer(), destination);
+        return new PooledProducer(getMessageProducer(destination), destination);
     }
 
+    @Override
     public QueueSender createSender(Queue queue) throws JMSException {
-        return new PooledQueueSender(getQueueSender(), queue);
+        return new PooledQueueSender(getQueueSender(queue), queue);
     }
 
+    @Override
     public TopicPublisher createPublisher(Topic topic) throws JMSException {
-        return new PooledTopicPublisher(getTopicPublisher(), topic);
-    }
-
-    /**
-     * Callback invoked when the consumer is closed.
-     * <p/>
-     * This is used to keep track of an explicit closed consumer created by this
-     * session, by which we know do not need to keep track of the consumer, as
-     * its already closed.
-     *
-     * @param consumer
-     *            the consumer which is being closed
-     */
-    protected void onConsumerClose(MessageConsumer consumer) {
-        consumers.remove(consumer);
+        return new PooledTopicPublisher(getTopicPublisher(topic), topic);
     }
 
-    public Session getInternalSession() throws JMSException {
+    public Session getInternalSession() throws IllegalStateException {
         if (session == null) {
-            throw new JMSException("The session has already been closed");
+            throw new IllegalStateException("The session has already been closed");
         }
         return session;
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
-        if (messageProducer == null) {
-            messageProducer = getInternalSession().createProducer(null);
+        return getMessageProducer(null);
+    }
+
+    public MessageProducer getMessageProducer(Destination destination) throws JMSException {
+        MessageProducer result = null;
+
+        if (useAnonymousProducers) {
+            if (producer == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (producer == null) {
+                        producer = getInternalSession().createProducer(null);
+                    }
+                }
+            }
+
+            result = producer;
+        } else {
+            result = getInternalSession().createProducer(destination);
         }
-        return messageProducer;
+
+        return result;
     }
 
     public QueueSender getQueueSender() throws JMSException {
-        if (queueSender == null) {
-            queueSender = ((QueueSession) getInternalSession()).createSender(null);
+        return getQueueSender(null);
+    }
+
+    public QueueSender getQueueSender(Queue destination) throws JMSException {
+        QueueSender result = null;
+
+        if (useAnonymousProducers) {
+            if (sender == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (sender == null) {
+                        sender = ((QueueSession) getInternalSession()).createSender(null);
+                    }
+                }
+            }
+
+            result = sender;
+        } else {
+            result = ((QueueSession) getInternalSession()).createSender(destination);
         }
-        return queueSender;
+
+        return result;
     }
 
     public TopicPublisher getTopicPublisher() throws JMSException {
-        if (topicPublisher == null) {
-            topicPublisher = ((TopicSession) getInternalSession()).createPublisher(null);
+        return getTopicPublisher(null);
+    }
+
+    public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
+        TopicPublisher result = null;
+
+        if (useAnonymousProducers) {
+            if (publisher == null) {
+                // Don't allow for duplicate anonymous producers.
+                synchronized (this) {
+                    if (publisher == null) {
+                        publisher = ((TopicSession) getInternalSession()).createPublisher(null);
+                    }
+                }
+            }
+
+            result = publisher;
+        } else {
+            result = ((TopicSession) getInternalSession()).createPublisher(destination);
         }
-        return topicPublisher;
+
+        return result;
     }
 
     private QueueBrowser addQueueBrowser(QueueBrowser browser) {
@@ -340,10 +438,8 @@ public class PooledSession implements Se
     private MessageConsumer addConsumer(MessageConsumer consumer) {
         consumers.add(consumer);
         // must wrap in PooledMessageConsumer to ensure the onConsumerClose
-        // method is invoked
-        // when the returned consumer is closed, to avoid memory leak in this
-        // session class
-        // in case many consumers is created
+        // method is invoked when the returned consumer is closed, to avoid a memory
+        // leak in this session class in case many consumers are created
         return new PooledMessageConsumer(this, consumer);
     }
 
@@ -361,7 +457,22 @@ public class PooledSession implements Se
         this.isXa = isXa;
     }
 
+    @Override
     public String toString() {
         return "PooledSession { " + session + " }";
     }
+
+    /**
+     * Callback invoked when the consumer is closed.
+     * <p/>
+     * This is used to stop tracking a consumer created by this session once it
+     * has been explicitly closed: there is no need to keep track of the consumer,
+     * as it is already closed.
+     *
+     * @param consumer
+     *            the consumer which is being closed
+     */
+    protected void onConsumerClose(MessageConsumer consumer) {
+        consumers.remove(consumer);
+    }
 }

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java Mon Jun 30 16:54:57 2014
@@ -25,7 +25,7 @@ interface PooledSessionEventListener {
      * Called on successful creation of a new TemporaryQueue.
      *
      * @param tempQueue
-     *            The TemporaryQueue just created.
+     *      The TemporaryQueue just created.
      */
     void onTemporaryQueueCreate(TemporaryQueue tempQueue);
 
@@ -33,8 +33,16 @@ interface PooledSessionEventListener {
      * Called on successful creation of a new TemporaryTopic.
      *
      * @param tempTopic
-     *            The TemporaryTopic just created.
+     *      The TemporaryTopic just created.
      */
     void onTemporaryTopicCreate(TemporaryTopic tempTopic);
 
+    /**
+     * Called when the PooledSession is closed.
+     *
+     * @param session
+     *      The PooledSession that has been closed.
+     */
+    void onSessionClosed(PooledSession session);
+
 }

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Mon Jun 30 16:54:57 2014
@@ -16,6 +16,7 @@
  */
 package org.apache.aries.transaction.jms.internal;
 
+import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
@@ -29,8 +30,8 @@ public class RecoverableConnectionPool e
 
     private String name;
 
-    public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException {
-        super(connection, poolFactory, transactionManager);
+    public RecoverableConnectionPool(Connection connection, TransactionManager transactionManager, String name) {
+        super(connection, transactionManager);
         this.name = name;
     }
 

Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Mon Jun 30 16:54:57 2014
@@ -22,8 +22,10 @@ package org.apache.aries.transaction.jms
  * 
  */
 public class SessionKey {
+
     private boolean transacted;
     private int ackMode;
+
     private int hash;
 
     public SessionKey(boolean transacted, int ackMode) {