You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/06/21 14:15:24 UTC

svn commit: r956541 - in /activemq/trunk/activemq-pool/src: main/java/org/apache/activemq/pool/ test/java/org/apache/activemq/pool/

Author: gtully
Date: Mon Jun 21 12:15:23 2010
New Revision: 956541

URL: http://svn.apache.org/viewvc?rev=956541&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2376 - add expiryTimeout such that pooled connections can have a deterministic life span after which they expire. This allows the pool to be refreshed such that load balancing of failover reconnect can occur post recovery

Added:
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionExpiryEvictsFromPoolTest.java   (with props)
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java   (with props)
Removed:
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=956541&r1=956540&r2=956541&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java Mon Jun 21 12:15:23 2010
@@ -25,11 +25,6 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.jms.JMSException;
 import javax.jms.Session;
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.transport.TransportListener;
@@ -48,9 +43,11 @@ public class ConnectionPool {
     private int referenceCount;
     private ObjectPoolFactory poolFactory;
     private long lastUsed = System.currentTimeMillis();
+    private long firstUsed = lastUsed;
     private boolean hasFailed;
     private boolean hasExpired;
     private int idleTimeout = 30 * 1000;
+    private long expiryTimeout = 0L; // milliseconds; the expiry check only applies it when > 0
 
     public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
         this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory);
@@ -63,6 +60,8 @@ public class ConnectionPool {
 
             public void onException(IOException error) {
                 synchronized (ConnectionPool.this) {
+                    System.err.println("HasFaile=true on :" + error);
+                    Thread.dumpStack();
                     hasFailed = true;
                 }
             }
@@ -163,7 +162,9 @@ public class ConnectionPool {
             }
             return true;
         }
