You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jf...@apache.org on 2019/08/29 16:39:43 UTC

[tomee] branch master updated: Fix several JMS/JMS2.0 bugs

This is an automated email from the ASF dual-hosted git repository.

jfisher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomee.git


The following commit(s) were added to refs/heads/master by this push:
     new f56b2b2  Fix several JMS/JMS2.0 bugs
f56b2b2 is described below

commit f56b2b25a65b13472c6e6a02757cc2dc978c9953
Author: Jonathan S. Fisher <jo...@emoneyusa.com>
AuthorDate: Mon Aug 26 20:09:10 2019 -0500

    Fix several JMS/JMS2.0 bugs
    
    * Fix TOMEE-2229: JMSContext Injected by TomEE does not participate in JTA
    * Fix TOMEE-2650: TomEE Creating non-JTA Sessions
    * Fix TOMEE-2651: TomEE doesn't return JMS Connection to pool after a Transaction Timeout
    * Fix TOMEE-2652: TransactionSupport parameter not honored on JMS Connection Factory resources
---
 .../openejb/resource/AutoConnectionTracker.java    |  26 ++--
 .../openejb/resource/activemq/jms2/JMS2.java       |  13 ++
 .../resource/activemq/jms2/JMSContextImpl.java     |   7 +-
 .../resource/activemq/jms2/JMSProducerImpl.java    |   1 +
 .../activemq/jms2/TomEEConnectionFactory.java      |  42 +++++--
 .../activemq/jms2/TomEEManagedConnection.java      |  10 +-
 .../jms2/TomEEManagedConnectionFactory.java        |  45 ++++++-
 .../activemq/jms2/TomEEManagedConnectionProxy.java | 132 ++++++++++++++++++++-
 .../activemq/jms2/TomEERAConnectionFactory.java    | 108 ++++++++++++++++-
 .../activemq/jms2/cdi/JMS2CDIExtension.java        |  18 ++-
 10 files changed, 360 insertions(+), 42 deletions(-)

diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
index 49135c8..b5080ab 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
@@ -22,6 +22,7 @@ import org.apache.geronimo.connector.outbound.ConnectionReturnAction;
 import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor;
 import org.apache.geronimo.connector.outbound.ManagedConnectionInfo;
 import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
+import org.apache.geronimo.transaction.manager.TransactionImpl;
 import org.apache.openejb.dyni.DynamicSubclass;
 import org.apache.openejb.loader.SystemInstance;
 import org.apache.openejb.util.LogCategory;
@@ -32,7 +33,6 @@ import javax.resource.ResourceException;
 import javax.resource.spi.DissociatableManagedConnection;
 import javax.transaction.Synchronization;
 import javax.transaction.SystemException;
-import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.TransactionSynchronizationRegistry;
 import java.lang.ref.PhantomReference;
@@ -58,6 +58,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
     private final TransactionManager txMgr;
     private final Logger logger = Logger.getInstance(LogCategory.OPENEJB_CONNECTOR, "org.apache.openejb.resource");
     private final ConcurrentMap<ManagedConnectionInfo, ProxyPhantomReference> references = new ConcurrentHashMap<>();
+    @SuppressWarnings("rawtypes")
     private final ReferenceQueue referenceQueue = new ReferenceQueue();
     private final ConcurrentMap<Class<?>, Class<?>> proxies = new ConcurrentHashMap<>();
     private final ConcurrentMap<Class<?>, Class<?>[]> interfaces = new ConcurrentHashMap<>();
@@ -80,6 +81,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
      * @param connectionInfo the connection to be obtained
      * @param key            the unique id of the connection manager
      */
