You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2020/02/29 06:09:05 UTC

[activemq] branch activemq-5.15.x updated: [AMQ-7131] Add connectionTimeout to avoid starvation

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 314d898  [AMQ-7131] Add connectionTimeout to avoid starvation
314d898 is described below

commit 314d898ebbd41434c95ec337df598b07a9bcd60e
Author: Benjamin Graf <be...@gmx.net>
AuthorDate: Thu Jan 10 20:33:10 2019 +0100

    [AMQ-7131] Add connectionTimeout to avoid starvation
    
    (cherry picked from commit 92dec52e1c1ecd26014fa62f5aad7072ecdbb265)
---
 .../activemq/jms/pool/PooledConnectionFactory.java | 52 ++++++++++++++++++----
 .../jms/pool/PooledConnectionFactoryTest.java      | 26 ++++++++---
 2 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
index 967f46e..11fbbbb 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.jms.pool;
 
+import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -74,6 +75,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
 
     private int maximumActiveSessionPerConnection = 500;
     private int idleTimeout = 30 * 1000;
+    private int connectionTimeout = 30 * 1000;
     private boolean blockIfSessionPoolIsFull = true;
     private long blockIfSessionPoolIsFullTimeout = -1L;
     private long expiryTimeout = 0l;
@@ -144,6 +146,9 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
 
                 }, poolConfig);
 
+            // Set max wait time to control borrow from pool.
+            this.connectionsPool.setMaxWaitMillis(getConnectionTimeout());
+
             // Set max idle (not max active) since our connections always idle in the pool.
             this.connectionsPool.setMaxIdlePerKey(1);
             this.connectionsPool.setLifo(false);
@@ -232,16 +237,24 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
                 // under lock to prevent another thread from triggering an expiration check and
                 // pulling the rug out from under us.
                 while (connection == null) {
-                    connection = connectionsPool.borrowObject(key);
-                    synchronized (connection) {
-                        if (connection.getConnection() != null) {
-                            connection.incrementReferenceCount();
-                            break;
+                    try {
+                        connection = connectionsPool.borrowObject(key);
+                    } catch (NoSuchElementException ex) {
+                        if (!"Timeout waiting for idle object".equals(ex.getMessage())) {
+                            throw ex;
+                        }
+                    }
+                    if (connection != null) {
+                        synchronized (connection) {
+                            if (connection.getConnection() != null) {
+                                connection.incrementReferenceCount();
+                                break;
+                            }
+
+                            // Return the bad one to the pool and let it get destroyed as normal.
+                            connectionsPool.returnObject(key, connection);
+                            connection = null;
                         }
-
-                        // Return the bad one to the pool and let if get destroyed as normal.
-                        connectionsPool.returnObject(key, connection);
-                        connection = null;
                     }
                 }
             } catch (Exception e) {
@@ -421,6 +434,27 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
     }
 
     /**
+     * Gets the connection timeout value, i.e. the maximum time to wait when getting a Connection from the pool.
+     * The default value is 30 seconds.
+     *
+     * @return connection timeout value (milliseconds)
+     */
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    /**
+     * Sets the connection timeout value for getting Connections from this pool in Milliseconds,
+     * defaults to 30 seconds.
+     *
+     * @param connectionTimeout
+     *      The maximum time to wait for getting a pooled Connection.
+     */
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /**
      * allow connections to expire, irrespective of load or idle time. This is useful with failover
      * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
      *
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
index 0be8108..a13f312 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
@@ -16,13 +16,6 @@
  */
 package org.apache.activemq.jms.pool;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -46,6 +39,8 @@ import org.apache.activemq.util.Wait;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 /**
  * Checks the behavior of the PooledConnectionFactory when the maximum amount of
  * sessions is being reached.
@@ -67,6 +62,23 @@ public class PooledConnectionFactoryTest extends JmsPoolTestSupport {
         pcf.stop();
     }
 
+    @Test(timeout = 120000)
+    public void testConnectionTimeout() throws Exception {
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(amq);
+        cf.setConnectionTimeout(100);
+
+        PooledConnection connection = (PooledConnection) cf.createConnection();
+        assertEquals(1, cf.getNumConnections());
+
+        // wait for the connection timeout
+        Thread.sleep(300);
+
+        connection = (PooledConnection) cf.createConnection();
+        assertEquals(1, cf.getNumConnections());
+    }
+
     @Test(timeout = 60000)
     public void testClearAllConnections() throws Exception {