You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jg...@apache.org on 2020/10/05 08:50:17 UTC

[tomee] 02/02: Fix issues with XA transactions after ActiveMQ upgrade

This is an automated email from the ASF dual-hosted git repository.

jgallimore pushed a commit to branch tomee-7.1.x
in repository https://gitbox.apache.org/repos/asf/tomee.git

commit 7e89a43b300a414111720e7ba82bb8b9caa749f7
Author: Jonathan Gallimore <jo...@jrg.me.uk>
AuthorDate: Tue Sep 29 10:26:16 2020 +0100

    Fix issues with XA transactions after ActiveMQ upgrade
---
 .../activemq/jms2/TomEEManagedConnectionProxy.java | 30 +++++++++++++++++-----
 .../activemq/jms2/TomEERAConnectionFactory.java    |  1 +
 .../resource/activemq/jms2/TomEEXAConnection.java  |  2 +-
 .../resource/activemq/jms2/TomEEXASession.java     |  5 ++++
 .../apache/openejb/activemq/AMQXASupportTest.java  | 11 ++------
 5 files changed, 32 insertions(+), 17 deletions(-)

diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
index aefe4f9..f52f928 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
@@ -36,6 +36,7 @@ import javax.resource.spi.ConnectionRequestInfo;
 import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
 import javax.transaction.RollbackException;
 import javax.transaction.SystemException;
+import javax.transaction.Transaction;
 
 public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
     // cause org.apache.openejb.resource.AutoConnectionTracker.proxyConnection() just uses getInterfaces()
@@ -131,7 +132,9 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
         if (xa) {
             return createXASession();
         } else {
-            return connection.getPhysicalConnection().createSession(mode);
+            final Session session = connection.getPhysicalConnection().createSession(mode);
+            enlistInTransactionIfNeeded(session);
+            return session;
         }
     }
 
@@ -166,7 +169,9 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
         if (xa) {
             return createXASession();
         } else {
-            return connection.getPhysicalConnection().createSession(mode);
+            final Session session = connection.getPhysicalConnection().createSession(mode);
+            enlistInTransactionIfNeeded(session);
+            return session;
         }
     }
 
@@ -186,11 +191,22 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
     @Override
     public XASession createXASession() throws JMSException {
         XASession session = ((XAConnection) connection.getPhysicalConnection()).createXASession();
-        try {
-            OpenEJB.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
-        } catch (IllegalStateException | SystemException | RollbackException e) {
-            throw new RuntimeException(e);
-        }
+        enlistInTransactionIfNeeded(session);
         return session;
     }
+
+    private void enlistInTransactionIfNeeded(final Session session) {
+        if (session instanceof XASession) {
+            XASession xaSession = XASession.class.cast(session);
+
+            try {
+                final Transaction transaction = OpenEJB.getTransactionManager().getTransaction();
+                if (transaction != null) {
+                    transaction.enlistResource(xaSession.getXAResource());
+                }
+            } catch (IllegalStateException | SystemException | RollbackException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
index 1a76f9f..6babdce 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
@@ -17,6 +17,7 @@
 package org.apache.openejb.resource.activemq.jms2;
 
 import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
+
 import org.apache.activemq.ra.ActiveMQConnectionFactory;
 import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
 import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
index ff54579..ea71b77 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
@@ -42,7 +42,7 @@ public class TomEEXAConnection extends ActiveMQXAConnection {
 
     @Override
     public Session createSession(final int sessionMode) throws JMSException {
-        return super.createSession(sessionMode == Session.SESSION_TRANSACTED, sessionMode);
+        return createSession(sessionMode == Session.SESSION_TRANSACTED, sessionMode);
     }
 
     @Override
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
index 9c811fb..881957d 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
@@ -60,4 +60,9 @@ public class TomEEXASession extends ActiveMQXASession {
     public MessageConsumer createSharedDurableConsumer(final Topic topic, final String name, final String messageSelector) throws JMSException {
         return createDurableSubscriber(topic, name, messageSelector, false);
     }
+
+    protected void doStartTransaction() throws JMSException {
+        // Intentionally a no-op: allows non-transactional, auto-acknowledged work on an XASession.
+        // This seems to be OK by the spec: an XAConnection can be used without an active XA transaction.
+    }
 }
diff --git a/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java b/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java
index 1391f1e..3b1bc67 100644
--- a/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java
+++ b/container/openejb-core/src/test/java/org/apache/openejb/activemq/AMQXASupportTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.openejb.activemq;
 
-import org.apache.activemq.ActiveMQXAConnectionFactory;
 import org.apache.openejb.jee.MessageDrivenBean;
 import org.apache.openejb.junit.ApplicationComposer;
 import org.apache.openejb.testing.Configuration;
@@ -69,9 +68,6 @@ public class AMQXASupportTest {
             .p("cf", "new://Resource?type=" + ConnectionFactory.class.getName())
             .p("cf.ResourceAdapter", "amq")
 
-            .p("xaCf", "new://Resource?class-name=" + ActiveMQXAConnectionFactory.class.getName())
-            .p("xaCf.BrokerURL", "vm://localhost")
-
             .build();
     }
 
@@ -83,9 +79,6 @@ public class AMQXASupportTest {
     @Resource(name = "target")
     private Queue destination;
 
-    @Resource(name = "xaCf")
-    private XAConnectionFactory xacf;
-
     @Resource(name = "cf")
     private ConnectionFactory cf;
 
@@ -105,9 +98,9 @@ public class AMQXASupportTest {
 
     @Test
     public void xaCode() throws Exception {
-        assertNotNull(xacf);
+        assertNotNull(cf);
 
-        final Connection connection = xacf.createXAConnection();
+        final Connection connection = cf.createConnection();
         assertThat(connection, instanceOf(XAConnection.class));
         testConnection(connection);
     }