+    @Override
     public void setEnvironment(final ConnectionInfo connectionInfo, final String key) {
         ProxyPhantomReference reference = (ProxyPhantomReference) referenceQueue.poll();
         while (reference != null) {
@@ -103,12 +105,14 @@ public class AutoConnectionTracker implements ConnectionTracker {
      * @param connectionInfo the connection that was obtained
      * @param reassociate    should always be false
      */
+    @SuppressWarnings("unchecked")
+    @Override
     public void handleObtained(final ConnectionTrackingInterceptor interceptor, final ConnectionInfo connectionInfo, final boolean reassociate) throws ResourceException {
         if (txMgr != null && registry != null) {
-            Transaction currentTx = null;
+            TransactionImpl currentTx = null;
             try {
-                currentTx = txMgr.getTransaction();
-            } catch (SystemException e) {
+                currentTx = (TransactionImpl) txMgr.getTransaction();
+            } catch (SystemException | ClassCastException e) {
                 //ignore
             }
 
@@ -162,16 +166,18 @@ public class AutoConnectionTracker implements ConnectionTracker {
      * @param connectionInfo the connection that was released
      * @param action         ignored
      */
+    @Override
+    @SuppressWarnings("unchecked")
     public void handleReleased(final ConnectionTrackingInterceptor interceptor, final ConnectionInfo connectionInfo, final ConnectionReturnAction action) {
-        Transaction currentTx = null;
+        TransactionImpl currentTx = null;
         try {
-            currentTx = txMgr.getTransaction();
-        } catch (SystemException e) {
+            currentTx = (TransactionImpl) txMgr.getTransaction();
+        } catch (SystemException | ClassCastException e) {
             //ignore
         }
 
         if (currentTx != null) {
-            Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>> txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>) registry.getResource(KEY);
+            Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>> txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>) currentTx.getResource(KEY);
             if (txConnections == null) {
                 txConnections = new HashMap<>();
                 registry.putResource(KEY, txConnections);
@@ -185,6 +191,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
             }
         }
 
+        @SuppressWarnings("rawtypes")
         final PhantomReference phantomReference = references.remove(connectionInfo.getManagedConnectionInfo());
         if (phantomReference != null) {
             phantomReference.clear();
@@ -284,6 +291,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
             this.handle = handle;
         }
 
+        @Override
         public Object invoke(final Object object, final Method method, final Object[] args) throws Throwable {
             if (method.getDeclaringClass() == Object.class) {
                 if (method.getName().equals("finalize")) {
@@ -322,7 +330,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
         public ProxyPhantomReference(final ConnectionTrackingInterceptor interceptor,
                                      final ManagedConnectionInfo managedConnectionInfo,
                                      final ConnectionInvocationHandler handler,
-                                     final ReferenceQueue referenceQueue) {
+                                     @SuppressWarnings("rawtypes") final ReferenceQueue referenceQueue) {
             super(handler, referenceQueue);
             this.interceptor = interceptor;
             this.managedConnectionInfo = managedConnectionInfo;
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
index 1f7e78a..29c0ebf 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
@@ -43,6 +43,9 @@ import javax.jms.TransactionInProgressException;
 import javax.jms.TransactionInProgressRuntimeException;
 import javax.jms.TransactionRolledBackException;
 import javax.jms.TransactionRolledBackRuntimeException;
+import javax.transaction.SystemException;
+
+import org.apache.openejb.OpenEJB;
 
 public final class JMS2 {
     private JMS2() {
@@ -83,6 +86,7 @@ public final class JMS2 {
         return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
     }
 
+    @SuppressWarnings("unchecked")
     public static <T extends Message> T wrap(final T message10) {
         if (message10 == null) {
             return null;
@@ -112,4 +116,13 @@ public final class JMS2 {
         }
         return (T) new DelegateMessage(message10);
     }
+
+
+    public static boolean inTx() {
+        try {
+            return OpenEJB.getTransactionManager().getTransaction() != null;
+        } catch (SystemException | NullPointerException e) {
+            return false;
+        }
+    }
 }
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
index db975be..872b1c9 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
@@ -42,6 +42,7 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.XAConnection;
+
 import java.io.Serializable;
 
 import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
@@ -80,7 +81,6 @@ public class JMSContextImpl implements JMSContext {
         if (connection == null) {
             try {
                 connection = username != null ? factory.createConnection(username, password) : factory.createConnection();
-                xa = XAConnection.class.isInstance(connection);
             } catch (final JMSException e) {
                 throw toRuntimeException(e);
             }
@@ -96,10 +96,11 @@ public class JMSContextImpl implements JMSContext {
                 }
                 if (session == null) {
                     try {
+                        Connection connection = connection();
                         if (xa) {
-                            session = XAConnection.class.cast(connection()).createXASession();
+                            session = XAConnection.class.cast(connection).createXASession();
                         } else {
-                            session = connection().createSession(sessionMode);
+                            session = connection.createSession(sessionMode);
                         }
                     } catch (final JMSException e) {
                         throw toRuntimeException(e);
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
index 4d9a504..89841ca 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
 import static org.apache.openejb.resource.activemq.jms2.JMS2.wrap;
 
+@SuppressWarnings("deprecation")
 class JMSProducerImpl implements JMSProducer {
     private final JMSContextImpl context;
     private final MessageProducer producer;
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
index 7bb6a52..9af3d0d 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
@@ -17,35 +17,63 @@
 package org.apache.openejb.resource.activemq.jms2;
 
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.ActiveMQXASslConnectionFactory;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.transport.Transport;
 
 import javax.jms.JMSContext;
 
-public class TomEEConnectionFactory extends ActiveMQSslConnectionFactory {
+public class TomEEConnectionFactory extends ActiveMQXASslConnectionFactory {
     @Override
     protected ActiveMQConnection createActiveMQConnection(final Transport transport, final JMSStatsImpl stats) throws Exception {
-        return new TomEEConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
+        return new TomEEXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
     }
 
     @Override
     public JMSContext createContext() {
-        return new JMSContextImpl(this, -1, null, null, false);
+        boolean inTx = JMS2.inTx();
+        int mode;
+        if (inTx) {
+            mode = -1;
+        } else {
+            mode = JMSContext.AUTO_ACKNOWLEDGE;
+        }
+        return new JMSContextImpl(this, mode, null, null, inTx);
     }
 
     @Override
     public JMSContext createContext(final int sessionMode) {
-        return new JMSContextImpl(this, sessionMode, null, null, false);
+        boolean inTx = JMS2.inTx();
+        int mode;
+        if (inTx) {
+            mode = -1;
+        } else {
+            mode = sessionMode;
+        }
+        return new JMSContextImpl(this, mode, null, null, inTx);
     }
 
     @Override
     public JMSContext createContext(final String userName, final String password) {
-        return new JMSContextImpl(this, -1, userName, password, false);
+        boolean inTx = JMS2.inTx();
+        int mode;
+        if (inTx) {
+            mode = -1;
+        } else {
+            mode = JMSContext.AUTO_ACKNOWLEDGE;
+        }
+        return new JMSContextImpl(this, mode, userName, password, inTx);
     }
 
     @Override
     public JMSContext createContext(final String userName, final String password, final int sessionMode) {
-        return new JMSContextImpl(this, sessionMode, userName, password, false);
+        boolean inTx = JMS2.inTx();
+        int mode;
+        if (inTx) {
+            mode = -1;
+        } else {
+            mode = sessionMode;
+        }
+        return new JMSContextImpl(this, mode, userName, password, inTx);
     }
 }
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
index 4d7bb29..fe8e2b7 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
@@ -23,12 +23,14 @@ import org.apache.activemq.ra.ManagedConnectionProxy;
 
 import javax.resource.ResourceException;
 import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
 import javax.security.auth.Subject;
 import java.lang.reflect.Field;
 import java.util.Collection;
 
 public class TomEEManagedConnection extends ActiveMQManagedConnection {
     private static final Field PROXY_CONNECTIONS_FIELD;
+    private TransactionSupportLevel transactionSupportLevel;
 
     static {
         try {
@@ -41,14 +43,16 @@ public class TomEEManagedConnection extends ActiveMQManagedConnection {
 
     private final Collection<ManagedConnectionProxy> proxyConnections;
 
+    @SuppressWarnings("unchecked")
     public TomEEManagedConnection(final Subject subject, final ActiveMQConnection physicalConnection,
-                                  final ActiveMQConnectionRequestInfo info) throws ResourceException {
+                                  final ActiveMQConnectionRequestInfo info, TransactionSupportLevel transactionSupportLevel) throws ResourceException {
         super(subject, physicalConnection, info);
         try {
             proxyConnections = Collection.class.cast(PROXY_CONNECTIONS_FIELD.get(this));
         } catch (final IllegalAccessException e) {
             throw new IllegalStateException("Incompatible AMQ", e);
         }
+        this.transactionSupportLevel = transactionSupportLevel;
     }
 
     @Override
@@ -57,4 +61,8 @@ public class TomEEManagedConnection extends ActiveMQManagedConnection {
         proxyConnections.add(proxy);
         return proxy;
     }
+
+    public TransactionSupportLevel getTransactionSupportLevel() {
+        return transactionSupportLevel;
+    }
 }
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
index 44ea157..22c44d0 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
@@ -22,17 +22,25 @@ import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
 import org.apache.activemq.ra.MessageActivationSpec;
 import org.apache.activemq.ra.SimpleConnectionManager;
 
+import java.util.Locale;
+
 import javax.jms.JMSException;
 import javax.resource.ResourceException;
 import javax.resource.spi.ConnectionManager;
 import javax.resource.spi.ConnectionRequestInfo;
 import javax.resource.spi.ManagedConnection;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
 import javax.security.auth.Subject;
 
 public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFactory {
+    private static final long serialVersionUID = 1L;
+    private TransactionSupportLevel transactionSupportLevel;
+
     @Override
     public Object createConnectionFactory(final ConnectionManager manager) throws ResourceException {
-        return new TomEERAConnectionFactory(this, manager, getInfo());
+        TomEERAConnectionFactory factory = new TomEERAConnectionFactory(this, manager, getInfo());
+        factory.setTransactionSupport(transactionSupportLevel);
+        return factory;
     }
 
     @Override
@@ -56,7 +64,7 @@ public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFact
             amqInfo = getInfo();
         }
         try {
-            return new TomEEManagedConnection(subject, makeConnection(amqInfo), amqInfo);
+            return new TomEEManagedConnection(subject, makeConnection(amqInfo), amqInfo, transactionSupportLevel);
         } catch (final JMSException e) {
             throw new ResourceException("Could not create connection.", e);
         }
@@ -67,4 +75,37 @@ public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFact
         return !(object == null || !getClass().isInstance(object))
             && ((ActiveMQManagedConnectionFactory) object).getInfo().equals(getInfo());
     }
+
+    public String getTransactionSupport() {
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                return "xa";
+            case LocalTransaction:
+                return "local";
+            case NoTransaction:
+                return "none";
+            default:
+                return null;
+        }
+    }
+
+    public void setTransactionSupport(String transactionSupport) {
+        if (transactionSupport == null) {
+            throw new IllegalArgumentException("transactionSupport cannot be not null");
+        } else {
+            switch (transactionSupport.toLowerCase(Locale.ENGLISH)) {
+                case "xa":
+                    transactionSupportLevel = TransactionSupportLevel.XATransaction;
+                    break;
+                case "local":
+                    transactionSupportLevel = TransactionSupportLevel.LocalTransaction;
+                    break;
+                case "none":
+                    transactionSupportLevel = TransactionSupportLevel.NoTransaction;
+                    break;
+                default:
+                    throw new IllegalArgumentException("transactionSupport must be xa, local, or none:" + transactionSupport);
+            }
+        }
+    }
 }
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
index f07b7a2..aefe4f9 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
@@ -18,21 +18,28 @@ package org.apache.openejb.resource.activemq.jms2;
 
 import org.apache.activemq.ra.ActiveMQManagedConnection;
 import org.apache.activemq.ra.ManagedConnectionProxy;
+import org.apache.openejb.OpenEJB;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ExceptionListener;
+import javax.jms.JMSContext;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
 import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
 
 public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
     // cause org.apache.openejb.resource.AutoConnectionTracker.proxyConnection() just uses getInterfaces()
-    implements Connection, QueueConnection, TopicConnection, ExceptionListener {
+    implements Connection, QueueConnection, TopicConnection, ExceptionListener, XAConnection {
 
     private volatile ActiveMQManagedConnection connection;
 
@@ -48,13 +55,119 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
     }
 
     @Override
-    public Session createSession(final int sessionMode) throws JMSException {
-        return connection.getPhysicalConnection().createSession(sessionMode);
+    public Session createSession(final int acknowledgeMode) throws JMSException {
+        // For the next three methods, we ignore the requested session mode per the
+        // spec:
+        // https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#createSession-int-
+        //
+        // But we also allow the user to override this behavior. If they set
+        // transactionSupport on the connection factory
+        // we will not return to them a xa session, even though the underlying physical
+        // connection may support XA.
+
+        int mode;
+        boolean xa;
+        TransactionSupportLevel transactionSupportLevel;
+        if (connection instanceof TomEEManagedConnection) {
+            transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
+        } else {
+            transactionSupportLevel = TransactionSupportLevel.XATransaction;
+        }
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    mode = -1;
+                    xa = true;
+                    break;
+                }
+            case NoTransaction:
+                mode = acknowledgeMode;
+                xa = false;
+                break;
+            case LocalTransaction:
+                mode = JMSContext.SESSION_TRANSACTED;
+                xa = false;
+                break;
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+        if (xa) {
+            return createXASession();
+        } else {
+            return connection.getPhysicalConnection().createSession(mode);
+        }
+    }
+
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        int mode;
+        boolean xa;
+        TransactionSupportLevel transactionSupportLevel;
+        if (connection instanceof TomEEManagedConnection) {
+            transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
+        } else if (!transacted) {
+            transactionSupportLevel = TransactionSupportLevel.NoTransaction;
+        } else {
+            transactionSupportLevel = TransactionSupportLevel.XATransaction;
+        }
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    mode = -1;
+                    xa = true;
+                    break;
+                }
+            case NoTransaction:
+                mode = acknowledgeMode;
+                xa = false;
+                break;
+            case LocalTransaction:
+                mode = JMSContext.SESSION_TRANSACTED;
+                xa = false;
+                break;
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+        if (xa) {
+            return createXASession();
+        } else {
+            return connection.getPhysicalConnection().createSession(mode);
+        }
     }
 
     @Override
     public Session createSession() throws JMSException {
-        return connection.getPhysicalConnection().createSession();
+        int mode;
+        boolean xa;
+        TransactionSupportLevel transactionSupportLevel;
+        if (connection instanceof TomEEManagedConnection) {
+            transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
+        } else {
+            transactionSupportLevel = TransactionSupportLevel.XATransaction;
+        }
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    mode = -1;
+                    xa = true;
+                    break;
+                }
+            case NoTransaction:
+                mode = JMSContext.AUTO_ACKNOWLEDGE;
+                xa = false;
+                break;
+            case LocalTransaction:
+                mode = JMSContext.SESSION_TRANSACTED;
+                xa = false;
+                break;
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+        if (xa) {
+            return createXASession();
+        } else {
+            return connection.getPhysicalConnection().createSession(mode);
+        }
     }
 
     @Override
@@ -69,4 +182,15 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
                                                              final ServerSessionPool sessionPool, final int maxMessages) throws JMSException {
         return connection.getPhysicalConnection().createSharedConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages);
     }
+
+    @Override
+    public XASession createXASession() throws JMSException {
+        XASession session = ((XAConnection) connection.getPhysicalConnection()).createXASession();
+        try {
+            OpenEJB.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
+        } catch (IllegalStateException | SystemException | RollbackException e) {
+            throw new RuntimeException(e);
+        }
+        return session;
+    }
 }
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
index 75f3582..1a76f9f 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
@@ -16,15 +16,18 @@
  */
 package org.apache.openejb.resource.activemq.jms2;
 
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
 import org.apache.activemq.ra.ActiveMQConnectionFactory;
 import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
 import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
 
 import javax.jms.JMSContext;
-import javax.jms.Session;
 import javax.resource.spi.ConnectionManager;
 
 public class TomEERAConnectionFactory extends ActiveMQConnectionFactory {
+    private static final long serialVersionUID = 1L;
+    private TransactionSupportLevel transactionSupportLevel = TransactionSupportLevel.XATransaction;
+
     public TomEERAConnectionFactory(final ActiveMQManagedConnectionFactory factory, final ConnectionManager manager,
                                     final ActiveMQConnectionRequestInfo connectionRequestInfo) {
         super(factory, manager, connectionRequestInfo);
@@ -32,21 +35,116 @@ public class TomEERAConnectionFactory extends ActiveMQConnectionFactory {
 
     @Override
     public JMSContext createContext() {
-        return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, null, null, false);
+        // See notes here. We _do_ allow the user to override session mode at the
+        // connectionFactory level, otherwise we follow the spec.
+        // https://docs.oracle.com/javaee/7/api/javax/jms/ConnectionFactory.html#createContext-int-
+        int mode;
+        boolean xa;
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    mode = -1;
+                    xa = true;
+                    break;
+                }
+            case NoTransaction:
+                mode = JMSContext.AUTO_ACKNOWLEDGE;
+                xa = false;
+                break;
+            case LocalTransaction:
+                mode = JMSContext.SESSION_TRANSACTED;
+                xa = false;
+                break;
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+        return new JMSContextImpl(this, mode, null, null, xa);
     }
 
     @Override
     public JMSContext createContext(final int sessionMode) {
-        return new JMSContextImpl(this, sessionMode, null, null, false);
+        int mode;
+        boolean xa;
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    mode = -1;
+                    xa = true;
+                    break;
+                }
+            case NoTransaction:
+                mode = sessionMode;
+                xa = false;
+                break;
+            case LocalTransaction:
+                mode = JMSContext.SESSION_TRANSACTED;
+                xa = false;
+                break;
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+        return new JMSContextImpl(this, mode, null, null, xa);
     }
 
     @Override
     public JMSContext createContext(final String userName, final String password) {
-        return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, userName, password, false);
+        int mode;
+        boolean xa;
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    mode = -1;
+                    xa = true;
+                    break;
+                }
+            case NoTransaction:
+                mode = JMSContext.AUTO_ACKNOWLEDGE;
+                xa = false;
+                break;
+            case LocalTransaction:
+                mode = JMSContext.SESSION_TRANSACTED;
+                xa = false;
+                break;
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+        return new JMSContextImpl(this, mode, userName, password, xa);
     }
 
     @Override
     public JMSContext createContext(final String userName, final String password, final int sessionMode) {
-        return new JMSContextImpl(this, sessionMode, userName, password, false);
+        int mode;
+        boolean xa;
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    mode = -1;
+                    xa = true;
+                    break;
+                }
+            case NoTransaction:
+                mode = sessionMode;
+                xa = false;
+                break;
+            case LocalTransaction:
+                mode = JMSContext.SESSION_TRANSACTED;
+                xa = false;
+                break;
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+        return new JMSContextImpl(this, mode, userName, password, xa);
+    }
+
+    public TransactionSupportLevel getTransactionSupport() {
+        return transactionSupportLevel;
+    }
+
+    public void setTransactionSupport(TransactionSupportLevel transactionSupportLevel) {
+        if (transactionSupportLevel == null) {
+            throw new IllegalArgumentException("transactionSupportLevel cannot be null");
+        } else {
+            this.transactionSupportLevel = transactionSupportLevel;
+        }
     }
 }
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
index 1fe4f2a..042b39a 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
@@ -16,10 +16,10 @@
  */
 package org.apache.openejb.resource.activemq.jms2.cdi;
 
-import org.apache.openejb.OpenEJB;
 import org.apache.openejb.assembler.classic.OpenEjbConfiguration;
 import org.apache.openejb.assembler.classic.ResourceInfo;
 import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.jms2.JMS2;
 import org.apache.openejb.spi.ContainerSystem;
 
 import javax.annotation.PreDestroy;
@@ -56,7 +56,6 @@ import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.naming.NamingException;
-import javax.transaction.SystemException;
 import javax.transaction.TransactionScoped;
 import java.io.Serializable;
 import java.util.Map;
@@ -140,6 +139,7 @@ public class JMS2CDIExtension implements Extension {
     }
 
     public abstract static class AutoContextDestruction implements Serializable {
+          private static final long serialVersionUID = 1L;
         private transient Map<Key, JMSContext> contexts = new ConcurrentHashMap<>();
 
         public void push(final Key key, final JMSContext c) {
@@ -170,13 +170,16 @@ public class JMS2CDIExtension implements Extension {
 
     @RequestScoped
     public static class RequestAutoContextDestruction extends AutoContextDestruction {
+        private static final long serialVersionUID = 1L;
     }
 
     @TransactionScoped
     public static class TransactionAutoContextDestruction extends AutoContextDestruction {
+        private static final long serialVersionUID = 1L;
     }
 
     public static class Key implements Serializable {
+        private static final long serialVersionUID = 1L;
         private volatile ConnectionFactory connectionFactoryInstance;
         private final String connectionFactory;
         private final String username;
@@ -250,6 +253,7 @@ public class JMS2CDIExtension implements Extension {
     }
 
     public static class InternalJMSContext implements JMSContext, Serializable {
+        private static final long serialVersionUID = 1L;
         private final Key key;
         private final RequestAutoContextDestruction requestStorage;
         private final TransactionAutoContextDestruction transactionStorage;
@@ -261,7 +265,7 @@ public class JMS2CDIExtension implements Extension {
         }
 
         private synchronized JMSContext context() {
-            if (inTx()) {
+            if (JMS2.inTx()) {
                 return findOrCreateContext(transactionStorage);
             }
             return findOrCreateContext(requestStorage);
@@ -276,14 +280,6 @@ public class JMS2CDIExtension implements Extension {
             return jmsContext;
         }
 
-        private boolean inTx() {
-            try {
-                return OpenEJB.getTransactionManager().getTransaction() != null;
-            } catch (SystemException e) {
-                return false;
-            }
-        }
-
         // plain delegation now
 
         @Override