You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/16 00:21:56 UTC

[11/17] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5015

https://issues.apache.org/jira/browse/AMQ-5015

Refactor the way sessions are pooled.  We don't need to keep the
PooledSession instances around: since the state is unique to the session
it wraps, we only need to keep the Session instances in the SessionPool
and create a new PooledSession on borrow to manage that session.  This
allows the PooledSession to have a real closed state that protects
against multiple close calls placing duplicate PooledSession instances
into the SessionPool.  This also simplifies the code in the
XaConnectionPool since it doesn't need to try and reset state in
PooledSessions before placing them back, as it gets a fresh wrapper each
time with the correct state.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ac57ce9f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ac57ce9f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ac57ce9f

Branch: refs/heads/activemq-5.10.x
Commit: ac57ce9f68e2b3c5f2d004f507383d64161de3ec
Parents: ee9b3ef
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jun 12 19:12:56 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Mon Dec 15 16:51:42 2014 -0500

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       |  48 +++++----
 .../apache/activemq/jms/pool/PooledSession.java |  19 ++--
 .../activemq/jms/pool/XaConnectionPool.java     |  18 ----
 .../jms/pool/XaPooledConnectionFactory.java     |  18 ++--
 .../jms/pool/PooledConnectionTempQueueTest.java | 102 +++++++++++++++++++
 5 files changed, 153 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ac57ce9f/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index eced588..26995ea 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -25,12 +25,12 @@ import javax.jms.Connection;
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
@@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
  */
 public class ConnectionPool {
 
-    private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
-
     protected Connection connection;
     private int referenceCount;
     private long lastUsed = System.currentTimeMillis();
@@ -54,7 +52,7 @@ public class ConnectionPool {
     private boolean useAnonymousProducers = true;
 
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
     private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
 
     public ConnectionPool(Connection connection) {
@@ -62,33 +60,29 @@ public class ConnectionPool {
         this.connection = wrap(connection);
 
         // Create our internal Pool of session instances.
-        this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
-            new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
+            new KeyedPoolableObjectFactory<SessionKey, Session>() {
 
                 @Override
-                public void activateObject(SessionKey key, PooledSession session) throws Exception {
-                    ConnectionPool.this.loanedSessions.add(session);
+                public void activateObject(SessionKey key, Session session) throws Exception {
                 }
 
                 @Override
-                public void destroyObject(SessionKey key, PooledSession session) throws Exception {
-                    ConnectionPool.this.loanedSessions.remove(session);
-                    session.getInternalSession().close();
+                public void destroyObject(SessionKey key, Session session) throws Exception {
+                    session.close();
                 }
 
                 @Override
-                public PooledSession makeObject(SessionKey key) throws Exception {
-                    Session session = makeSession(key);
-                    return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers);
+                public Session makeObject(SessionKey key) throws Exception {
+                    return makeSession(key);
                 }
 
                 @Override
-                public void passivateObject(SessionKey key, PooledSession session) throws Exception {
-                    ConnectionPool.this.loanedSessions.remove(session);
+                public void passivateObject(SessionKey key, Session session) throws Exception {
                 }
 
                 @Override
-                public boolean validateObject(SessionKey key, PooledSession session) {
+                public boolean validateObject(SessionKey key, Session session) {
                     return true;
                 }
             }
@@ -130,7 +124,23 @@ public class ConnectionPool {
         SessionKey key = new SessionKey(transacted, ackMode);
         PooledSession session;
         try {
-            session = sessionPool.borrowObject(key);
+            session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers);
+            session.addSessionEventListener(new PooledSessionEventListener() {
+
+                @Override
+                public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+                }
+
+                @Override
+                public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+                }
+
+                @Override
+                public void onSessionClosed(PooledSession session) {
+                    ConnectionPool.this.loanedSessions.remove(session);
+                }
+            });
+            this.loanedSessions.add(session);
         } catch (Exception e) {
             IllegalStateException illegalStateException = new IllegalStateException(e.toString());
             illegalStateException.initCause(e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ac57ce9f/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
index 1d3fc2f..3a2e698 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
@@ -19,6 +19,7 @@ package org.apache.activemq.jms.pool;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
@@ -54,10 +55,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
 
     private final SessionKey key;
-    private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final KeyedObjectPool<SessionKey, Session> sessionPool;
     private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
     private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
     private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
+    private final AtomicBoolean closed = new AtomicBoolean();
 
     private MessageProducer producer;
     private TopicPublisher publisher;
@@ -69,7 +71,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
     private boolean isXa;
     private boolean useAnonymousProducers = true;
 
-    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
+    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) {
         this.key = key;
         this.session = session;
         this.sessionPool = sessionPool;
@@ -94,7 +96,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
 
     @Override
     public void close() throws JMSException {
-        if (!ignoreClose) {
+        if (ignoreClose) {
+            return;
+        }
+
+        if (closed.compareAndSet(false, true)) {
             boolean invalidate = false;
             try {
                 // lets reset the session
@@ -140,22 +146,23 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
                     } catch (JMSException e1) {
                         LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
                     }
-                    session = null;
                 }
                 try {
-                    sessionPool.invalidateObject(key, this);
+                    sessionPool.invalidateObject(key, session);
                 } catch (Exception e) {
                     LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
                 }
             } else {
                 try {
-                    sessionPool.returnObject(key, this);
+                    sessionPool.returnObject(key, session);
                 } catch (Exception e) {
                     javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
                     illegalStateException.initCause(e);
                     throw illegalStateException;
                 }
             }
+
+            session = null;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ac57ce9f/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java
index 4f87153..0f86b55 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java
@@ -19,8 +19,6 @@ package org.apache.activemq.jms.pool;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
 import javax.jms.XAConnection;
 import javax.transaction.RollbackException;
 import javax.transaction.Status;
@@ -65,22 +63,6 @@ public class XaConnectionPool extends ConnectionPool {
             }
             PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
             if (isXa) {
-                session.addSessionEventListener(new PooledSessionEventListener() {
-
-                    @Override
-                    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
-                    }
-
-                    @Override
-                    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
-                    }
-
-                    @Override
-                    public void onSessionClosed(PooledSession session) {
-                        session.setIgnoreClose(true);
-                        session.setIsXa(false);
-                    }
-                });
                 session.setIgnoreClose(true);
                 session.setIsXa(true);
                 transactionManager.getTransaction().registerSynchronization(new Synchronization(session));

http://git-wip-us.apache.org/repos/asf/activemq/blob/ac57ce9f/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
index 0567509..5e44be2 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
@@ -18,14 +18,13 @@ package org.apache.activemq.jms.pool;
 
 import java.io.Serializable;
 import java.util.Hashtable;
+
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
-import javax.jms.XAConnectionFactory;
 import javax.naming.Binding;
 import javax.naming.Context;
 import javax.naming.InitialContext;
@@ -38,13 +37,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A pooled connection factory that automatically enlists
- * sessions in the current active XA transaction if any.
+ * A pooled connection factory that automatically enlists sessions in the
+ * current active XA transaction if any.
  */
-public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
-        Serializable, QueueConnectionFactory, TopicConnectionFactory {
+public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, Serializable, QueueConnectionFactory, TopicConnectionFactory {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
+    private static final long serialVersionUID = -6545688026350913005L;
+
     private TransactionManager transactionManager;
     private boolean tmFromJndi = false;
     private String tmJndiName = "java:/TransactionManager";
@@ -87,10 +87,10 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
             name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
             try {
                 InitialContext ctx = new InitialContext();
-                NamingEnumeration bindings = ctx.listBindings(name);
+                NamingEnumeration<Binding> bindings = ctx.listBindings(name);
 
                 while (bindings.hasMore()) {
-                    Binding bd = (Binding)bindings.next();
+                    Binding bd = bindings.next();
                     IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
                 }
 
@@ -116,6 +116,7 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
 
     /**
      * Allow transaction manager resolution from JNDI (ee deployment)
+     *
      * @param tmFromJndi
      */
     public void setTmFromJndi(boolean tmFromJndi) {
@@ -141,5 +142,4 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
     public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
         return (TopicConnection) createConnection(userName, password);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ac57ce9f/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java
new file mode 100644
index 0000000..fc0fbf4
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java
@@ -0,0 +1,102 @@
+package org.apache.activemq.jms.pool;
+
+import java.util.concurrent.Executors;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PooledConnectionTempQueueTest {
+
+    private final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempQueueTest.class);
+
+    protected static final String SERVICE_QUEUE = "queue1";
+
+    @Test
+    public void testTempQueueIssue() throws JMSException, InterruptedException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        final PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(factory);
+
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.info("First connection was {}", connection);
+
+        // This order seems to matter to reproduce the issue
+        connection.close();
+        session.close();
+
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE);
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        sendWithReplyToTemp(cf, SERVICE_QUEUE);
+    }
+
+    private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException,
+        InterruptedException {
+        Connection con = cf.createConnection();
+        con.start();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryQueue tempQueue = session.createTemporaryQueue();
+        TextMessage msg = session.createTextMessage("Request");
+        msg.setJMSReplyTo(tempQueue);
+        MessageProducer producer = session.createProducer(session.createQueue(serviceQueue));
+        producer.send(msg);
+
+        // This sleep also seems to matter
+        Thread.sleep(5000);
+
+        MessageConsumer consumer = session.createConsumer(tempQueue);
+        Message replyMsg = consumer.receive();
+        System.out.println(replyMsg.getJMSCorrelationID());
+
+        consumer.close();
+
+        producer.close();
+        session.close();
+        con.close();
+    }
+
+    public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory,
+                                                              String queueName) throws JMSException {
+        Connection con = connectionFactory.createConnection();
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
+        final javax.jms.Message inMessage = consumer.receive();
+
+        String requestMessageId = inMessage.getJMSMessageID();
+        System.out.println("Received message " + requestMessageId);
+        final TextMessage replyMessage = session.createTextMessage("Result");
+        replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID());
+        final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo());
+        System.out.println("Sending reply to " + inMessage.getJMSReplyTo());
+        producer.send(replyMessage);
+
+        producer.close();
+        consumer.close();
+        session.close();
+        con.close();
+    }
+
+}