You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/01 15:02:29 UTC
svn commit: r1164058 - in /activemq/trunk/activemq-pool/src:
main/java/org/apache/activemq/pool/PooledConnectionFactory.java
test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
Author: tabish
Date: Thu Sep 1 13:02:28 2011
New Revision: 1164058
URL: http://svn.apache.org/viewvc?rev=1164058&view=rev
Log:
apply fix for: https://issues.apache.org/jira/browse/AMQ-3482 to return default behavior to blocking.
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?rev=1164058&r1=1164057&r2=1164058&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Thu Sep 1 13:02:28 2011
@@ -40,23 +40,23 @@ import org.apache.commons.pool.impl.Gene
* href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
* Connections, sessions and producers are returned to a pool after use so that they can be reused later
* without having to undergo the cost of creating them again.
- *
+ *
* b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
- * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
+ * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
* are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
* just created at startup and left active, handling incoming messages as they come. When a consumer is
- * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
+ * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
* even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
* where they'll get held until the consumer is active again.
- *
+ *
* If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
- * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
- * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
+ * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
+ * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
* http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
- *
+ *
* @org.apache.xbean.XBean element="pooledConnectionFactory"
- *
- *
+ *
+ *
*/
public class PooledConnectionFactory implements ConnectionFactory, Service {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
@@ -66,7 +66,7 @@ public class PooledConnectionFactory imp
private int maximumActive = 500;
private int maxConnections = 1;
private int idleTimeout = 30 * 1000;
- private boolean blockIfSessionPoolIsFull = false;
+ private boolean blockIfSessionPoolIsFull = true;
private AtomicBoolean stopped = new AtomicBoolean(false);
private long expiryTimeout = 0l;
@@ -99,7 +99,7 @@ public class PooledConnectionFactory imp
LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
return null;
}
-
+
ConnectionKey key = new ConnectionKey(userName, password);
LinkedList<ConnectionPool> pools = cache.get(key);
@@ -195,25 +195,21 @@ public class PooledConnectionFactory imp
public void setMaximumActive(int maximumActive) {
this.maximumActive = maximumActive;
}
-
+
/**
- * Controls the behavior of the internal session pool.
- * By default the call to Connection.getSession() will
- * return a JMSException if the session pool is full.
- * If the argument true is given, it will change the
- * default behavior and instead the call to getSession()
- * will block until a session is available in the pool, which
- * used to be the default behavior in ActiveMQ versions < 5.6.
- *
- * The size of the session pool is controlled by the @see #maximumActive
+ * Controls the behavior of the internal session pool. By default the call to
+ * Connection.getSession() will block if the session pool is full. If the
+ * argument false is given, it will change the default behavior and instead the
+ * call to getSession() will throw a JMSException.
+ *
+ * The size of the session pool is controlled by the @see #maximumActive
* property.
- *
- * @param block - if true, the call to getSession() blocks if the pool
- * is full until a session object is available.
- * defaults to false.
+ *
+ * @param block - if true, the call to getSession() blocks if the pool is full
+ * until a session object is available. defaults to true.
*/
public void setBlockIfSessionPoolIsFull(boolean block) {
- this.blockIfSessionPoolIsFull = block;
+ this.blockIfSessionPoolIsFull = block;
}
/**
@@ -233,18 +229,18 @@ public class PooledConnectionFactory imp
/**
* Creates an ObjectPoolFactory. Its behavior is controlled by the two
* properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
- *
+ *
* @return the newly created but empty ObjectPoolFactory
*/
protected ObjectPoolFactory createPoolFactory() {
- if (blockIfSessionPoolIsFull) {
- return new GenericObjectPoolFactory(null, maximumActive);
- } else {
- return new GenericObjectPoolFactory(null,
- maximumActive,
- GenericObjectPool.WHEN_EXHAUSTED_FAIL,
- GenericObjectPool.DEFAULT_MAX_WAIT);
- }
+ if (blockIfSessionPoolIsFull) {
+ return new GenericObjectPoolFactory(null, maximumActive);
+ } else {
+ return new GenericObjectPoolFactory(null,
+ maximumActive,
+ GenericObjectPool.WHEN_EXHAUSTED_FAIL,
+ GenericObjectPool.DEFAULT_MAX_WAIT);
+ }
}
public int getIdleTimeout() {
@@ -258,13 +254,13 @@ public class PooledConnectionFactory imp
/**
* allow connections to expire, irrespective of load or idle time. This is useful with failover
* to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
- *
+ *
* @param expiryTimeout non zero in milliseconds
*/
public void setExpiryTimeout(long expiryTimeout) {
- this.expiryTimeout = expiryTimeout;
+ this.expiryTimeout = expiryTimeout;
}
-
+
public long getExpiryTimeout() {
return expiryTimeout;
}
Modified: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java?rev=1164058&r1=1164057&r2=1164058&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java (original)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryTest.java Thu Sep 1 13:02:28 2011
@@ -22,18 +22,18 @@ import org.apache.log4j.Logger;
/**
* Checks the behavior of the PooledConnectionFactory when the maximum amount
- * of sessions is being reached.
+ * of sessions is being reached.
* Older versions simply block in the call to Connection.getSession(), which isn't good.
* An exception being returned is the better option, so JMS clients don't block.
- * This test succeeds if an exception is returned and fails if the call to getSession()
+ * This test succeeds if an exception is returned and fails if the call to getSession()
* blocks.
- *
+ *
*/
public class PooledConnectionFactoryTest extends TestCase
{
- public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
-
-
+ public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
+
+
/**
* Create the test case
*
@@ -53,95 +53,87 @@ public class PooledConnectionFactoryTest
}
/**
- * Tests the behavior of the sessionPool of the PooledConnectionFactory
- * when maximum number of sessions are reached. In older versions the call to
- * Connection.createSession() would simply block indefinitely if the maximum
- * number of sessions got reached (controled by
- * PooledConnectionFactory.setMaximumActive()).
- * Rather than blocking the entire thread, it should raise an exception
- * instead.
+ * Tests the behavior of the sessionPool of the PooledConnectionFactory
+ * when maximum number of sessions are reached.
*/
public void testApp() throws Exception
- {
- // using separate thread for testing so that we can interrupt the test
- // if the call to get a new session blocks.
-
- // start test runner thread
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
-
- // test should not take > 5secs, so test fails i
- Thread.sleep(5*1000);
-
- if (!result.isDone() || !result.get().booleanValue()) {
- PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" +
- " is blocking but should have returned an error instead.");
-
- executor.shutdownNow();
-
- Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " +
- "limit is exceeded but should return an exception instead.");
- }
+ {
+ // using separate thread for testing so that we can interrupt the test
+ // if the call to get a new session blocks.
+
+ // start test runner thread
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
+
+ // test should not take > 5secs, so test fails i
+ Thread.sleep(5*1000);
+
+ if (!result.isDone() || !result.get().booleanValue()) {
+ PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" +
+ " is blocking but should have returned an error instead.");
+
+ executor.shutdownNow();
+
+ Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " +
+ "limit is exceeded but should return an exception instead.");
+ }
}
}
class TestRunner implements Callable<Boolean> {
-
- public final static Logger LOG = Logger.getLogger(TestRunner.class);
-
- /**
- * @return true if test succeeded, false otherwise
- */
- public Boolean call() {
-
- Connection conn = null;
- Session one = null;
-
- // wait at most 5 seconds for the call to createSession
- try {
- ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
- PooledConnectionFactory cf = new PooledConnectionFactory(amq);
- cf.setMaxConnections(3);
- cf.setMaximumActive(1);
-
- // default should be false already but lets make sure a change to the default
- // setting does not make this test fail.
- cf.setBlockIfSessionPoolIsFull(false);
-
- conn = cf.createConnection();
- one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Session two = null;
- try {
- // this should raise an exception as we called setMaximumActive(1)
- two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- two.close();
-
- LOG.error("Expected JMSException wasn't thrown.");
- Assert.fail("seconds call to Connection.createSession() was supposed" +
- "to raise an JMSException as internal session pool" +
- "is exhausted. This did not happen and indiates a problem");
- return new Boolean(false);
- } catch (JMSException ex) {
- if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
- //expected, ignore but log
- LOG.info("Caught expected " + ex);
- } else {
- LOG.error(ex);
- return new Boolean(false);
- }
- } finally {
- if (one != null)
- one.close();
- if (conn != null)
- conn.close();
- }
- } catch (Exception ex) {
- LOG.error(ex.getMessage());
- return new Boolean(false);
- }
-
- // all good, test succeeded
- return new Boolean(true);
- }
+
+ public final static Logger LOG = Logger.getLogger(TestRunner.class);
+
+ /**
+ * @return true if test succeeded, false otherwise
+ */
+ public Boolean call() {
+
+ Connection conn = null;
+ Session one = null;
+
+ // wait at most 5 seconds for the call to createSession
+ try {
+ ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+ PooledConnectionFactory cf = new PooledConnectionFactory(amq);
+ cf.setMaxConnections(3);
+ cf.setMaximumActive(1);
+ cf.setBlockIfSessionPoolIsFull(false);
+
+ conn = cf.createConnection();
+ one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session two = null;
+ try {
+ // this should raise an exception as we called setMaximumActive(1)
+ two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ two.close();
+
+ LOG.error("Expected JMSException wasn't thrown.");
+ Assert.fail("seconds call to Connection.createSession() was supposed" +
+ "to raise an JMSException as internal session pool" +
+ "is exhausted. This did not happen and indiates a problem");
+ return new Boolean(false);
+ } catch (JMSException ex) {
+ if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
+ //expected, ignore but log
+ LOG.info("Caught expected " + ex);
+ } else {
+ LOG.error(ex);
+ return new Boolean(false);
+ }
+ } finally {
+ if (one != null)
+ one.close();
+ if (conn != null)
+ conn.close();
+ }
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage());
+ return new Boolean(false);
+ }
+
+ // all good, test succeeded
+ return new Boolean(true);
+ }
}