You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by tj...@apache.org on 2014/06/30 18:55:06 UTC
svn commit: r1606837 [25/27] - in /aries/branches/subsystemsR6: ./
application/ application/application-api/ application/application-bundle/
application/application-converters/
application/application-default-local-platform/
application/application-dep...
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java Mon Jun 30 16:54:57 2014
@@ -24,11 +24,14 @@ import java.util.concurrent.atomic.Atomi
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
import org.apache.aries.transaction.jms.internal.ConnectionKey;
import org.apache.aries.transaction.jms.internal.ConnectionPool;
-import org.apache.aries.transaction.jms.internal.IOExceptionSupport;
import org.apache.aries.transaction.jms.internal.PooledConnection;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.pool.ObjectPoolFactory;
@@ -37,8 +40,8 @@ import org.apache.commons.pool.impl.Gene
/**
* A JMS provider which pools Connection, Session and MessageProducer instances
- * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
- * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
+ * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
+ * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessageListenerContainer</a>.
* Connections, sessions and producers are returned to a pool after use so that they can be reused later
* without having to undergo the cost of creating them again.
*
@@ -55,77 +58,198 @@ import org.apache.commons.pool.impl.Gene
* all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
* http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
*
+ * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
+ * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
+ * be used when configuring this optional feature. Eviction runs contend with client threads for access
+ * to objects in the pool, so if they run too frequently performance issues may result. The idle object
+ * eviction thread may be configured using the {@link PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By
+ * default the value is -1 which means no eviction thread will be run. Set to a positive value to
+ * configure the idle eviction thread to run.
+ *
* @org.apache.xbean.XBean element="pooledConnectionFactory"
*/
public class PooledConnectionFactory implements ConnectionFactory {
-
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
+
+ protected final AtomicBoolean stopped = new AtomicBoolean(false);
+ private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
+
private ConnectionFactory connectionFactory;
- private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
- private ObjectPoolFactory poolFactory;
- private int maximumActive = 500;
- private int maxConnections = 1;
+
+ private int maximumActiveSessionPerConnection = 500;
private int idleTimeout = 30 * 1000;
private boolean blockIfSessionPoolIsFull = true;
- private AtomicBoolean stopped = new AtomicBoolean(false);
+ private long blockIfSessionPoolIsFullTimeout = -1L;
private long expiryTimeout = 0l;
+ private boolean createConnectionOnStartup = true;
+ private boolean useAnonymousProducers = true;
- public PooledConnectionFactory() {
+ public void initConnectionsPool() {
+ if (this.connectionsPool == null) {
+ this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
+ new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
+
+ @Override
+ public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+ }
+
+ @Override
+ public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+ try {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Destroying connection: {}", connection);
+ }
+ connection.close();
+ } catch (Exception e) {
+ LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
+ }
+ }
+
+ @Override
+ public ConnectionPool makeObject(ConnectionKey key) throws Exception {
+ Connection delegate = createConnection(key);
+
+ ConnectionPool connection = createConnectionPool(delegate);
+ connection.setIdleTimeout(getIdleTimeout());
+ connection.setExpiryTimeout(getExpiryTimeout());
+ connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
+ connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
+ if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) {
+ connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
+ }
+ connection.setUseAnonymousProducers(isUseAnonymousProducers());
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Created new connection: {}", connection);
+ }
+
+ return connection;
+ }
+
+ @Override
+ public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
+ }
+
+ @Override
+ public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
+ if (connection != null && connection.expiredCheck()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Connection has expired: {} and will be destroyed", connection);
+ }
+
+ return false;
+ }
+
+ return true;
+ }
+ });
+
+ // Set max idle (not max active) since our connections always idle in the pool.
+ this.connectionsPool.setMaxIdle(1);
+
+ // We always want our validate method to control when idle objects are evicted.
+ this.connectionsPool.setTestOnBorrow(true);
+ this.connectionsPool.setTestWhileIdle(true);
+ }
}
+ /**
+ * @return the currently configured ConnectionFactory used to create the pooled Connections.
+ */
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
/**
- * The actual JMS ConnectionFactory that will be pooled.
+ * Sets the ConnectionFactory used to create new pooled Connections.
+ * <p/>
+ * Updates to this value do not affect Connections that were previously created and placed
+ * into the pool. In order to allocate new Connections based off this new ConnectionFactory
+ * it is first necessary to {@link #clear()} the pooled Connections.
+ *
+ * @param toUse
+ * The factory to use to create pooled Connections.
*/
- public void setConnectionFactory(ConnectionFactory connectionFactory) {
- this.connectionFactory = connectionFactory;
+ public void setConnectionFactory(final ConnectionFactory toUse) {
+ if (toUse instanceof XAConnectionFactory) {
+ this.connectionFactory = new XAConnectionFactoryWrapper((XAConnectionFactory) toUse);
+ } else {
+ this.connectionFactory = toUse;
+ }
}
+ @Override
public Connection createConnection() throws JMSException {
return createConnection(null, null);
}
+ @Override
public synchronized Connection createConnection(String userName, String password) throws JMSException {
if (stopped.get()) {
LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
return null;
}
+ ConnectionPool connection = null;
ConnectionKey key = new ConnectionKey(userName, password);
- LinkedList<ConnectionPool> pools = cache.get(key);
- if (pools == null) {
- pools = new LinkedList<ConnectionPool>();
- cache.put(key, pools);
+ // This will either return an existing non-expired ConnectionPool or it
+ // will create a new one to meet the demand.
+ if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
+ try {
+ // we want borrowObject to return the one we added.
+ connectionsPool.setLifo(true);
+ connectionsPool.addObject(key);
+ } catch (Exception e) {
+ throw createJmsException("Error while attempting to add new Connection to the pool", e);
+ }
+ } else {
+ // now we want the oldest one in the pool.
+ connectionsPool.setLifo(false);
}
- ConnectionPool connection = null;
- if (pools.size() == maxConnections) {
- connection = pools.removeFirst();
- }
+ try {
- // Now.. we might get a connection, but it might be that we need to
- // dump it..
- if (connection != null && connection.expiredCheck()) {
- connection = null;
+ // We can race against other threads returning the connection when there is an
+ // expiration or idle timeout. We keep pulling out ConnectionPool instances until
+ // we win and get a non-closed instance and then increment the reference count
+ // under lock to prevent another thread from triggering an expiration check and
+ // pulling the rug out from under us.
+ while (connection == null) {
+ connection = connectionsPool.borrowObject(key);
+ synchronized (connection) {
+ if (connection.getConnection() != null) {
+ connection.incrementReferenceCount();
+ break;
+ }
+
+ // Return the bad one to the pool and let it get destroyed as normal.
+ connectionsPool.returnObject(key, connection);
+ connection = null;
+ }
+ }
+ } catch (Exception e) {
+ throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
}
- if (connection == null) {
- Connection delegate = createConnection(key);
- connection = createConnectionPool(delegate);
+ try {
+ connectionsPool.returnObject(key, connection);
+ } catch (Exception e) {
+ throw createJmsException("Error when returning connection to the pool", e);
}
- pools.add(connection);
+
+ return newPooledConnection(connection);
+ }
+
+ protected Connection newPooledConnection(ConnectionPool connection) {
return new PooledConnection(connection);
}
- protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
- ConnectionPool result = new ConnectionPool(connection, getPoolFactory());
- result.setIdleTimeout(getIdleTimeout());
- result.setExpiryTimeout(getExpiryTimeout());
- return result;
+ private JMSException createJmsException(String msg, Exception cause) {
+ JMSException exception = new JMSException(msg);
+ exception.setLinkedException(cause);
+ exception.initCause(cause);
+ return exception;
}
protected Connection createConnection(ConnectionKey key) throws JMSException {
@@ -137,54 +261,65 @@ public class PooledConnectionFactory imp
}
public void start() {
- try {
- stopped.set(false);
- createConnection();
- } catch (JMSException e) {
- LOG.warn("Create pooled connection during start failed.", e);
- IOExceptionSupport.create(e);
+ LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
+ stopped.set(false);
+ if (isCreateConnectionOnStartup()) {
+ try {
+ // warm the pool by creating a connection during startup
+ createConnection();
+ } catch (JMSException e) {
+ LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
+ }
}
}
public void stop() {
- LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
- stopped.set(true);
- for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
- for (ConnectionPool connection : iter.next()) {
- try {
- connection.close();
- }catch(Exception e) {
- LOG.warn("Close connection failed",e);
+ if (stopped.compareAndSet(false, true)) {
+ LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
+ connectionsPool != null ? connectionsPool.getNumActive() : 0);
+ try {
+ if (connectionsPool != null) {
+ connectionsPool.close();
}
+ } catch (Exception e) {
}
}
- cache.clear();
}
- public ObjectPoolFactory getPoolFactory() {
- if (poolFactory == null) {
- poolFactory = createPoolFactory();
+ /**
+ * Clears all connections from the pool. Each connection that is currently in the pool is
+ * closed and removed from the pool. A new connection will be created on the next call to
+ * {@link #createConnection()}. Care should be taken when using this method as Connections that
+ * are in use by clients will be closed.
+ */
+ public void clear() {
+
+ if (stopped.get()) {
+ return;
}
- return poolFactory;
+
+ getConnectionsPool().clear();
}
/**
- * Sets the object pool factory used to create individual session pools for
- * each connection
+ * Returns the currently configured maximum number of sessions a pooled Connection will
+ * create before it either blocks or throws an exception when a new session is requested,
+ * depending on configuration.
+ *
+ * @return the number of session instances that can be taken from a pooled connection.
*/
- public void setPoolFactory(ObjectPoolFactory poolFactory) {
- this.poolFactory = poolFactory;
- }
-
- public int getMaximumActive() {
- return maximumActive;
+ public int getMaximumActiveSessionPerConnection() {
+ return maximumActiveSessionPerConnection;
}
/**
* Sets the maximum number of active sessions per connection
+ *
+ * @param maximumActiveSessionPerConnection
+ * The maximum number of active session per connection in the pool.
*/
- public void setMaximumActive(int maximumActive) {
- this.maximumActive = maximumActive;
+ public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
+ this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
}
/**
@@ -204,57 +339,68 @@ public class PooledConnectionFactory imp
}
/**
- * @return the maxConnections
+ * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
+ * once the maximum number of sessions has been borrowed from the Session Pool.
+ *
+ * @return true if the pooled Connection createSession method will block when the limit is hit.
+ * @see #setBlockIfSessionPoolIsFull(boolean)
+ */
+ public boolean isBlockIfSessionPoolIsFull() {
+ return this.blockIfSessionPoolIsFull;
+ }
+
+ /**
+ * Returns the maximum number of pooled Connections that this factory will allow before it
+ * begins to return connections from the pool on calls to {@link #createConnection()}.
+ *
+ * @return the maxConnections that will be created for this pool.
*/
public int getMaxConnections() {
- return maxConnections;
+ return getConnectionsPool().getMaxIdle();
}
/**
- * Number of JMS connections to use. The default is 1 to use a single connection
- * to the broker. For high throughput, it may be interesting to raise this number
- * a bit.
+ * Sets the maximum number of pooled Connections (defaults to one). Each call to
+ * {@link #createConnection()} will result in a new Connection being created up to the max
+ * connections value.
*
* @param maxConnections the maxConnections to set
*/
public void setMaxConnections(int maxConnections) {
- this.maxConnections = maxConnections;
+ getConnectionsPool().setMaxIdle(maxConnections);
}
/**
- * Creates an ObjectPoolFactory. Its behavior is controlled by the two
- * properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
+ * Gets the Idle timeout value applied to new Connection's that are created by this pool.
+ * <p/>
+ * The idle timeout is used to determine if a Connection instance has sat too long in the pool unused
+ * and if so is closed and removed from the pool. The default value is 30 seconds.
*
- * @return the newly created but empty ObjectPoolFactory
+ * @return idle timeout value (milliseconds)
*/
- protected ObjectPoolFactory createPoolFactory() {
- if (blockIfSessionPoolIsFull) {
- return new GenericObjectPoolFactory(null, maximumActive);
- } else {
- return new GenericObjectPoolFactory(null,
- maximumActive,
- GenericObjectPool.WHEN_EXHAUSTED_FAIL,
- GenericObjectPool.DEFAULT_MAX_WAIT);
- }
- }
-
public int getIdleTimeout() {
return idleTimeout;
}
/**
- * Specifies the amount of milliseconds after which an idle connection is discarded.
- * Defaults to 30 seconds.
+ * Sets the idle timeout value for Connection's that are created by this pool in Milliseconds,
+ * defaults to 30 seconds.
+ * <p/>
+ * For a Connection that is in the pool but has no current users the idle timeout determines how
+ * long the Connection can live before it is eligible for removal from the pool. Normally the
+ * connections are tested when an attempt to check one out occurs so a Connection instance can sit
+ * in the pool much longer than its idle timeout if connections are used infrequently.
*
- * @param idleTimeout non zero in milliseconds
+ * @param idleTimeout
+ * The maximum time a pooled Connection can sit unused before it is eligible for removal.
*/
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
/**
- * Allow connections to expire, irrespective of load or idle time. This is useful with failover
- * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery.
+ * allow connections to expire, irrespective of load or idle time. This is useful with failover
+ * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
*
* @param expiryTimeout non zero in milliseconds
*/
@@ -262,7 +408,162 @@ public class PooledConnectionFactory imp
this.expiryTimeout = expiryTimeout;
}
+ /**
+ * @return the configured expiration timeout for connections in the pool.
+ */
public long getExpiryTimeout() {
return expiryTimeout;
}
+
+ /**
+ * @return true if a Connection is created immediately on a call to {@link #start()}.
+ */
+ public boolean isCreateConnectionOnStartup() {
+ return createConnectionOnStartup;
+ }
+
+ /**
+ * Whether to create a connection on starting this {@link PooledConnectionFactory}.
+ * <p/>
+ * This can be used to warm-up the pool on startup. Note that any exception that
+ * happens during startup is logged at WARN level and ignored.
+ *
+ * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
+ */
+ public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
+ this.createConnectionOnStartup = createConnectionOnStartup;
+ }
+
+ /**
+ * Should Sessions use one anonymous producer for all producer requests or should a new
+ * MessageProducer be created for each request to create a producer object, default is true.
+ *
+ * When enabled the session only needs to allocate one MessageProducer for all requests and
+ * the MessageProducer#send(destination, message) method can be used. Normally this is the
+ * right thing to do however it does result in the Broker not showing the producers per
+ * destination.
+ *
+ * @return true if a PooledSession will use only a single anonymous message producer instance.
+ */
+ public boolean isUseAnonymousProducers() {
+ return this.useAnonymousProducers;
+ }
+
+ /**
+ * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates
+ * a new MessageProducer for each call to create a MessageProducer.
+ *
+ * @param value
+ * Boolean value that configures whether anonymous producers are used.
+ */
+ public void setUseAnonymousProducers(boolean value) {
+ this.useAnonymousProducers = value;
+ }
+
+ /**
+ * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
+ *
+ * @return this factory's pool of ConnectionPool instances.
+ */
+ protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
+ initConnectionsPool();
+ return this.connectionsPool;
+ }
+
+ /**
+ * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
+ * When non-positive, no idle object eviction thread will be run, and Connections will only be
+ * checked on borrow to determine if they have sat idle for too long or have failed for some
+ * other reason.
+ * <p/>
+ * By default this value is set to -1 and no expiration thread ever runs.
+ *
+ * @param timeBetweenExpirationCheckMillis
+ * The time to wait between runs of the idle Connection eviction thread.
+ */
+ public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
+ getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
+ }
+
+ /**
+ * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
+ */
+ public long getTimeBetweenExpirationCheckMillis() {
+ return getConnectionsPool().getTimeBetweenEvictionRunsMillis();
+ }
+
+ /**
+ * @return the number of Connections currently in the Pool
+ */
+ public int getNumConnections() {
+ return getConnectionsPool().getNumIdle();
+ }
+
+ /**
+ * Delegate that creates each instance of a ConnectionPool object. Subclasses can override
+ * this method to customize the type of connection pool returned.
+ *
+ * @param connection
+ *
+ * @return instance of a new ConnectionPool.
+ */
+ protected ConnectionPool createConnectionPool(Connection connection) {
+ return new ConnectionPool(connection);
+ }
+
+ /**
+ * Returns the timeout used when blocking on the creation of new sessions.
+ *
+ * @return the configured timeout applied while blocking because the session pool is full.
+ * @see #setBlockIfSessionPoolIsFullTimeout(long)
+ */
+ public long getBlockIfSessionPoolIsFullTimeout() {
+ return blockIfSessionPoolIsFullTimeout;
+ }
+
+ /**
+ * Controls the behavior of the internal session pool. By default the call to
+ * Connection.getSession() will block if the session pool is full. This setting
+ * will affect how long it blocks and throws an exception after the timeout.
+ *
+ * The size of the session pool is controlled by the @see #maximumActiveSessionPerConnection
+ * property.
+ *
+ * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
+ * property
+ *
+ * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFull is true,
+ * then use this setting to configure how long to block before retry
+ */
+ public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
+ this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
+ }
+
+ static class XAConnectionFactoryWrapper implements XAConnectionFactory, ConnectionFactory {
+ private final XAConnectionFactory delegate;
+
+ XAConnectionFactoryWrapper(XAConnectionFactory delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Connection createConnection() throws JMSException {
+ return createXAConnection();
+ }
+
+ @Override
+ public Connection createConnection(String userName, String password) throws JMSException {
+ return createXAConnection(userName, password);
+ }
+
+ @Override
+ public XAConnection createXAConnection() throws JMSException {
+ return delegate.createXAConnection();
+ }
+
+ @Override
+ public XAConnection createXAConnection(String userName, String password) throws JMSException {
+ return delegate.createXAConnection(userName, password);
+ }
+ }
}
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java Mon Jun 30 16:54:57 2014
@@ -54,11 +54,8 @@ public class RecoverablePooledConnection
this.name = name;
}
- protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
- if (!(connection instanceof XAConnection)) {
- throw new JMSException("Require an instance of javax.jms.XAConnection for creating the ConnectionPool");
- }
- return new RecoverableConnectionPool((XAConnection)connection, getPoolFactory(), getTransactionManager(), getName());
+ protected ConnectionPool createConnectionPool(Connection connection) {
+ return new RecoverableConnectionPool(connection, getTransactionManager(), getName());
}
/**
@@ -67,7 +64,7 @@ public class RecoverablePooledConnection
@Override
public void start() {
if (getConnectionFactory() == null) {
- throw new IllegalArgumentException("connectionFactory or xaConnectionFactory must be set");
+ throw new IllegalArgumentException("connectionFactory must be set");
}
if (getTransactionManager() == null) {
throw new IllegalArgumentException("transactionManager must be set");
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java Mon Jun 30 16:54:57 2014
@@ -17,78 +17,99 @@
package org.apache.aries.transaction.jms.internal;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.ObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Holds a real JMS connection along with the session pools associated with it.
- *
- *
+ * <p/>
+ * Instances of this class are shared amongst one or more PooledConnection objects and must
+ * track the session objects that are loaned out for cleanup on close as well as ensuring
+ * that the temporary destinations of the managed Connection are purged when all references
+ * to this ConnectionPool are released.
*/
public class ConnectionPool {
- private Connection connection;
- private ConcurrentHashMap<SessionKey, SessionPool> cache;
- private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>();
- private AtomicBoolean started = new AtomicBoolean(false);
+ private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
+
+ protected Connection connection;
private int referenceCount;
- private ObjectPoolFactory poolFactory;
private long lastUsed = System.currentTimeMillis();
- private long firstUsed = lastUsed;
- private boolean hasFailed;
+ private final long firstUsed = lastUsed;
private boolean hasExpired;
private int idleTimeout = 30 * 1000;
private long expiryTimeout = 0l;
+ private boolean useAnonymousProducers = true;
- public ConnectionPool(Connection connection, ObjectPoolFactory poolFactory) throws JMSException {
- this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
- /*
- TODO: activemq specific
- // Add a transport Listener so that we can notice if this connection
- // should be expired due to a connection failure.
- connection.addTransportListener(new TransportListener() {
- public void onCommand(Object command) {
- }
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
+ private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
- public void onException(IOException error) {
- synchronized (ConnectionPool.this) {
- hasFailed = true;
- }
- }
+ public ConnectionPool(Connection connection) {
- public void transportInterupted() {
- }
+ this.connection = wrap(connection);
- public void transportResumed() {
- }
- });
+ // Create our internal Pool of session instances.
+ this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
+ new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
+
+ @Override
+ public void activateObject(SessionKey key, PooledSession session) throws Exception {
+ ConnectionPool.this.loanedSessions.add(session);
+ }
+
+ @Override
+ public void destroyObject(SessionKey key, PooledSession session) throws Exception {
+ ConnectionPool.this.loanedSessions.remove(session);
+ session.getInternalSession().close();
+ }
- // make sure that we set the hasFailed flag, in case the transport already failed
- // prior to the addition of our new TransportListener
- if(connection.isTransportFailed()) {
- hasFailed = true;
- }
- */
- connection.setExceptionListener(new ExceptionListener() {
- public void onException(JMSException exception) {
- synchronized (ConnectionPool.this) {
- hasFailed = true;
+ @Override
+ public PooledSession makeObject(SessionKey key) throws Exception {
+ Session session = makeSession(key);
+ return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers);
+ }
+
+ @Override
+ public void passivateObject(SessionKey key, PooledSession session) throws Exception {
+ ConnectionPool.this.loanedSessions.remove(session);
+ }
+
+ @Override
+ public boolean validateObject(SessionKey key, PooledSession session) {
+ return true;
+ }
}
- }
- });
+ );
+ }
+
+ // useful when external failure needs to force expiry
+ public void setHasExpired(boolean val) {
+ hasExpired = val;
+ }
+
+ protected Session makeSession(SessionKey key) throws JMSException {
+ return connection.createSession(key.isTransacted(), key.getAckMode());
+ }
+
+ protected Connection wrap(Connection connection) {
+ return connection;
}
- public ConnectionPool(Connection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
- this.connection = connection;
- this.cache = cache;
- this.poolFactory = poolFactory;
+ protected void unWrap(Connection connection) {
}
public void start() throws JMSException {
@@ -102,70 +123,28 @@ public class ConnectionPool {
}
}
- public synchronized javax.jms.Connection getConnection() {
+ public synchronized Connection getConnection() {
return connection;
}
public Session createSession(boolean transacted, int ackMode) throws JMSException {
SessionKey key = new SessionKey(transacted, ackMode);
- SessionPool pool = null;
- pool = cache.get(key);
- if (pool == null) {
- SessionPool newPool = createSessionPool(key);
- SessionPool prevPool = cache.putIfAbsent(key, newPool);
- if (prevPool != null && prevPool != newPool) {
- // newPool was not the first one to be associated with this
- // key... close created session pool
- try {
- newPool.close();
- } catch (Exception e) {
- throw new JMSException(e.getMessage());
- }
- }
- pool = cache.get(key); // this will return a non-null value...
- }
- PooledSession session = pool.borrowSession();
- this.loanedSessions.add(session);
- return session;
- }
-
-
- public Session createXaSession(boolean transacted, int ackMode) throws JMSException {
- SessionKey key = new SessionKey(transacted, ackMode);
- SessionPool pool = null;
- pool = cache.get(key);
- if (pool == null) {
- SessionPool newPool = createSessionPool(key);
- SessionPool prevPool = cache.putIfAbsent(key, newPool);
- if (prevPool != null && prevPool != newPool) {
- // newPool was not the first one to be associated with this
- // key... close created session pool
- try {
- newPool.close();
- } catch (Exception e) {
- throw new JMSException(e.getMessage());
- }
- }
- pool = cache.get(key); // this will return a non-null value...
+ PooledSession session;
+ try {
+ session = sessionPool.borrowObject(key);
+ } catch (Exception e) {
+ javax.jms.IllegalStateException illegalStateException = new IllegalStateException(e.toString());
+ illegalStateException.initCause(e);
+ throw illegalStateException;
}
- PooledSession session = pool.borrowSession();
- this.loanedSessions.add(session);
return session;
}
-
public synchronized void close() {
if (connection != null) {
try {
- Iterator<SessionPool> i = cache.values().iterator();
- while (i.hasNext()) {
- SessionPool pool = i.next();
- i.remove();
- try {
- pool.close();
- } catch (Exception e) {
- }
- }
+ sessionPool.close();
+ } catch (Exception e) {
} finally {
try {
connection.close();
@@ -186,8 +165,10 @@ public class ConnectionPool {
referenceCount--;
lastUsed = System.currentTimeMillis();
if (referenceCount == 0) {
- expiredCheck();
-
+ // Loaned sessions are those that are active in the sessionPool and
+ // have not been closed by the client before closing the connection.
+ // These need to be closed so that all sessions reflect the fact
+ // that the parent Connection is closed.
for (PooledSession session : this.loanedSessions) {
try {
session.close();
@@ -196,40 +177,53 @@ public class ConnectionPool {
}
this.loanedSessions.clear();
- // only clean up temp destinations when all users
- // of this connection have called close
- if (getConnection() != null) {
- /*
- TODO: activemq specific
- getConnection().cleanUpTempDestinations();
- */
- }
+ unWrap(getConnection());
+
+ expiredCheck();
}
}
/**
+ * Determines if this Connection has expired.
+ * <p/>
+ * A ConnectionPool is considered expired when all references to it are released AND either
+ * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
+ * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
+ *
* @return true if this connection has expired.
*/
public synchronized boolean expiredCheck() {
+
+ boolean expired = false;
+
if (connection == null) {
return true;
}
+
if (hasExpired) {
if (referenceCount == 0) {
close();
+ expired = true;
}
- return true;
}
- if (hasFailed
- || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
- || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
+
+ if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
hasExpired = true;
if (referenceCount == 0) {
close();
+ expired = true;
}
- return true;
}
- return false;
+
+ // Only set hasExpired here if there are no references, as a Connection with references is by
+ // definition not idle at this time.
+ if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
+ hasExpired = true;
+ close();
+ expired = true;
+ }
+
+ return expired;
}
public int getIdleTimeout() {
@@ -240,23 +234,99 @@ public class ConnectionPool {
this.idleTimeout = idleTimeout;
}
- protected SessionPool createSessionPool(SessionKey key) {
- return new SessionPool(this, key, poolFactory.createPool());
- }
-
public void setExpiryTimeout(long expiryTimeout) {
- this.expiryTimeout = expiryTimeout;
+ this.expiryTimeout = expiryTimeout;
}
public long getExpiryTimeout() {
return expiryTimeout;
}
- void onSessionReturned(PooledSession session) {
- this.loanedSessions.remove(session);
+ public int getMaximumActiveSessionPerConnection() {
+ return this.sessionPool.getMaxActive();
+ }
+
+ public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
+ this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
+ }
+
+ public boolean isUseAnonymousProducers() {
+ return this.useAnonymousProducers;
+ }
+
+ public void setUseAnonymousProducers(boolean value) {
+ this.useAnonymousProducers = value;
+ }
+
+ /**
+ * @return the total number of Pooled session including idle sessions that are not
+ * currently loaned out to any client.
+ */
+ public int getNumSessions() {
+ return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
+ }
+
+ /**
+ * @return the total number of Sessions that are in the Session pool but not loaned out.
+ */
+ public int getNumIdleSessions() {
+ return this.sessionPool.getNumIdle();
+ }
+
+ /**
+ * @return the total number of Sessions that have been loaned to PooledConnection instances.
+ */
+ public int getNumActiveSessions() {
+ return this.sessionPool.getNumActive();
+ }
+
+ /**
+ * Configure whether the createSession method should block when there are no more idle sessions and the
+ * pool already contains the maximum number of active sessions. If false the create method will fail
+ * and throw an exception.
+ *
+ * @param block
+ * Indicates whether blocking should be used to wait for more space to create a session.
+ */
+ public void setBlockIfSessionPoolIsFull(boolean block) {
+ this.sessionPool.setWhenExhaustedAction(
+ (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
+ }
+
+ public boolean isBlockIfSessionPoolIsFull() {
+ return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
+ }
+
+ /**
+ * Returns the timeout to use for blocking creating new sessions
+ *
+ * @return the maximum time the pooled Connection createSession method will block when the limit is hit.
+ * @see #setBlockIfSessionPoolIsFull(boolean)
+ */
+ public long getBlockIfSessionPoolIsFullTimeout() {
+ return this.sessionPool.getMaxWait();
+ }
+
+ /**
+ * Controls the behavior of the internal session pool. By default the call to
+ * Connection.createSession() will block if the session pool is full. This setting
+ * controls how long it blocks before an exception is thrown.
+ *
+ * The size of the session pool is controlled by
+ * {@link #setMaximumActiveSessionPerConnection(int)}.
+ *
+ * Whether or not the call to create session blocks is controlled by
+ * {@link #setBlockIfSessionPoolIsFull(boolean)}.
+ *
+ * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFull is true,
+ * then use this setting to configure how long to block before retry
+ */
+ public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
+ this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
}
- void onSessionInvalidated(PooledSession session) {
- this.loanedSessions.remove(session);
+ @Override
+ public String toString() {
+ return "ConnectionPool[" + connection + "]";
}
}
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java Mon Jun 30 16:54:57 2014
@@ -39,6 +39,9 @@ public class GenericResourceManager {
private String resourceName;
+ private String userName;
+ private String password;
+
private TransactionManager transactionManager;
private ConnectionFactory connectionFactory;
@@ -64,6 +67,22 @@ public class GenericResourceManager {
}
}
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
public String getResourceName() {
return resourceName;
}
@@ -113,12 +132,17 @@ public class GenericResourceManager {
public NamedXAResource getNamedXAResource() throws SystemException {
try {
- final XAConnection activeConn = (XAConnection)connFactory.createXAConnection();
- final XASession session = (XASession)activeConn.createXASession();
- activeConn.start();
- LOGGER.debug("new namedXAResource's connection: " + activeConn);
+ final XAConnection xaConnection;
+ if (rm.getUserName() != null && rm.getPassword() != null) {
+ xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
+ } else {
+ xaConnection = connFactory.createXAConnection();
+ }
+ final XASession session = xaConnection.createXASession();
+ xaConnection.start();
+ LOGGER.debug("new namedXAResource's connection: " + xaConnection);
- return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), activeConn);
+ return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), xaConnection);
} catch (Exception e) {
SystemException se = new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
se.initCause(e);
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java Mon Jun 30 16:54:57 2014
@@ -16,24 +16,10 @@
*/
package org.apache.aries.transaction.jms.internal;
+import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
+import javax.jms.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,17 +35,25 @@ import org.slf4j.LoggerFactory;
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
*
*/
-public class PooledConnection implements TopicConnection, QueueConnection {
+public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
- private ConnectionPool pool;
- private boolean stopped;
- private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
- private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
+ protected ConnectionPool pool;
+ private volatile boolean stopped;
+ private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
+ private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
+ private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
+ /**
+ * Creates a new PooledConnection instance that uses the given ConnectionPool to create
+ * and manage its resources. The ConnectionPool instance can be shared amongst many
+ * PooledConnection instances.
+ *
+ * @param pool
+ * The connection and pool manager backing this proxy connection object.
+ */
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
- this.pool.incrementReferenceCount();
}
/**
@@ -69,55 +63,64 @@ public class PooledConnection implements
return new PooledConnection(pool);
}
+ @Override
public void close() throws JMSException {
this.cleanupConnectionTemporaryDestinations();
+ this.cleanupAllLoanedSessions();
if (this.pool != null) {
this.pool.decrementReferenceCount();
this.pool = null;
}
}
+ @Override
public void start() throws JMSException {
assertNotClosed();
pool.start();
}
+ @Override
public void stop() throws JMSException {
stopped = true;
}
- public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
- throws JMSException {
+ @Override
+ public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
}
+ @Override
public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
}
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
- throws JMSException {
+ @Override
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
}
+ @Override
public String getClientID() throws JMSException {
return getConnection().getClientID();
}
+ @Override
public ExceptionListener getExceptionListener() throws JMSException {
return getConnection().getExceptionListener();
}
+ @Override
public ConnectionMetaData getMetaData() throws JMSException {
return getConnection().getMetaData();
}
+ @Override
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
getConnection().setExceptionListener(exceptionListener);
}
+ @Override
public void setClientID(String clientID) throws JMSException {
-
// ignore repeated calls to setClientID() with the same client id
// this could happen when a JMS component such as Spring that uses a
// PooledConnectionFactory shuts down and reinitializes.
@@ -126,51 +129,66 @@ public class PooledConnection implements
}
}
+ @Override
public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
}
// Session factory methods
// -------------------------------------------------------------------------
+ @Override
public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
return (QueueSession) createSession(transacted, ackMode);
}
+ @Override
public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
return (TopicSession) createSession(transacted, ackMode);
}
+ @Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
PooledSession result;
result = (PooledSession) pool.createSession(transacted, ackMode);
- // Add a temporary destination event listener to the session that notifies us when
- // the session creates temporary destinations.
- result.addTempDestEventListener(new PooledSessionEventListener() {
+ // Store the session so we can close the sessions that this PooledConnection
+ // created in order to ensure that consumers etc are closed per the JMS contract.
+ loanedSessions.add(result);
+
+ // Add an event listener to the session that notifies us when the session
+ // creates / destroys temporary destinations and closes etc.
+ result.addSessionEventListener(this);
+ return result;
+ }
- public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
- connTempQueues.add(tempQueue);
- }
+ // Implementation methods
+ // -------------------------------------------------------------------------
- public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
- connTempTopics.add(tempTopic);
- }
- });
+ @Override
+ public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+ connTempQueues.add(tempQueue);
+ }
- return (Session) result;
+ @Override
+ public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+ connTempTopics.add(tempTopic);
}
- // Implementation methods
- // -------------------------------------------------------------------------
+ @Override
+ public void onSessionClosed(PooledSession session) {
+ if (session != null) {
+ this.loanedSessions.remove(session);
+ }
+ }
public Connection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
}
- protected void assertNotClosed() throws JMSException {
+ protected void assertNotClosed() throws javax.jms.IllegalStateException {
if (stopped || pool == null) {
- throw new JMSException("Already closed");
+ throw new javax.jms.IllegalStateException("Connection closed");
}
}
@@ -178,6 +196,7 @@ public class PooledConnection implements
return getConnection().createSession(key.isTransacted(), key.getAckMode());
}
+ @Override
public String toString() {
return "PooledConnection { " + pool + " }";
}
@@ -210,4 +229,43 @@ public class PooledConnection implements
}
connTempTopics.clear();
}
+
+ /**
+ * The PooledConnection tracks all Sessions that it created and now we close them. Closing the
+ * PooledSession will return the internal Session to the Pool of Session after cleaning up
+ * all the resources that the Session had allocated for this PooledConnection.
+ */
+ protected void cleanupAllLoanedSessions() {
+
+ for (PooledSession session : loanedSessions) {
+ try {
+ session.close();
+ } catch (JMSException ex) {
+ LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
+ }
+ }
+ loanedSessions.clear();
+ }
+
+ /**
+ * @return the total number of Pooled session including idle sessions that are not
+ * currently loaned out to any client.
+ */
+ public int getNumSessions() {
+ return this.pool.getNumSessions();
+ }
+
+ /**
+ * @return the number of Sessions that are currently checked out of this Connection's session pool.
+ */
+ public int getNumActiveSessions() {
+ return this.pool.getNumActiveSessions();
+ }
+
+ /**
+ * @return the number of Sessions that are idle in this Connection's sessions pool.
+ */
+ public int getNumtIdleSessions() {
+ return this.pool.getNumIdleSessions();
+ }
}
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java Mon Jun 30 16:54:57 2014
@@ -17,27 +17,30 @@
package org.apache.aries.transaction.jms.internal;
import javax.jms.Destination;
+import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
/**
* A pooled {@link MessageProducer}
- *
- *
*/
public class PooledProducer implements MessageProducer {
- private MessageProducer messageProducer;
- private Destination destination;
+
+ private final MessageProducer messageProducer;
+ private final Destination destination;
+
private int deliveryMode;
private boolean disableMessageID;
private boolean disableMessageTimestamp;
private int priority;
private long timeToLive;
+ private boolean anonymous = true;
public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException {
this.messageProducer = messageProducer;
this.destination = destination;
+ this.anonymous = messageProducer.getDestination() == null;
this.deliveryMode = messageProducer.getDeliveryMode();
this.disableMessageID = messageProducer.getDisableMessageID();
@@ -46,73 +49,104 @@ public class PooledProducer implements M
this.timeToLive = messageProducer.getTimeToLive();
}
+ @Override
public void close() throws JMSException {
+ if (!anonymous) {
+ this.messageProducer.close();
+ }
}
+ @Override
public void send(Destination destination, Message message) throws JMSException {
send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
}
+ @Override
public void send(Message message) throws JMSException {
send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
}
+ @Override
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
send(destination, message, deliveryMode, priority, timeToLive);
}
+ @Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+
if (destination == null) {
- destination = this.destination;
+ if (messageProducer.getDestination() == null) {
+ throw new UnsupportedOperationException("A destination must be specified.");
+ }
+ throw new InvalidDestinationException("Don't understand null destinations");
}
+
MessageProducer messageProducer = getMessageProducer();
// just in case let only one thread send at once
synchronized (messageProducer) {
+
+ if (anonymous && this.destination != null && !this.destination.equals(destination)) {
+ throw new UnsupportedOperationException("This producer can only send messages to: " + this.destination);
+ }
+
+ // Producer will do its own Destination validation so always use the destination
+ // based send method otherwise we might violate a JMS rule.
messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
}
}
+ @Override
public Destination getDestination() {
return destination;
}
+ @Override
public int getDeliveryMode() {
return deliveryMode;
}
+ @Override
public void setDeliveryMode(int deliveryMode) {
this.deliveryMode = deliveryMode;
}
+ @Override
public boolean getDisableMessageID() {
return disableMessageID;
}
+ @Override
public void setDisableMessageID(boolean disableMessageID) {
this.disableMessageID = disableMessageID;
}
+ @Override
public boolean getDisableMessageTimestamp() {
return disableMessageTimestamp;
}
+ @Override
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
this.disableMessageTimestamp = disableMessageTimestamp;
}
+ @Override
public int getPriority() {
return priority;
}
+ @Override
public void setPriority(int priority) {
this.priority = priority;
}
+ @Override
public long getTimeToLive() {
return timeToLive;
}
+ @Override
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}
@@ -123,8 +157,12 @@ public class PooledProducer implements M
return messageProducer;
}
+ protected boolean isAnonymous() {
+ return anonymous;
+ }
+
+ @Override
public String toString() {
return "PooledProducer { " + messageProducer + " }";
}
-
}
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java Mon Jun 30 16:54:57 2014
@@ -23,34 +23,42 @@ import java.util.concurrent.CopyOnWriteA
import javax.jms.*;
import javax.transaction.xa.XAResource;
+import org.apache.commons.pool.KeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PooledSession implements Session, TopicSession, QueueSession {
+public class PooledSession implements Session, TopicSession, QueueSession, XASession {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
+ private final SessionKey key;
+ private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
+ private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+ private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+ private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
+
+ private MessageProducer producer;
+ private TopicPublisher publisher;
+ private QueueSender sender;
+
private Session session;
- private SessionPool sessionPool;
- private MessageProducer messageProducer;
- private QueueSender queueSender;
- private TopicPublisher topicPublisher;
private boolean transactional = true;
private boolean ignoreClose;
-
- private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
- private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
- private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
- new CopyOnWriteArrayList<PooledSessionEventListener>();
private boolean isXa;
+ private boolean useAnonymousProducers = true;
- public PooledSession(Session session, SessionPool sessionPool, boolean transactional) {
+ public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
+ this.key = key;
this.session = session;
this.sessionPool = sessionPool;
this.transactional = transactional;
+ this.useAnonymousProducers = anonymous;
}
- public void addTempDestEventListener(PooledSessionEventListener listener) {
- this.tempDestEventListeners.add(listener);
+ public void addSessionEventListener(PooledSessionEventListener listener) {
+ // only add if really needed
+ if (!sessionEventListeners.contains(listener)) {
+ this.sessionEventListeners.add(listener);
+ }
}
protected boolean isIgnoreClose() {
@@ -61,10 +69,9 @@ public class PooledSession implements Se
this.ignoreClose = ignoreClose;
}
+ @Override
public void close() throws JMSException {
if (!ignoreClose) {
- // TODO a cleaner way to reset??
-
boolean invalidate = false;
try {
// lets reset the session
@@ -95,11 +102,15 @@ public class PooledSession implements Se
} finally {
consumers.clear();
browsers.clear();
+ for (PooledSessionEventListener listener : this.sessionEventListeners) {
+ listener.onSessionClosed(this);
+ }
+ sessionEventListeners.clear();
}
if (invalidate) {
- // lets close the session and not put the session back into
- // the pool
+ // lets close the session and not put the session back into the pool
+ // instead invalidate it so the pool can create a new one on demand.
if (session != null) {
try {
session.close();
@@ -108,114 +119,145 @@ public class PooledSession implements Se
}
session = null;
}
- sessionPool.invalidateSession(this);
+ try {
+ sessionPool.invalidateObject(key, this);
+ } catch (Exception e) {
+ LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
+ }
} else {
- sessionPool.returnSession(this);
+ try {
+ sessionPool.returnObject(key, this);
+ } catch (Exception e) {
+ javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
+ illegalStateException.initCause(e);
+ throw illegalStateException;
+ }
}
}
}
+ @Override
public void commit() throws JMSException {
getInternalSession().commit();
}
+ @Override
public BytesMessage createBytesMessage() throws JMSException {
return getInternalSession().createBytesMessage();
}
+ @Override
public MapMessage createMapMessage() throws JMSException {
return getInternalSession().createMapMessage();
}
+ @Override
public Message createMessage() throws JMSException {
return getInternalSession().createMessage();
}
+ @Override
public ObjectMessage createObjectMessage() throws JMSException {
return getInternalSession().createObjectMessage();
}
+ @Override
public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
return getInternalSession().createObjectMessage(serializable);
}
+ @Override
public Queue createQueue(String s) throws JMSException {
return getInternalSession().createQueue(s);
}
+ @Override
public StreamMessage createStreamMessage() throws JMSException {
return getInternalSession().createStreamMessage();
}
+ @Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
TemporaryQueue result;
result = getInternalSession().createTemporaryQueue();
// Notify all of the listeners of the created temporary Queue.
- for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+ for (PooledSessionEventListener listener : this.sessionEventListeners) {
listener.onTemporaryQueueCreate(result);
}
return result;
}
+ @Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
TemporaryTopic result;
result = getInternalSession().createTemporaryTopic();
// Notify all of the listeners of the created temporary Topic.
- for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+ for (PooledSessionEventListener listener : this.sessionEventListeners) {
listener.onTemporaryTopicCreate(result);
}
return result;
}
+ @Override
public void unsubscribe(String s) throws JMSException {
getInternalSession().unsubscribe(s);
}
+ @Override
public TextMessage createTextMessage() throws JMSException {
return getInternalSession().createTextMessage();
}
+ @Override
public TextMessage createTextMessage(String s) throws JMSException {
return getInternalSession().createTextMessage(s);
}
+ @Override
public Topic createTopic(String s) throws JMSException {
return getInternalSession().createTopic(s);
}
+ @Override
public int getAcknowledgeMode() throws JMSException {
return getInternalSession().getAcknowledgeMode();
}
+ @Override
public boolean getTransacted() throws JMSException {
return getInternalSession().getTransacted();
}
+ @Override
public void recover() throws JMSException {
getInternalSession().recover();
}
+ @Override
public void rollback() throws JMSException {
getInternalSession().rollback();
}
+ @Override
public XAResource getXAResource() {
- if (session == null) {
- throw new IllegalStateException("Session is closed");
+ if (session instanceof XASession) {
+ return ((XASession) session).getXAResource();
}
- return ((XASession) session).getXAResource();
+ return null;
}
+ @Override
public Session getSession() {
return this;
}
+ @Override
public void run() {
if (session != null) {
session.run();
@@ -224,112 +266,168 @@ public class PooledSession implements Se
// Consumer related methods
// -------------------------------------------------------------------------
+ @Override
public QueueBrowser createBrowser(Queue queue) throws JMSException {
return addQueueBrowser(getInternalSession().createBrowser(queue));
}
+ @Override
public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
}
+ @Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination));
}
+ @Override
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination, selector));
}
+ @Override
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
}
+ @Override
public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
}
+ @Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
}
+ @Override
public MessageListener getMessageListener() throws JMSException {
return getInternalSession().getMessageListener();
}
+ @Override
public void setMessageListener(MessageListener messageListener) throws JMSException {
getInternalSession().setMessageListener(messageListener);
}
+ @Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
}
+ @Override
public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
}
+ @Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
}
+ @Override
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
}
// Producer related methods
// -------------------------------------------------------------------------
+ @Override
public MessageProducer createProducer(Destination destination) throws JMSException {
- return new PooledProducer(getMessageProducer(), destination);
+ return new PooledProducer(getMessageProducer(destination), destination);
}
+ @Override
public QueueSender createSender(Queue queue) throws JMSException {
- return new PooledQueueSender(getQueueSender(), queue);
+ return new PooledQueueSender(getQueueSender(queue), queue);
}
+ @Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
- return new PooledTopicPublisher(getTopicPublisher(), topic);
- }
-
- /**
- * Callback invoked when the consumer is closed.
- * <p/>
- * This is used to keep track of an explicit closed consumer created by this
- * session, by which we know do not need to keep track of the consumer, as
- * its already closed.
- *
- * @param consumer
- * the consumer which is being closed
- */
- protected void onConsumerClose(MessageConsumer consumer) {
- consumers.remove(consumer);
+ return new PooledTopicPublisher(getTopicPublisher(topic), topic);
}
- public Session getInternalSession() throws JMSException {
+ public Session getInternalSession() throws IllegalStateException {
if (session == null) {
- throw new JMSException("The session has already been closed");
+ throw new IllegalStateException("The session has already been closed");
}
return session;
}
public MessageProducer getMessageProducer() throws JMSException {
- if (messageProducer == null) {
- messageProducer = getInternalSession().createProducer(null);
+ return getMessageProducer(null);
+ }
+
+ public MessageProducer getMessageProducer(Destination destination) throws JMSException {
+ MessageProducer result = null;
+
+ if (useAnonymousProducers) {
+ if (producer == null) {
+ // Don't allow for duplicate anonymous producers.
+ synchronized (this) {
+ if (producer == null) {
+ producer = getInternalSession().createProducer(null);
+ }
+ }
+ }
+
+ result = producer;
+ } else {
+ result = getInternalSession().createProducer(destination);
}
- return messageProducer;
+
+ return result;
}
public QueueSender getQueueSender() throws JMSException {
- if (queueSender == null) {
- queueSender = ((QueueSession) getInternalSession()).createSender(null);
+ return getQueueSender(null);
+ }
+
+ public QueueSender getQueueSender(Queue destination) throws JMSException {
+ QueueSender result = null;
+
+ if (useAnonymousProducers) {
+ if (sender == null) {
+ // Don't allow for duplicate anonymous producers.
+ synchronized (this) {
+ if (sender == null) {
+ sender = ((QueueSession) getInternalSession()).createSender(null);
+ }
+ }
+ }
+
+ result = sender;
+ } else {
+ result = ((QueueSession) getInternalSession()).createSender(destination);
}
- return queueSender;
+
+ return result;
}
public TopicPublisher getTopicPublisher() throws JMSException {
- if (topicPublisher == null) {
- topicPublisher = ((TopicSession) getInternalSession()).createPublisher(null);
+ return getTopicPublisher(null);
+ }
+
+ public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
+ TopicPublisher result = null;
+
+ if (useAnonymousProducers) {
+ if (publisher == null) {
+ // Don't allow for duplicate anonymous producers.
+ synchronized (this) {
+ if (publisher == null) {
+ publisher = ((TopicSession) getInternalSession()).createPublisher(null);
+ }
+ }
+ }
+
+ result = publisher;
+ } else {
+ result = ((TopicSession) getInternalSession()).createPublisher(destination);
}
- return topicPublisher;
+
+ return result;
}
private QueueBrowser addQueueBrowser(QueueBrowser browser) {
@@ -340,10 +438,8 @@ public class PooledSession implements Se
private MessageConsumer addConsumer(MessageConsumer consumer) {
consumers.add(consumer);
// must wrap in PooledMessageConsumer to ensure the onConsumerClose
- // method is invoked
- // when the returned consumer is closed, to avoid memory leak in this
- // session class
- // in case many consumers is created
+ // method is invoked when the returned consumer is closed, to avoid a memory
+ // leak in this session class in case many consumers are created
return new PooledMessageConsumer(this, consumer);
}
@@ -361,7 +457,22 @@ public class PooledSession implements Se
this.isXa = isXa;
}
+ @Override
public String toString() {
return "PooledSession { " + session + " }";
}
+
+ /**
+ * Callback invoked when the consumer is closed.
+ * <p/>
+ * This is used to keep track of an explicitly closed consumer created by this
+ * session, so that we know we no longer need to keep track of the consumer, as
+ * it is already closed.
+ *
+ * @param consumer
+ * the consumer which is being closed
+ */
+ protected void onConsumerClose(MessageConsumer consumer) {
+ consumers.remove(consumer);
+ }
}
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java Mon Jun 30 16:54:57 2014
@@ -25,7 +25,7 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryQueue.
*
* @param tempQueue
- * The TemporaryQueue just created.
+ * The TemporaryQueue just created.
*/
void onTemporaryQueueCreate(TemporaryQueue tempQueue);
@@ -33,8 +33,16 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryTopic.
*
* @param tempTopic
- * The TemporaryTopic just created.
+ * The TemporaryTopic just created.
*/
void onTemporaryTopicCreate(TemporaryTopic tempTopic);
+ /**
+ * Called when the PooledSession is closed.
+ *
+ * @param session
+ * The PooledSession that has been closed.
+ */
+ void onSessionClosed(PooledSession session);
+
}
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java Mon Jun 30 16:54:57 2014
@@ -16,6 +16,7 @@
*/
package org.apache.aries.transaction.jms.internal;
+import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
@@ -29,8 +30,8 @@ public class RecoverableConnectionPool e
private String name;
- public RecoverableConnectionPool(XAConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) throws JMSException {
- super(connection, poolFactory, transactionManager);
+ public RecoverableConnectionPool(Connection connection, TransactionManager transactionManager, String name) {
+ super(connection, transactionManager);
this.name = name;
}
Modified: aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java
URL: http://svn.apache.org/viewvc/aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java?rev=1606837&r1=1606836&r2=1606837&view=diff
==============================================================================
--- aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java (original)
+++ aries/branches/subsystemsR6/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java Mon Jun 30 16:54:57 2014
@@ -22,8 +22,10 @@ package org.apache.aries.transaction.jms
*
*/
public class SessionKey {
+
private boolean transacted;
private int ackMode;
+
private int hash;
public SessionKey(boolean transacted, int ackMode) {