You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/06/13 18:35:31 UTC
[3/4] git commit: https://issues.apache.org/jira/browse/AMQ-5224
https://issues.apache.org/jira/browse/AMQ-5224
Support for XAConnectionFactory that do not implement ConnectionFactory
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/577b29c0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/577b29c0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/577b29c0
Branch: refs/heads/trunk
Commit: 577b29c0e5ef666c5c8112031d7a9d2f3ce94935
Parents: 459593c
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 13 12:30:17 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jun 13 12:30:17 2014 -0400
----------------------------------------------------------------------
.../jms/pool/GenericResourceManager.java | 7 ++++-
.../jms/pool/PooledConnectionFactory.java | 32 ++++++++------------
.../jms/pool/XaPooledConnectionFactory.java | 24 +++++++++++++++
.../activemq/jms/pool/XAConnectionPoolTest.java | 26 ++++++++++++++--
.../activemq/pool/XAConnectionPoolTest.java | 31 +++++++++----------
5 files changed, 81 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
index b903906..48963ac 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
@@ -151,7 +151,12 @@ public class GenericResourceManager {
@Override
public NamedXAResource getNamedXAResource() throws SystemException {
try {
- final XAConnection xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
+ final XAConnection xaConnection;
+ if (rm.getUserName() != null && rm.getPassword() != null) {
+ xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
+ } else {
+ xaConnection = connFactory.createXAConnection();
+ }
final XASession session = xaConnection.createXASession();
xaConnection.start();
LOGGER.debug("new namedXAResource's connection: " + xaConnection);
http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
index 16f2701..83807f9 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -26,7 +26,6 @@ import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
-import javax.jms.XAConnectionFactory;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
@@ -68,7 +67,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
protected final AtomicBoolean stopped = new AtomicBoolean(false);
private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
- private ConnectionFactory connectionFactory;
+ protected Object connectionFactory;
private int maximumActiveSessionPerConnection = 500;
private int idleTimeout = 30 * 1000;
@@ -156,7 +155,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
/**
* @return the currently configured ConnectionFactory used to create the pooled Connections.
*/
- public ConnectionFactory getConnectionFactory() {
+ public Object getConnectionFactory() {
return connectionFactory;
}
@@ -170,20 +169,11 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
* @param toUse
* The factory to use to create pooled Connections.
*/
- public void setConnectionFactory(final ConnectionFactory toUse) {
- if (toUse instanceof XAConnectionFactory) {
- connectionFactory = new ConnectionFactory() {
- @Override
- public Connection createConnection() throws JMSException {
- return ((XAConnectionFactory)toUse).createXAConnection();
- }
- @Override
- public Connection createConnection(String userName, String password) throws JMSException {
- return ((XAConnectionFactory)toUse).createXAConnection(userName, password);
- }
- };
- } else {
+ public void setConnectionFactory(final Object toUse) {
+ if (toUse instanceof ConnectionFactory) {
this.connectionFactory = toUse;
+ } else {
+ throw new IllegalArgumentException("connectionFactory should implement javax.jmx.ConnectionFactory");
}
}
@@ -278,10 +268,14 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
}
protected Connection createConnection(ConnectionKey key) throws JMSException {
- if (key.getUserName() == null && key.getPassword() == null) {
- return connectionFactory.createConnection();
+ if (connectionFactory instanceof ConnectionFactory) {
+ if (key.getUserName() == null && key.getPassword() == null) {
+ return ((ConnectionFactory) connectionFactory).createConnection();
+ } else {
+ return ((ConnectionFactory) connectionFactory).createConnection(key.getUserName(), key.getPassword());
+ }
} else {
- return connectionFactory.createConnection(key.getUserName(), key.getPassword());
+ throw new IllegalStateException("connectionFactory should implement javax.jms.ConnectionFactory");
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
index d5fbc60..a5d5aea 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
@@ -20,6 +20,8 @@ import java.io.Serializable;
import java.util.Hashtable;
import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.XAConnectionFactory;
import javax.naming.Binding;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -62,6 +64,28 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
}
@Override
+ public void setConnectionFactory(Object toUse) {
+ if (toUse instanceof XAConnectionFactory) {
+ connectionFactory = toUse;
+ } else {
+ throw new IllegalArgumentException("connectionFactory should implement javax.xml.XAConnectionFactory");
+ }
+ }
+
+ @Override
+ protected Connection createConnection(ConnectionKey key) throws JMSException {
+ if (connectionFactory instanceof XAConnectionFactory) {
+ if (key.getUserName() == null && key.getPassword() == null) {
+ return ((XAConnectionFactory) connectionFactory).createXAConnection();
+ } else {
+ return ((XAConnectionFactory) connectionFactory).createXAConnection(key.getUserName(), key.getPassword());
+ }
+ } else {
+ throw new IllegalStateException("connectionFactory should implement javax.jms.XAConnectionFactory");
+ }
+ }
+
+ @Override
protected ConnectionPool createConnectionPool(Connection connection) {
return new XaConnectionPool(connection, getTransactionManager());
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
index daf3af5..c56f8de 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.jms.pool;
import java.util.Hashtable;
import java.util.Vector;
+import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
@@ -28,6 +29,8 @@ import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
import javax.naming.spi.ObjectFactory;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
@@ -53,8 +56,7 @@ public class XAConnectionPoolTest extends TestSupport {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
-
+ pcf.setConnectionFactory(new XAConnectionFactoryOnly(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false")));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager(){
@Override
@@ -154,7 +156,7 @@ public class XAConnectionPoolTest extends TestSupport {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE));
+ pcf.setConnectionFactory(new XAConnectionFactoryOnly(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE)));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager(){
@@ -345,4 +347,22 @@ public class XAConnectionPoolTest extends TestSupport {
connection.close();
}
+
+ static class XAConnectionFactoryOnly implements XAConnectionFactory {
+ private final XAConnectionFactory connectionFactory;
+
+ XAConnectionFactoryOnly(XAConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ @Override
+ public XAConnection createXAConnection() throws JMSException {
+ return connectionFactory.createXAConnection();
+ }
+
+ @Override
+ public XAConnection createXAConnection(String userName, String password) throws JMSException {
+ return connectionFactory.createXAConnection(userName, password);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/577b29c0/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
index 8223f2d..9c85cde 100644
--- a/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
+++ b/activemq-pool/src/test/java/org/apache/activemq/pool/XAConnectionPoolTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.pool;
import java.util.Hashtable;
import java.util.Vector;
+
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
@@ -39,9 +40,8 @@ import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
-import org.apache.activemq.ActiveMQConnectionFactory;
+
import org.apache.activemq.ActiveMQXAConnectionFactory;
-import org.apache.activemq.ActiveMQXASession;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledSession;
import org.apache.activemq.test.TestSupport;
@@ -53,16 +53,17 @@ public class XAConnectionPoolTest extends TestSupport {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
+ pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
// simple TM that is in a tx and will track syncs
- pcf.setTransactionManager(new TransactionManager(){
+ pcf.setTransactionManager(new TransactionManager() {
@Override
public void begin() throws NotSupportedException, SystemException {
}
@Override
- public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+ public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException,
+ SystemException {
}
@Override
@@ -105,7 +106,6 @@ public class XAConnectionPoolTest extends TestSupport {
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
};
-
}
@Override
@@ -134,8 +134,6 @@ public class XAConnectionPoolTest extends TestSupport {
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
assertTrue(session instanceof PooledSession);
-// PooledSession pooledSession = (PooledSession) session;
-// assertTrue(pooledSession.getInternalSession() instanceof ActiveMQXASession);
TopicPublisher publisher = session.createPublisher(topic);
publisher.publish(session.createMessage());
@@ -157,13 +155,14 @@ public class XAConnectionPoolTest extends TestSupport {
pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE));
// simple TM that is in a tx and will track syncs
- pcf.setTransactionManager(new TransactionManager(){
+ pcf.setTransactionManager(new TransactionManager() {
@Override
public void begin() throws NotSupportedException, SystemException {
}
@Override
- public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+ public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException,
+ SystemException {
}
@Override
@@ -206,7 +205,6 @@ public class XAConnectionPoolTest extends TestSupport {
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
};
-
}
@Override
@@ -248,7 +246,7 @@ public class XAConnectionPoolTest extends TestSupport {
connection.close();
}
- public void testInstanceOf() throws Exception {
+ public void testInstanceOf() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
assertTrue(pcf instanceof QueueConnectionFactory);
assertTrue(pcf instanceof TopicConnectionFactory);
@@ -257,7 +255,7 @@ public class XAConnectionPoolTest extends TestSupport {
public void testBindable() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
assertTrue(pcf instanceof ObjectFactory);
- assertTrue(((ObjectFactory)pcf).getObjectInstance(null, null, null, null) instanceof XaPooledConnectionFactory);
+ assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null, null, null) instanceof XaPooledConnectionFactory);
assertTrue(pcf.isTmFromJndi());
}
@@ -272,7 +270,7 @@ public class XAConnectionPoolTest extends TestSupport {
public void testSenderAndPublisherDest() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
+ pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
QueueConnection connection = pcf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -291,7 +289,7 @@ public class XAConnectionPoolTest extends TestSupport {
public void testSessionArgsIgnoredWithTm() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
- pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
+ pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
// simple TM that with no tx
pcf.setTransactionManager(new TransactionManager() {
@Override
@@ -300,7 +298,8 @@ public class XAConnectionPoolTest extends TestSupport {
}
@Override
- public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+ public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException,
+ SystemException {
throw new IllegalStateException("NoTx");
}