You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/06/13 18:35:31 UTC

[3/4] git commit: https://issues.apache.org/jira/browse/AMQ-5224

https://issues.apache.org/jira/browse/AMQ-5224

Support for XAConnectionFactory implementations that do not implement ConnectionFactory

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/577b29c0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/577b29c0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/577b29c0

Branch: refs/heads/trunk
Commit: 577b29c0e5ef666c5c8112031d7a9d2f3ce94935
Parents: 459593c
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 13 12:30:17 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jun 13 12:30:17 2014 -0400

----------------------------------------------------------------------
 .../jms/pool/GenericResourceManager.java        |  7 ++++-
 .../jms/pool/PooledConnectionFactory.java       | 32 ++++++++------------
 .../jms/pool/XaPooledConnectionFactory.java     | 24 +++++++++++++++
 .../activemq/jms/pool/XAConnectionPoolTest.java | 26 ++++++++++++++--
 .../activemq/pool/XAConnectionPoolTest.java     | 31 +++++++++----------
 5 files changed, 81 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
index b903906..48963ac 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
@@ -151,7 +151,12 @@ public class GenericResourceManager {
                     @Override
                     public NamedXAResource getNamedXAResource() throws SystemException {
                         try {
-                            final XAConnection xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
+                            final XAConnection xaConnection;
+                            if (rm.getUserName() != null && rm.getPassword() != null) {
+                                xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
+                            } else {
+                                xaConnection = connFactory.createXAConnection();
+                            }
                             final XASession session = xaConnection.createXASession();
                             xaConnection.start();
                             LOGGER.debug("new namedXAResource's connection: " + xaConnection);

http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
index 16f2701..83807f9 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -26,7 +26,6 @@ import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
 import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
-import javax.jms.XAConnectionFactory;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
@@ -68,7 +67,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
     protected final AtomicBoolean stopped = new AtomicBoolean(false);
     private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
 
-    private ConnectionFactory connectionFactory;
+    protected Object connectionFactory;
 
     private int maximumActiveSessionPerConnection = 500;
     private int idleTimeout = 30 * 1000;
@@ -156,7 +155,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
     /**
      * @return the currently configured ConnectionFactory used to create the pooled Connections.
      */
-    public ConnectionFactory getConnectionFactory() {
+    public Object getConnectionFactory() {
         return connectionFactory;
     }
 
@@ -170,20 +169,11 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
      * @param toUse
      *      The factory to use to create pooled Connections.
      */
-    public void setConnectionFactory(final ConnectionFactory toUse) {
-        if (toUse instanceof XAConnectionFactory) {
-            connectionFactory = new ConnectionFactory() {
-                @Override
-                public Connection createConnection() throws JMSException {
-                    return ((XAConnectionFactory)toUse).createXAConnection();
-                }
-                @Override
-                public Connection createConnection(String userName, String password) throws JMSException {
-                    return ((XAConnectionFactory)toUse).createXAConnection(userName, password);
-                }
-            };
-        } else {
+    public void setConnectionFactory(final Object toUse) {
+        if (toUse instanceof ConnectionFactory) { // only javax.jms.ConnectionFactory implementations are stored
             this.connectionFactory = toUse;
+        } else { // any other type is rejected instead of being stored
+            throw new IllegalArgumentException("connectionFactory should implement javax.jms.ConnectionFactory");
         }
     }
 
@@ -278,10 +268,14 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
     }
 
     protected Connection createConnection(ConnectionKey key) throws JMSException {
-        if (key.getUserName() == null && key.getPassword() == null) {
-            return connectionFactory.createConnection();
+        if (connectionFactory instanceof ConnectionFactory) {
+            if (key.getUserName() == null && key.getPassword() == null) {
+                return ((ConnectionFactory) connectionFactory).createConnection();
+            } else {
+                return ((ConnectionFactory) connectionFactory).createConnection(key.getUserName(), key.getPassword());
+            }
         } else {
-            return connectionFactory.createConnection(key.getUserName(), key.getPassword());
+            throw new IllegalStateException("connectionFactory should implement javax.jms.ConnectionFactory");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
index d5fbc60..a5d5aea 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
@@ -20,6 +20,8 @@ import java.io.Serializable;
 import java.util.Hashtable;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.XAConnectionFactory;
 import javax.naming.Binding;
 import javax.naming.Context;
 import javax.naming.InitialContext;
@@ -62,6 +64,28 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
     }
 
     @Override
+    public void setConnectionFactory(Object toUse) {
+        if (toUse instanceof XAConnectionFactory) { // the XA pool accepts only javax.jms.XAConnectionFactory implementations
+            connectionFactory = toUse;
+        } else { // any other type is rejected instead of being stored
+            throw new IllegalArgumentException("connectionFactory should implement javax.jms.XAConnectionFactory");
+        }
+    }
+
+    @Override
+    protected Connection createConnection(ConnectionKey key) throws JMSException { // creates the XA connection for the given key
+        if (connectionFactory instanceof XAConnectionFactory) {
+            if (key.getUserName() == null && key.getPassword() == null) { // no credentials in the key: use the no-arg factory method
+                return ((XAConnectionFactory) connectionFactory).createXAConnection();
+            } else { // at least one credential is set: pass both values through
+                return ((XAConnectionFactory) connectionFactory).createXAConnection(key.getUserName(), key.getPassword());
+            }
+        } else { // the configured factory (possibly null) is not an XAConnectionFactory
+            throw new IllegalStateException("connectionFactory should implement javax.jms.XAConnectionFactory");
+        }
+    }
+
+    @Override
     protected ConnectionPool createConnectionPool(Connection connection) {
         return new XaConnectionPool(connection, getTransactionManager());
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
index daf3af5..c56f8de 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.jms.pool;
 import java.util.Hashtable;
 import java.util.Vector;
 
+import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
 import javax.jms.QueueSender;
@@ -28,6 +29,8 @@ import javax.jms.TopicConnection;
 import javax.jms.TopicConnectionFactory;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
 import javax.naming.spi.ObjectFactory;
 import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
@@ -53,8 +56,7 @@ public class XAConnectionPoolTest extends TestSupport {
         final Vector<Synchronization> syncs = new Vector<Synchronization>();
         ActiveMQTopic topic = new ActiveMQTopic("test");
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
-        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
-
+        pcf.setConnectionFactory(new XAConnectionFactoryOnly(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false")));
         // simple TM that is in a tx and will track syncs
         pcf.setTransactionManager(new TransactionManager(){
             @Override
@@ -154,7 +156,7 @@ public class XAConnectionPoolTest extends TestSupport {
         final Vector<Synchronization> syncs = new Vector<Synchronization>();
         ActiveMQTopic topic = new ActiveMQTopic("test");
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
-        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE));
+        pcf.setConnectionFactory(new XAConnectionFactoryOnly(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE)));
 
         // simple TM that is in a tx and will track syncs
         pcf.setTransactionManager(new TransactionManager(){
@@ -345,4 +347,22 @@ public class XAConnectionPoolTest extends TestSupport {
 
         connection.close();
     }
+
+    static class XAConnectionFactoryOnly implements XAConnectionFactory { // test wrapper that implements XAConnectionFactory only
+        private final XAConnectionFactory connectionFactory; // wrapped factory that every call is forwarded to
+
+        XAConnectionFactoryOnly(XAConnectionFactory connectionFactory) {
+            this.connectionFactory = connectionFactory;
+        }
+
+        @Override
+        public XAConnection createXAConnection() throws JMSException { // forwards to the wrapped factory
+            return connectionFactory.createXAConnection();
+        }
+
+        @Override
+        public XAConnection createXAConnection(String userName, String password) throws JMSException { // forwards with the given credentials
+            return connectionFactory.createXAConnection(userName, password);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
index 8223f2d..9c85cde 100644
--- a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
+++ b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.pool;
 
 import java.util.Hashtable;
 import java.util.Vector;
+
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
 import javax.jms.QueueSender;
@@ -39,9 +40,8 @@ import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
-import org.apache.activemq.ActiveMQConnectionFactory;
+
 import org.apache.activemq.ActiveMQXAConnectionFactory;
-import org.apache.activemq.ActiveMQXASession;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.jms.pool.PooledSession;
 import org.apache.activemq.test.TestSupport;
@@ -53,16 +53,17 @@ public class XAConnectionPoolTest extends TestSupport {
         final Vector<Synchronization> syncs = new Vector<Synchronization>();
         ActiveMQTopic topic = new ActiveMQTopic("test");
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
-        pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
+        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
 
         // simple TM that is in a tx and will track syncs
-        pcf.setTransactionManager(new TransactionManager(){
+        pcf.setTransactionManager(new TransactionManager() {
             @Override
             public void begin() throws NotSupportedException, SystemException {
             }
 
             @Override
-            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException,
+                SystemException {
             }
 
             @Override
@@ -105,7 +106,6 @@ public class XAConnectionPoolTest extends TestSupport {
                     public void setRollbackOnly() throws IllegalStateException, SystemException {
                     }
                 };
-
             }
 
             @Override
@@ -134,8 +134,6 @@ public class XAConnectionPoolTest extends TestSupport {
         TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
 
         assertTrue(session instanceof PooledSession);
-//        PooledSession pooledSession = (PooledSession) session;
-//        assertTrue(pooledSession.getInternalSession() instanceof ActiveMQXASession);
 
         TopicPublisher publisher = session.createPublisher(topic);
         publisher.publish(session.createMessage());
@@ -157,13 +155,14 @@ public class XAConnectionPoolTest extends TestSupport {
         pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE));
 
         // simple TM that is in a tx and will track syncs
-        pcf.setTransactionManager(new TransactionManager(){
+        pcf.setTransactionManager(new TransactionManager() {
             @Override
             public void begin() throws NotSupportedException, SystemException {
             }
 
             @Override
-            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException,
+                SystemException {
             }
 
             @Override
@@ -206,7 +205,6 @@ public class XAConnectionPoolTest extends TestSupport {
                     public void setRollbackOnly() throws IllegalStateException, SystemException {
                     }
                 };
-
             }
 
             @Override
@@ -248,7 +246,7 @@ public class XAConnectionPoolTest extends TestSupport {
         connection.close();
     }
 
-    public void testInstanceOf() throws  Exception {
+    public void testInstanceOf() throws Exception {
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
         assertTrue(pcf instanceof QueueConnectionFactory);
         assertTrue(pcf instanceof TopicConnectionFactory);
@@ -257,7 +255,7 @@ public class XAConnectionPoolTest extends TestSupport {
     public void testBindable() throws Exception {
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
         assertTrue(pcf instanceof ObjectFactory);
-        assertTrue(((ObjectFactory)pcf).getObjectInstance(null, null, null, null) instanceof XaPooledConnectionFactory);
+        assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null, null, null) instanceof XaPooledConnectionFactory);
         assertTrue(pcf.isTmFromJndi());
     }
 
@@ -272,7 +270,7 @@ public class XAConnectionPoolTest extends TestSupport {
 
     public void testSenderAndPublisherDest() throws Exception {
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
-        pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
+        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
 
         QueueConnection connection = pcf.createQueueConnection();
         QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -291,7 +289,7 @@ public class XAConnectionPoolTest extends TestSupport {
 
     public void testSessionArgsIgnoredWithTm() throws Exception {
         XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
-        pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
+        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
         // simple TM that with no tx
         pcf.setTransactionManager(new TransactionManager() {
             @Override
@@ -300,7 +298,8 @@ public class XAConnectionPoolTest extends TestSupport {
             }
 
             @Override
-            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException,
+                SystemException {
                 throw new IllegalStateException("NoTx");
             }