You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jg...@apache.org on 2020/10/05 08:50:17 UTC
[tomee] 02/02: Fix issues with XA transactions after ActiveMQ
upgrade
This is an automated email from the ASF dual-hosted git repository.
jgallimore pushed a commit to branch tomee-7.1.x
in repository https://gitbox.apache.org/repos/asf/tomee.git
commit 7e89a43b300a414111720e7ba82bb8b9caa749f7
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Tue Sep 29 10:26:16 2020 +0100
Fix issues with XA transactions after ActiveMQ upgrade
---
.../activemq/jms2/TomEEManagedConnectionProxy.java | 30 +++++++++++++++++-----
.../activemq/jms2/TomEERAConnectionFactory.java | 1 +
.../resource/activemq/jms2/TomEEXAConnection.java | 2 +-
.../resource/activemq/jms2/TomEEXASession.java | 5 ++++
.../apache/openejb/activemq/AMQXASupportTest.java | 11 ++------
5 files changed, 32 insertions(+), 17 deletions(-)
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
index aefe4f9..f52f928 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
@@ -36,6 +36,7 @@ import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
+import javax.transaction.Transaction;
public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
// cause org.apache.openejb.resource.AutoConnectionTracker.proxyConnection() just uses getInterfaces()
@@ -131,7 +132,9 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
if (xa) {
return createXASession();
} else {
- return connection.getPhysicalConnection().createSession(mode);
+ final Session session = connection.getPhysicalConnection().createSession(mode);
+ enlistInTransactionIfNeeded(session);
+ return session;
}
}
@@ -166,7 +169,9 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
if (xa) {
return createXASession();
} else {
- return connection.getPhysicalConnection().createSession(mode);
+ final Session session = connection.getPhysicalConnection().createSession(mode);
+ enlistInTransactionIfNeeded(session);
+ return session;
}
}
@@ -186,11 +191,22 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
@Override
public XASession createXASession() throws JMSException {
XASession session = ((XAConnection) connection.getPhysicalConnection()).createXASession();
- try {
- OpenEJB.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
- } catch (IllegalStateException | SystemException | RollbackException e) {
- throw new RuntimeException(e);
- }
+ enlistInTransactionIfNeeded(session);
return session;
}
+
+ private void enlistInTransactionIfNeeded(final Session session) {
+ if (session instanceof XASession) {
+ XASession xaSession = XASession.class.cast(session);
+
+ try {
+ final Transaction transaction = OpenEJB.getTransactionManager().getTransaction();
+ if (transaction != null) {
+ transaction.enlistResource(xaSession.getXAResource());
+ }
+ } catch (IllegalStateException | SystemException | RollbackException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
index 1a76f9f..6babdce 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
@@ -17,6 +17,7 @@
package org.apache.openejb.resource.activemq.jms2;
import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
+
import org.apache.activemq.ra.ActiveMQConnectionFactory;
import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
index ff54579..ea71b77 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
@@ -42,7 +42,7 @@ public class TomEEXAConnection extends ActiveMQXAConnection {
@Override
public Session createSession(final int sessionMode) throws JMSException {
- return super.createSession(sessionMode == Session.SESSION_TRANSACTED, sessionMode);
+ return createSession(sessionMode == Session.SESSION_TRANSACTED, sessionMode);
}
@Override
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
index 9c811fb..881957d 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
@@ -60,4 +60,9 @@ public class TomEEXASession extends ActiveMQXASession {
public MessageConsumer createSharedDurableConsumer(final Topic topic, final String name, final String messageSelector) throws JMSException {
return createDurableSubscriber(topic, name, messageSelector, false);
}
+
+ protected void doStartTransaction() throws JMSException {
+ // allow non transactional auto ack work on an XASession
+ // Seems ok by the spec that an XAConnection can be used without an XA tx
+ }
}
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java b/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java
index 1391f1e..3b1bc67 100644
--- a/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java
+++ b/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.openejb.activemq;
-import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.openejb.jee.MessageDrivenBean;
import org.apache.openejb.junit.ApplicationComposer;
import org.apache.openejb.testing.Configuration;
@@ -69,9 +68,6 @@ public class AMQXASupportTest {
.p("cf", "new://Resource?type=" + ConnectionFactory.class.getName())
.p("cf.ResourceAdapter", "amq")
- .p("xaCf", "new://Resource?class-name=" + ActiveMQXAConnectionFactory.class.getName())
- .p("xaCf.BrokerURL", "vm://localhost")
-
.build();
}
@@ -83,9 +79,6 @@ public class AMQXASupportTest {
@Resource(name = "target")
private Queue destination;
- @Resource(name = "xaCf")
- private XAConnectionFactory xacf;
-
@Resource(name = "cf")
private ConnectionFactory cf;
@@ -105,9 +98,9 @@ public class AMQXASupportTest {
@Test
public void xaCode() throws Exception {
- assertNotNull(xacf);
+ assertNotNull(cf);
- final Connection connection = xacf.createXAConnection();
+ final Connection connection = cf.createConnection();
assertThat(connection, instanceOf(XAConnection.class));
testConnection(connection);
}