-        if (hasFailed || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)) {
+        if (hasFailed 
+                || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
+                || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
             hasExpired = true;
             if (referenceCount == 0) {
                 close();
@@ -185,4 +186,12 @@ public class ConnectionPool {
         return new SessionPool(this, key, poolFactory.createPool());
     }
 
+    public void setExpiryTimeout(long expiryTimeout) { // milliseconds, counted from firstUsed; ignored unless > 0
+        this.expiryTimeout  = expiryTimeout;
+    }
+    
+    public long getExpiryTimeout() { // configured maximum age in milliseconds (0 by default)
+        return expiryTimeout;
+    }
+
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?rev=956541&r1=956540&r2=956541&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java (original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Mon Jun 21 12:15:23 2010
@@ -56,6 +56,7 @@ public class PooledConnectionFactory imp
     private int maxConnections = 1;
     private int idleTimeout = 30 * 1000;
     private AtomicBoolean stopped = new AtomicBoolean(false);
+    private long expiryTimeout = 0L; // milliseconds; copied to each ConnectionPool on creation
 
     public PooledConnectionFactory() {
         this(new ActiveMQConnectionFactory());
@@ -117,6 +118,7 @@ public class PooledConnectionFactory imp
     protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
         ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
         result.setIdleTimeout(getIdleTimeout());
+        result.setExpiryTimeout(getExpiryTimeout()); // hand the factory's current expiry setting to the new pool
         return result;
     }
 
@@ -145,9 +147,7 @@ public class PooledConnectionFactory imp
         LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
         stopped.set(true);
         for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
-            LinkedList list = iter.next();
-            for (Iterator i = list.iterator(); i.hasNext();) {
-                ConnectionPool connection = (ConnectionPool) i.next();
+            for (ConnectionPool connection : iter.next()) {
                 try {
                     connection.close();
                 }catch(Exception e) {
@@ -209,4 +209,18 @@ public class PooledConnectionFactory imp
     public void setIdleTimeout(int idleTimeout) {
         this.idleTimeout = idleTimeout;
     }
+
+    /**
+     * Allows pooled connections to expire after a fixed lifetime, irrespective of load or idle time.
+     * This is useful with failover: expiry forces a reconnect from the pool, which re-establishes
+     * load balancing or use of the master after recovery.
+     * @param expiryTimeout maximum connection age in milliseconds; 0 (the default) means no age-based expiry
+     */
+    public void setExpiryTimeout(long expiryTimeout) {
+        this.expiryTimeout = expiryTimeout;   
+    }
+    
+    public long getExpiryTimeout() { // value passed to ConnectionPool.setExpiryTimeout for each new pool
+        return expiryTimeout;
+    }
 }

Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionExpiryEvictsFromPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionExpiryEvictsFromPoolTest.java?rev=956541&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionExpiryEvictsFromPoolTest.java (added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionExpiryEvictsFromPoolTest.java Mon Jun 21 12:15:23 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.test.TestSupport;
+
+/** Checks that PooledConnectionFactory hands out a new connection once the pooled one idles out or expires. */
+public class ConnectionExpiryEvictsFromPoolTest extends TestSupport {
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri());
+        pooledFactory = new PooledConnectionFactory(factory);
+        pooledFactory.setMaxConnections(1);
+    }
+
+    // A connection closed and then left unused beyond idleTimeout must not be handed out again.
+    public void testEvictionOfIdle() throws Exception {
+        pooledFactory.setIdleTimeout(10);
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        ActiveMQConnection amq1 = connection.getConnection();
+        connection.close();
+        // let it idle timeout
+        TimeUnit.SECONDS.sleep(1);
+
+        PooledConnection connection2 = (PooledConnection) pooledFactory.createConnection();
+        ActiveMQConnection amq2 = connection2.getConnection();
+        assertFalse("not equal", amq1.equals(amq2));
+    }
+
+    // A connection that outlives expiryTimeout while still held by a client must be replaced on the next request.
+    public void testEvictionOfExpired() throws Exception {
+        pooledFactory.setExpiryTimeout(10);
+        Connection connection = pooledFactory.createConnection();
+        ActiveMQConnection amq1 = ((PooledConnection) connection).getConnection();
+        // let it expire while in use
+        TimeUnit.SECONDS.sleep(1);
+        connection.close();
+
+        Connection connection2 = pooledFactory.createConnection();
+        ActiveMQConnection amq2 = ((PooledConnection) connection2).getConnection();
+        assertFalse("not equal", amq1.equals(amq2));
+    }
+
+    // Stop the pool first so its connections are closed before the broker goes away.
+    protected void tearDown() throws Exception {
+        pooledFactory.stop();
+        broker.stop();
+    }
+}

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionExpiryEvictsFromPoolTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionExpiryEvictsFromPoolTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java?rev=956541&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java (added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java Mon Jun 21 12:15:23 2010
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.mock.MockTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ConnectionFailureEvictsFromPoolTest extends TestSupport {
+    private static final Log LOG = LogFactory.getLog(ConnectionFailureEvictsFromPoolTest.class);
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false");
+        pooledFactory = new PooledConnectionFactory(factory);
+    }
+
+    public void testEviction() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        ActiveMQConnection amqC = connection.getConnection();
+        final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
+        amqC.addTransportListener(new TransportListener() {
+            public void onCommand(Object command) {
+            }
+            public void onException(IOException error) {
+                // we know connection is dead...
+                // listeners are fired async
+                gotExceptionEvent.countDown();
+            }
+            public void transportInterupted() {
+            }
+            public void transportResumed() {
+            }
+        });
+        
+        sendMessage(connection);
+        LOG.info("sent one message worked fine");
+        createConnectionFailure(connection);
+        try {
+            sendMessage(connection);
+            fail("Expected Error");
+        } catch (JMSException e) {
+        } finally {
+            connection.close();
+        }
+        assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS));
+        // If we get another connection now it should be a new connection that
+        // works.
+        LOG.info("expect new connection after failure");
+        Connection connection2 = pooledFactory.createConnection();
+        sendMessage(connection2);
+    }
+
+    private void createConnectionFailure(Connection connection) throws Exception {
+        ActiveMQConnection c = ((PooledConnection)connection).getConnection();
+        MockTransport t = (MockTransport)c.getTransportChannel().narrow(MockTransport.class);
+        t.onException(new IOException("forcing exception for " + getName() + " to force pool eviction"));
+        LOG.info("arranged for failure, chucked exception");
+    }
+
+    private void sendMessage(Connection connection) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO"));
+        producer.send(session.createTextMessage("Test"));
+        session.close();
+    }
+
+    protected void tearDown() throws Exception {
+        broker.stop();
+    }
+}

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date