You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/04/20 15:48:31 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5510 https://issues.apache.org/jira/browse/AMQ-5534

Repository: activemq
Updated Branches:
  refs/heads/master 8a30026e8 -> b65c0d1be


https://issues.apache.org/jira/browse/AMQ-5510
https://issues.apache.org/jira/browse/AMQ-5534

PooledConnectionFactory added reconnection support, but it can break if the
holder of the connection sets their own ExceptionListener. The
PooledConnection doesn't protect the internal ExceptionListener from
replacement, which leads to cases where the loaned Connection is not
automatically closed, so the next create returns the same failed
connection.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b65c0d1b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b65c0d1b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b65c0d1b

Branch: refs/heads/master
Commit: b65c0d1be4b0812229d3f166e50d963766856c53
Parents: 8a30026
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Apr 20 09:48:06 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Apr 20 09:48:06 2016 -0400

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       |  34 +++---
 .../activemq/jms/pool/PooledConnection.java     |   4 +-
 .../jms/pool/PooledConnectionFactory.java       |  19 +--
 .../jms/pool/PooledConnectionFailoverTest.java  | 117 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |   6 +-
 5 files changed, 144 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b65c0d1b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index 15bdd33..2be4a6f 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -67,6 +67,11 @@ public class ConnectionPool implements ExceptionListener {
         final GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
         poolConfig.setJmxEnabled(false);
         this.connection = wrap(connection);
+        try {
+            this.connection.setExceptionListener(this);
+        } catch (JMSException ex) {
+            LOG.warn("Could not set exception listener on create of ConnectionPool");
+        }
 
         // Create our internal Pool of session instances.
         this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
@@ -79,7 +84,7 @@ public class ConnectionPool implements ExceptionListener {
 
                 @Override
                 public void destroyObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
-                    ((SessionHolder)pooledObject.getObject()).close();
+                    pooledObject.getObject().close();
                 }
 
                 @Override
@@ -357,26 +362,21 @@ public class ConnectionPool implements ExceptionListener {
      */
     public void setReconnectOnException(boolean reconnectOnException) {
         this.reconnectOnException = reconnectOnException;
-        try {
-            if (isReconnectOnException()) {
-                if (connection.getExceptionListener() != null) {
-                    parentExceptionListener = connection.getExceptionListener();
-                }
-                connection.setExceptionListener(this);
-            } else {
-                if (parentExceptionListener != null) {
-                    connection.setExceptionListener(parentExceptionListener);
-                }
-                parentExceptionListener = null;
-            }
-        } catch (JMSException jmse) {
-            LOG.warn("Cannot set reconnect exception listener", jmse);
-        }
+    }
+
+    ExceptionListener getParentExceptionListener() {
+        return parentExceptionListener;
+    }
+
+    void setParentExceptionListener(ExceptionListener parentExceptionListener) {
+        this.parentExceptionListener = parentExceptionListener;
     }
 
     @Override
     public void onException(JMSException exception) {
-        close();
+        if (isReconnectOnException()) {
+            close();
+        }
         if (parentExceptionListener != null) {
             parentExceptionListener.onException(exception);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b65c0d1b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
index b7b56ba..111e730 100755
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -122,7 +122,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
 
     @Override
     public ExceptionListener getExceptionListener() throws JMSException {
-        return getConnection().getExceptionListener();
+        return pool.getParentExceptionListener();
     }
 
     @Override
@@ -132,7 +132,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole
 
     @Override
     public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
-        getConnection().setExceptionListener(exceptionListener);
+        pool.setParentExceptionListener(exceptionListener);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/b65c0d1b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
index f507cda..839d668 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -63,7 +63,6 @@ import org.slf4j.LoggerFactory;
  * eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method.  By
  * default the value is -1 which means no eviction thread will be run.  Set to a non-negative value to
  * configure the idle eviction thread to run.
- *
  */
 public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
@@ -106,9 +105,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
                         connection.setUseAnonymousProducers(isUseAnonymousProducers());
                         connection.setReconnectOnException(isReconnectOnException());
 
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Created new connection: {}", connection);
-                        }
+                        LOG.trace("Created new connection: {}", connection);
 
                         PooledConnectionFactory.this.mostRecentlyCreated.set(connection);
 
@@ -119,9 +116,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
                     public void destroyObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
                         ConnectionPool connection = pooledObject.getObject();
                         try {
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Destroying connection: {}", connection);
-                            }
+                            LOG.trace("Destroying connection: {}", connection);
                             connection.close();
                         } catch (Exception e) {
                             LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
@@ -132,10 +127,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
                     public boolean validateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) {
                         ConnectionPool connection = pooledObject.getObject();
                         if (connection != null && connection.expiredCheck()) {
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Connection has expired: {} and will be destroyed", connection);
-                            }
-
+                            LOG.trace("Connection has expired: {} and will be destroyed", connection);
                             return false;
                         }
 
@@ -305,7 +297,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
     public void stop() {
         if (stopped.compareAndSet(false, true)) {
             LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
-                    connectionsPool != null ? connectionsPool.getNumActive() : 0);
+                      connectionsPool != null ? connectionsPool.getNumActive() : 0);
             try {
                 if (connectionsPool != null) {
                     connectionsPool.close();
@@ -322,7 +314,6 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
      * are in use be client's will be closed.
      */
     public void clear() {
-
         if (stopped.get()) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b65c0d1b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java
new file mode 100644
index 0000000..503cc0b
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFailoverTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.jms.pool;
+
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PooledConnectionFailoverTest extends JmsPoolTestSupport {
+
+    protected ActiveMQConnectionFactory directConnFact;
+    protected PooledConnectionFactory pooledConnFact;
+
+    @Override
+    @Before
+    public void setUp() throws java.lang.Exception {
+        super.setUp();
+
+        String connectionURI = createBroker();
+
+        // Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
+        directConnFact = new ActiveMQConnectionFactory(connectionURI);
+        pooledConnFact = new PooledConnectionFactory();
+        pooledConnFact.setConnectionFactory(directConnFact);
+        pooledConnFact.setMaxConnections(1);
+        pooledConnFact.setReconnectOnException(true);
+    }
+
+    @Test
+    public void testConnectionFailures() throws Exception {
+
+        final CountDownLatch failed = new CountDownLatch(1);
+
+        Connection connection = pooledConnFact.createConnection();
+        LOG.info("Fetched new connection from the pool: {}", connection);
+        connection.setExceptionListener(new ExceptionListener() {
+
+            @Override
+            public void onException(JMSException exception) {
+                LOG.info("Pooled Connection failed");
+                failed.countDown();
+            }
+        });
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getTestName());
+        MessageProducer producer = session.createProducer(queue);
+
+        brokerService.stop();
+
+        assertTrue(failed.await(15, TimeUnit.SECONDS));
+
+        createBroker();
+
+        try {
+            producer.send(session.createMessage());
+            fail("Should be disconnected");
+        } catch (JMSException ex) {
+            LOG.info("Producer failed as expected: {}", ex.getMessage());
+        }
+
+        Connection connection2 = pooledConnFact.createConnection();
+        assertNotSame(connection, connection2);
+        LOG.info("Fetched new connection from the pool: {}", connection2);
+        session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        connection2.close();
+
+        pooledConnFact.stop();
+    }
+
+    private String createBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setBrokerName("PooledConnectionSessionCleanupTestBroker");
+        brokerService.setUseJmx(true);
+        brokerService.getManagementContext().setCreateConnector(false);
+        brokerService.setPersistent(false);
+        brokerService.setSchedulerSupport(false);
+        brokerService.setAdvisorySupport(false);
+        TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:61626");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        return "failover:(" + connector.getPublishableConnectString() + ")?maxReconnectAttempts=5";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b65c0d1b/activemq-jms-pool/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/resources/log4j.properties b/activemq-jms-pool/src/test/resources/log4j.properties
index b42af1a..2543c16 100755
--- a/activemq-jms-pool/src/test/resources/log4j.properties
+++ b/activemq-jms-pool/src/test/resources/log4j.properties
@@ -5,9 +5,9 @@
 ## The ASF licenses this file to You under the Apache License, Version 2.0
 ## (the "License"); you may not use this file except in compliance with
 ## the License.  You may obtain a copy of the License at
-## 
+##
 ## http://www.apache.org/licenses/LICENSE-2.0
-## 
+##
 ## Unless required by applicable law or agreed to in writing, software
 ## distributed under the License is distributed on an "AS IS" BASIS,
 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,7 +23,7 @@ log4j.rootLogger=INFO, out, stdout
 log4j.logger.org.apache.activemq.spring=WARN
 #log4j.logger.org.apache.activemq.usecases=DEBUG
 #log4j.logger.org.apache.activemq.broker.region=DEBUG
-log4j.logger.org.apache.activemq.pool=DEBUG
+log4j.logger.org.apache.activemq.jms.pool=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender