You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2020/02/29 06:09:05 UTC
[activemq] branch activemq-5.15.x updated: [AMQ-7131] Add
connectionTimeout to avoid starvation
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new 314d898 [AMQ-7131] Add connectionTimeout to avoid starvation
314d898 is described below
commit 314d898ebbd41434c95ec337df598b07a9bcd60e
Author: Benjamin Graf <be...@gmx.net>
AuthorDate: Thu Jan 10 20:33:10 2019 +0100
[AMQ-7131] Add connectionTimeout to avoid starvation
(cherry picked from commit 92dec52e1c1ecd26014fa62f5aad7072ecdbb265)
---
.../activemq/jms/pool/PooledConnectionFactory.java | 52 ++++++++++++++++++----
.../jms/pool/PooledConnectionFactoryTest.java | 26 ++++++++---
2 files changed, 62 insertions(+), 16 deletions(-)
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
index 967f46e..11fbbbb 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.jms.pool;
+import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -74,6 +75,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
private int maximumActiveSessionPerConnection = 500;
private int idleTimeout = 30 * 1000;
+ private int connectionTimeout = 30 * 1000;
private boolean blockIfSessionPoolIsFull = true;
private long blockIfSessionPoolIsFullTimeout = -1L;
private long expiryTimeout = 0l;
@@ -144,6 +146,9 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
}, poolConfig);
+ // Set max wait time to control borrow from pool.
+ this.connectionsPool.setMaxWaitMillis(getConnectionTimeout());
+
// Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdlePerKey(1);
this.connectionsPool.setLifo(false);
@@ -232,16 +237,24 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
- connection = connectionsPool.borrowObject(key);
- synchronized (connection) {
- if (connection.getConnection() != null) {
- connection.incrementReferenceCount();
- break;
+ try {
+ connection = connectionsPool.borrowObject(key);
+ } catch (NoSuchElementException ex) {
+ if (!"Timeout waiting for idle object".equals(ex.getMessage())) {
+ throw ex;
+ }
+ }
+ if (connection != null) {
+ synchronized (connection) {
+ if (connection.getConnection() != null) {
+ connection.incrementReferenceCount();
+ break;
+ }
+
+ // Return the bad one to the pool and let it get destroyed as normal.
+ connectionsPool.returnObject(key, connection);
+ connection = null;
}
-
- // Return the bad one to the pool and let if get destroyed as normal.
- connectionsPool.returnObject(key, connection);
- connection = null;
}
}
} catch (Exception e) {
@@ -421,6 +434,27 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
}
/**
+ * Gets the connection timeout value: the maximum time to wait when borrowing a Connection from the pool.
+ * The default value is 30 seconds.
+ *
+ * @return connection timeout value (milliseconds)
+ */
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ /**
+ * Sets the connection timeout value, in milliseconds, for getting Connections from this pool.
+ * Defaults to 30 seconds.
+ *
+ * @param connectionTimeout
+ * The maximum time to wait for getting a pooled Connection.
+ */
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ /**
* allow connections to expire, irrespective of load or idle time. This is useful with failover
* to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
*
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
index 0be8108..a13f312 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
@@ -16,13 +16,6 @@
*/
package org.apache.activemq.jms.pool;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -46,6 +39,8 @@ import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;
import org.junit.Test;
+import static org.junit.Assert.*;
+
/**
* Checks the behavior of the PooledConnectionFactory when the maximum amount of
* sessions is being reached.
@@ -67,6 +62,23 @@ public class PooledConnectionFactoryTest extends JmsPoolTestSupport {
pcf.stop();
}
+ @Test(timeout = 120000)
+ public void testConnectionTimeout() throws Exception {
+ ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
+ PooledConnectionFactory cf = new PooledConnectionFactory();
+ cf.setConnectionFactory(amq);
+ cf.setConnectionTimeout(100);
+
+ PooledConnection connection = (PooledConnection) cf.createConnection();
+ assertEquals(1, cf.getNumConnections());
+
+ // wait for the connection timeout
+ Thread.sleep(300);
+
+ connection = (PooledConnection) cf.createConnection();
+ assertEquals(1, cf.getNumConnections());
+ }
+
@Test(timeout = 60000)
public void testClearAllConnections() throws Exception {