You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jf...@apache.org on 2019/08/29 16:36:13 UTC
[tomee] branch tomee-7.1.x updated: Fix several JMS/JMS2.0 bugs
This is an automated email from the ASF dual-hosted git repository.
jfisher pushed a commit to branch tomee-7.1.x
in repository https://gitbox.apache.org/repos/asf/tomee.git
The following commit(s) were added to refs/heads/tomee-7.1.x by this push:
new d4097e3 Fix several JMS/JMS2.0 bugs
d4097e3 is described below
commit d4097e360673cef742d2c42ab664df754993e561
Author: Jonathan S. Fisher <jo...@emoneyusa.com>
AuthorDate: Mon Aug 26 20:09:10 2019 -0500
Fix several JMS/JMS2.0 bugs
* Fix TOMEE-2229: JMSContext Injected by TomEE does not participate in JTA
* Fix TOMEE-2650: TomEE Creating non-JTA Sessions
* Fix TOMEE-2651: TomEE doesn't return JMS Connection to pool after a Transaction Timeout
* Fix TOMEE-2652: TransactionSupport parameter not honored on JMS Connection Factory resources
---
.../openejb/resource/AutoConnectionTracker.java | 26 ++--
.../openejb/resource/activemq/jms2/JMS2.java | 13 ++
.../resource/activemq/jms2/JMSContextImpl.java | 7 +-
.../resource/activemq/jms2/JMSProducerImpl.java | 1 +
.../activemq/jms2/TomEEConnectionFactory.java | 42 +++++--
.../activemq/jms2/TomEEManagedConnection.java | 10 +-
.../jms2/TomEEManagedConnectionFactory.java | 45 ++++++-
.../activemq/jms2/TomEEManagedConnectionProxy.java | 132 ++++++++++++++++++++-
.../activemq/jms2/TomEERAConnectionFactory.java | 108 ++++++++++++++++-
.../activemq/jms2/cdi/JMS2CDIExtension.java | 18 ++-
10 files changed, 360 insertions(+), 42 deletions(-)
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
index 49f891e..3064aec 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
@@ -22,6 +22,7 @@ import org.apache.geronimo.connector.outbound.ConnectionReturnAction;
import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor;
import org.apache.geronimo.connector.outbound.ManagedConnectionInfo;
import org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
+import org.apache.geronimo.transaction.manager.TransactionImpl;
import org.apache.openejb.dyni.DynamicSubclass;
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.util.LogCategory;
@@ -32,7 +33,6 @@ import javax.resource.ResourceException;
import javax.resource.spi.DissociatableManagedConnection;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
-import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.TransactionSynchronizationRegistry;
import java.lang.ref.PhantomReference;
@@ -58,6 +58,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
private final TransactionManager txMgr;
private final Logger logger = Logger.getInstance(LogCategory.OPENEJB_CONNECTOR, "org.apache.openejb.resource");
private final ConcurrentMap<ManagedConnectionInfo, ProxyPhantomReference> references = new ConcurrentHashMap<ManagedConnectionInfo, ProxyPhantomReference>();
+ @SuppressWarnings("rawtypes")
private final ReferenceQueue referenceQueue = new ReferenceQueue();
private final ConcurrentMap<Class<?>, Class<?>> proxies = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<?>, Class<?>[]> interfaces = new ConcurrentHashMap<>();
@@ -80,6 +81,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
* @param connectionInfo the connection to be obtained
* @param key the unique id of the connection manager
*/
+ @Override
public void setEnvironment(final ConnectionInfo connectionInfo, final String key) {
ProxyPhantomReference reference = (ProxyPhantomReference) referenceQueue.poll();
while (reference != null) {
@@ -103,12 +105,14 @@ public class AutoConnectionTracker implements ConnectionTracker {
* @param connectionInfo the connection that was obtained
* @param reassociate should always be false
*/
+ @SuppressWarnings("unchecked")
+ @Override
public void handleObtained(final ConnectionTrackingInterceptor interceptor, final ConnectionInfo connectionInfo, final boolean reassociate) throws ResourceException {
if (txMgr != null && registry != null) {
- Transaction currentTx = null;
+ TransactionImpl currentTx = null;
try {
- currentTx = txMgr.getTransaction();
- } catch (SystemException e) {
+ currentTx = (TransactionImpl) txMgr.getTransaction();
+ } catch (SystemException | ClassCastException e) {
//ignore
}
@@ -166,16 +170,18 @@ public class AutoConnectionTracker implements ConnectionTracker {
* @param connectionInfo the connection that was released
* @param action ignored
*/
+ @Override
+ @SuppressWarnings("unchecked")
public void handleReleased(final ConnectionTrackingInterceptor interceptor, final ConnectionInfo connectionInfo, final ConnectionReturnAction action) {
- Transaction currentTx = null;
+ TransactionImpl currentTx = null;
try {
- currentTx = txMgr.getTransaction();
- } catch (SystemException e) {
+ currentTx = (TransactionImpl) txMgr.getTransaction();
+ } catch (SystemException | ClassCastException e) {
//ignore
}
if (currentTx != null) {
- Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>> txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>) registry.getResource(KEY);
+ Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>> txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>) currentTx.getResource(KEY);
if (txConnections == null) {
txConnections = new HashMap<ManagedConnectionInfo, Map<ConnectionInfo, Object>>();
registry.putResource(KEY, txConnections);
@@ -193,6 +199,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
}
}
+ @SuppressWarnings("rawtypes")
final PhantomReference phantomReference = references.remove(connectionInfo.getManagedConnectionInfo());
if (phantomReference != null) {
phantomReference.clear();
@@ -292,6 +299,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
this.handle = handle;
}
+ @Override
public Object invoke(final Object object, final Method method, final Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
if (method.getName().equals("finalize")) {
@@ -330,7 +338,7 @@ public class AutoConnectionTracker implements ConnectionTracker {
public ProxyPhantomReference(final ConnectionTrackingInterceptor interceptor,
final ManagedConnectionInfo managedConnectionInfo,
final ConnectionInvocationHandler handler,
- final ReferenceQueue referenceQueue) {
+ @SuppressWarnings("rawtypes") final ReferenceQueue referenceQueue) {
super(handler, referenceQueue);
this.interceptor = interceptor;
this.managedConnectionInfo = managedConnectionInfo;
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
index 1f7e78a..29c0ebf 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
@@ -43,6 +43,9 @@ import javax.jms.TransactionInProgressException;
import javax.jms.TransactionInProgressRuntimeException;
import javax.jms.TransactionRolledBackException;
import javax.jms.TransactionRolledBackRuntimeException;
+import javax.transaction.SystemException;
+
+import org.apache.openejb.OpenEJB;
public final class JMS2 {
private JMS2() {
@@ -83,6 +86,7 @@ public final class JMS2 {
return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
+ @SuppressWarnings("unchecked")
public static <T extends Message> T wrap(final T message10) {
if (message10 == null) {
return null;
@@ -112,4 +116,13 @@ public final class JMS2 {
}
return (T) new DelegateMessage(message10);
}
+
+
+ public static boolean inTx() {
+ try {
+ return OpenEJB.getTransactionManager().getTransaction() != null;
+ } catch (SystemException | NullPointerException e) {
+ return false;
+ }
+ }
}
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
index db975be..872b1c9 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
@@ -42,6 +42,7 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.XAConnection;
+
import java.io.Serializable;
import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
@@ -80,7 +81,6 @@ public class JMSContextImpl implements JMSContext {
if (connection == null) {
try {
connection = username != null ? factory.createConnection(username, password) : factory.createConnection();
- xa = XAConnection.class.isInstance(connection);
} catch (final JMSException e) {
throw toRuntimeException(e);
}
@@ -96,10 +96,11 @@ public class JMSContextImpl implements JMSContext {
}
if (session == null) {
try {
+ Connection connection = connection();
if (xa) {
- session = XAConnection.class.cast(connection()).createXASession();
+ session = XAConnection.class.cast(connection).createXASession();
} else {
- session = connection().createSession(sessionMode);
+ session = connection.createSession(sessionMode);
}
} catch (final JMSException e) {
throw toRuntimeException(e);
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
index 9969639..eabfad8 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
@@ -40,6 +40,7 @@ import java.util.Set;
import static org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
import static org.apache.openejb.resource.activemq.jms2.JMS2.wrap;
+@SuppressWarnings("deprecation")
class JMSProducerImpl implements JMSProducer {
private final JMSContextImpl context;
private final MessageProducer producer;
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
index 7bb6a52..9af3d0d 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
@@ -17,35 +17,63 @@
package org.apache.openejb.resource.activemq.jms2;
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.ActiveMQXASslConnectionFactory;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.transport.Transport;
import javax.jms.JMSContext;
-public class TomEEConnectionFactory extends ActiveMQSslConnectionFactory {
+public class TomEEConnectionFactory extends ActiveMQXASslConnectionFactory {
@Override
protected ActiveMQConnection createActiveMQConnection(final Transport transport, final JMSStatsImpl stats) throws Exception {
- return new TomEEConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
+ return new TomEEXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
}
@Override
public JMSContext createContext() {
- return new JMSContextImpl(this, -1, null, null, false);
+ boolean inTx = JMS2.inTx();
+ int mode;
+ if (inTx) {
+ mode = -1;
+ } else {
+ mode = JMSContext.AUTO_ACKNOWLEDGE;
+ }
+ return new JMSContextImpl(this, mode, null, null, inTx);
}
@Override
public JMSContext createContext(final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, null, null, false);
+ boolean inTx = JMS2.inTx();
+ int mode;
+ if (inTx) {
+ mode = -1;
+ } else {
+ mode = sessionMode;
+ }
+ return new JMSContextImpl(this, mode, null, null, inTx);
}
@Override
public JMSContext createContext(final String userName, final String password) {
- return new JMSContextImpl(this, -1, userName, password, false);
+ boolean inTx = JMS2.inTx();
+ int mode;
+ if (inTx) {
+ mode = -1;
+ } else {
+ mode = JMSContext.AUTO_ACKNOWLEDGE;
+ }
+ return new JMSContextImpl(this, mode, userName, password, inTx);
}
@Override
public JMSContext createContext(final String userName, final String password, final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, userName, password, false);
+ boolean inTx = JMS2.inTx();
+ int mode;
+ if (inTx) {
+ mode = -1;
+ } else {
+ mode = sessionMode;
+ }
+ return new JMSContextImpl(this, mode, userName, password, inTx);
}
}
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
index 4d7bb29..fe8e2b7 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
@@ -23,12 +23,14 @@ import org.apache.activemq.ra.ManagedConnectionProxy;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
import javax.security.auth.Subject;
import java.lang.reflect.Field;
import java.util.Collection;
public class TomEEManagedConnection extends ActiveMQManagedConnection {
private static final Field PROXY_CONNECTIONS_FIELD;
+ private TransactionSupportLevel transactionSupportLevel;
static {
try {
@@ -41,14 +43,16 @@ public class TomEEManagedConnection extends ActiveMQManagedConnection {
private final Collection<ManagedConnectionProxy> proxyConnections;
+ @SuppressWarnings("unchecked")
public TomEEManagedConnection(final Subject subject, final ActiveMQConnection physicalConnection,
- final ActiveMQConnectionRequestInfo info) throws ResourceException {
+ final ActiveMQConnectionRequestInfo info, TransactionSupportLevel transactionSupportLevel) throws ResourceException {
super(subject, physicalConnection, info);
try {
proxyConnections = Collection.class.cast(PROXY_CONNECTIONS_FIELD.get(this));
} catch (final IllegalAccessException e) {
throw new IllegalStateException("Incompatible AMQ", e);
}
+ this.transactionSupportLevel = transactionSupportLevel;
}
@Override
@@ -57,4 +61,8 @@ public class TomEEManagedConnection extends ActiveMQManagedConnection {
proxyConnections.add(proxy);
return proxy;
}
+
+ public TransactionSupportLevel getTransactionSupportLevel() {
+ return transactionSupportLevel;
+ }
}
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
index 44ea157..22c44d0 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
@@ -22,17 +22,25 @@ import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
import org.apache.activemq.ra.MessageActivationSpec;
import org.apache.activemq.ra.SimpleConnectionManager;
+import java.util.Locale;
+
import javax.jms.JMSException;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
import javax.security.auth.Subject;
public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFactory {
+ private static final long serialVersionUID = 1L;
+ private TransactionSupportLevel transactionSupportLevel;
+
@Override
public Object createConnectionFactory(final ConnectionManager manager) throws ResourceException {
- return new TomEERAConnectionFactory(this, manager, getInfo());
+ TomEERAConnectionFactory factory = new TomEERAConnectionFactory(this, manager, getInfo());
+ factory.setTransactionSupport(transactionSupportLevel);
+ return factory;
}
@Override
@@ -56,7 +64,7 @@ public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFact
amqInfo = getInfo();
}
try {
- return new TomEEManagedConnection(subject, makeConnection(amqInfo), amqInfo);
+ return new TomEEManagedConnection(subject, makeConnection(amqInfo), amqInfo, transactionSupportLevel);
} catch (final JMSException e) {
throw new ResourceException("Could not create connection.", e);
}
@@ -67,4 +75,37 @@ public class TomEEManagedConnectionFactory extends ActiveMQManagedConnectionFact
return !(object == null || !getClass().isInstance(object))
&& ((ActiveMQManagedConnectionFactory) object).getInfo().equals(getInfo());
}
+
+ public String getTransactionSupport() {
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ return "xa";
+ case LocalTransaction:
+ return "local";
+ case NoTransaction:
+ return "none";
+ default:
+ return null;
+ }
+ }
+
+ public void setTransactionSupport(String transactionSupport) {
+ if (transactionSupport == null) {
+ throw new IllegalArgumentException("transactionSupport cannot be not null");
+ } else {
+ switch (transactionSupport.toLowerCase(Locale.ENGLISH)) {
+ case "xa":
+ transactionSupportLevel = TransactionSupportLevel.XATransaction;
+ break;
+ case "local":
+ transactionSupportLevel = TransactionSupportLevel.LocalTransaction;
+ break;
+ case "none":
+ transactionSupportLevel = TransactionSupportLevel.NoTransaction;
+ break;
+ default:
+ throw new IllegalArgumentException("transactionSupport must be xa, local, or none:" + transactionSupport);
+ }
+ }
+ }
}
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
index f07b7a2..aefe4f9 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
@@ -18,21 +18,28 @@ package org.apache.openejb.resource.activemq.jms2;
import org.apache.activemq.ra.ActiveMQManagedConnection;
import org.apache.activemq.ra.ManagedConnectionProxy;
+import org.apache.openejb.OpenEJB;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
+import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
// cause org.apache.openejb.resource.AutoConnectionTracker.proxyConnection() just uses getInterfaces()
- implements Connection, QueueConnection, TopicConnection, ExceptionListener {
+ implements Connection, QueueConnection, TopicConnection, ExceptionListener, XAConnection {
private volatile ActiveMQManagedConnection connection;
@@ -48,13 +55,119 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
}
@Override
- public Session createSession(final int sessionMode) throws JMSException {
- return connection.getPhysicalConnection().createSession(sessionMode);
+ public Session createSession(final int acknowledgeMode) throws JMSException {
+ // For the next three methods, we ignore the requested session mode when a JTA
+ // transaction is in progress, per the spec:
+ // https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#createSession-int-
+ //
+ // But we also allow the user to override this behavior. If they set
+ // transactionSupport on the connection factory to "none" or "local",
+ // we will not return an XA session to them, even though the underlying physical
+ // connection may support XA.
+
+ int mode;
+ boolean xa;
+ TransactionSupportLevel transactionSupportLevel;
+ if (connection instanceof TomEEManagedConnection) {
+ transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
+ } else {
+ transactionSupportLevel = TransactionSupportLevel.XATransaction;
+ }
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ if (JMS2.inTx()) {
+ mode = -1;
+ xa = true;
+ break;
+ }
+ case NoTransaction:
+ mode = acknowledgeMode;
+ xa = false;
+ break;
+ case LocalTransaction:
+ mode = JMSContext.SESSION_TRANSACTED;
+ xa = false;
+ break;
+ default:
+ throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+ }
+ if (xa) {
+ return createXASession();
+ } else {
+ return connection.getPhysicalConnection().createSession(mode);
+ }
+ }
+
+ @Override
+ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+ int mode;
+ boolean xa;
+ TransactionSupportLevel transactionSupportLevel;
+ if (connection instanceof TomEEManagedConnection) {
+ transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
+ } else if (!transacted) {
+ transactionSupportLevel = TransactionSupportLevel.NoTransaction;
+ } else {
+ transactionSupportLevel = TransactionSupportLevel.XATransaction;
+ }
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ if (JMS2.inTx()) {
+ mode = -1;
+ xa = true;
+ break;
+ }
+ case NoTransaction:
+ mode = acknowledgeMode;
+ xa = false;
+ break;
+ case LocalTransaction:
+ mode = JMSContext.SESSION_TRANSACTED;
+ xa = false;
+ break;
+ default:
+ throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+ }
+ if (xa) {
+ return createXASession();
+ } else {
+ return connection.getPhysicalConnection().createSession(mode);
+ }
}
@Override
public Session createSession() throws JMSException {
- return connection.getPhysicalConnection().createSession();
+ int mode;
+ boolean xa;
+ TransactionSupportLevel transactionSupportLevel;
+ if (connection instanceof TomEEManagedConnection) {
+ transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
+ } else {
+ transactionSupportLevel = TransactionSupportLevel.XATransaction;
+ }
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ if (JMS2.inTx()) {
+ mode = -1;
+ xa = true;
+ break;
+ }
+ case NoTransaction:
+ mode = JMSContext.AUTO_ACKNOWLEDGE;
+ xa = false;
+ break;
+ case LocalTransaction:
+ mode = JMSContext.SESSION_TRANSACTED;
+ xa = false;
+ break;
+ default:
+ throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+ }
+ if (xa) {
+ return createXASession();
+ } else {
+ return connection.getPhysicalConnection().createSession(mode);
+ }
}
@Override
@@ -69,4 +182,15 @@ public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
final ServerSessionPool sessionPool, final int maxMessages) throws JMSException {
return connection.getPhysicalConnection().createSharedConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages);
}
+
+ @Override
+ public XASession createXASession() throws JMSException {
+ XASession session = ((XAConnection) connection.getPhysicalConnection()).createXASession();
+ try {
+ OpenEJB.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
+ } catch (IllegalStateException | SystemException | RollbackException e) {
+ throw new RuntimeException(e);
+ }
+ return session;
+ }
}
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
index 75f3582..1a76f9f 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
@@ -16,15 +16,18 @@
*/
package org.apache.openejb.resource.activemq.jms2;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
import org.apache.activemq.ra.ActiveMQConnectionFactory;
import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
import javax.jms.JMSContext;
-import javax.jms.Session;
import javax.resource.spi.ConnectionManager;
public class TomEERAConnectionFactory extends ActiveMQConnectionFactory {
+ private static final long serialVersionUID = 1L;
+ private TransactionSupportLevel transactionSupportLevel = TransactionSupportLevel.XATransaction;
+
public TomEERAConnectionFactory(final ActiveMQManagedConnectionFactory factory, final ConnectionManager manager,
final ActiveMQConnectionRequestInfo connectionRequestInfo) {
super(factory, manager, connectionRequestInfo);
@@ -32,21 +35,116 @@ public class TomEERAConnectionFactory extends ActiveMQConnectionFactory {
@Override
public JMSContext createContext() {
- return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, null, null, false);
+ // See the notes at the link below. We _do_ allow the user to override the session
+ // mode at the connectionFactory level; otherwise we follow the spec.
+ // https://docs.oracle.com/javaee/7/api/javax/jms/ConnectionFactory.html#createContext-int-
+ int mode;
+ boolean xa;
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ if (JMS2.inTx()) {
+ mode = -1;
+ xa = true;
+ break;
+ }
+ case NoTransaction:
+ mode = JMSContext.AUTO_ACKNOWLEDGE;
+ xa = false;
+ break;
+ case LocalTransaction:
+ mode = JMSContext.SESSION_TRANSACTED;
+ xa = false;
+ break;
+ default:
+ throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+ }
+ return new JMSContextImpl(this, mode, null, null, xa);
}
@Override
public JMSContext createContext(final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, null, null, false);
+ int mode;
+ boolean xa;
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ if (JMS2.inTx()) {
+ mode = -1;
+ xa = true;
+ break;
+ }
+ case NoTransaction:
+ mode = sessionMode;
+ xa = false;
+ break;
+ case LocalTransaction:
+ mode = JMSContext.SESSION_TRANSACTED;
+ xa = false;
+ break;
+ default:
+ throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+ }
+ return new JMSContextImpl(this, mode, null, null, xa);
}
@Override
public JMSContext createContext(final String userName, final String password) {
- return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, userName, password, false);
+ int mode;
+ boolean xa;
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ if (JMS2.inTx()) {
+ mode = -1;
+ xa = true;
+ break;
+ }
+ case NoTransaction:
+ mode = JMSContext.AUTO_ACKNOWLEDGE;
+ xa = false;
+ break;
+ case LocalTransaction:
+ mode = JMSContext.SESSION_TRANSACTED;
+ xa = false;
+ break;
+ default:
+ throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+ }
+ return new JMSContextImpl(this, mode, userName, password, xa);
}
@Override
public JMSContext createContext(final String userName, final String password, final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, userName, password, false);
+ int mode;
+ boolean xa;
+ switch (transactionSupportLevel) {
+ case XATransaction:
+ if (JMS2.inTx()) {
+ mode = -1;
+ xa = true;
+ break;
+ }
+ case NoTransaction:
+ mode = sessionMode;
+ xa = false;
+ break;
+ case LocalTransaction:
+ mode = JMSContext.SESSION_TRANSACTED;
+ xa = false;
+ break;
+ default:
+ throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+ }
+ return new JMSContextImpl(this, mode, userName, password, xa);
+ }
+
+ public TransactionSupportLevel getTransactionSupport() {
+ return transactionSupportLevel;
+ }
+
+ public void setTransactionSupport(TransactionSupportLevel transactionSupportLevel) {
+ if (transactionSupportLevel == null) {
+ throw new IllegalArgumentException("transactionSupportLevel cannot be null");
+ } else {
+ this.transactionSupportLevel = transactionSupportLevel;
+ }
}
}
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
index 98ef326..fef6048 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
@@ -16,10 +16,10 @@
*/
package org.apache.openejb.resource.activemq.jms2.cdi;
-import org.apache.openejb.OpenEJB;
import org.apache.openejb.assembler.classic.OpenEjbConfiguration;
import org.apache.openejb.assembler.classic.ResourceInfo;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.jms2.JMS2;
import org.apache.openejb.spi.ContainerSystem;
import javax.annotation.PreDestroy;
@@ -56,7 +56,6 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.NamingException;
-import javax.transaction.SystemException;
import javax.transaction.TransactionScoped;
import java.io.Serializable;
import java.util.Map;
@@ -139,6 +138,7 @@ public class JMS2CDIExtension implements Extension {
}
public abstract static class AutoContextDestruction implements Serializable {
+ private static final long serialVersionUID = 1L;
private transient Map<Key, JMSContext> contexts = new ConcurrentHashMap<>();
public void push(final Key key, final JMSContext c) {
@@ -169,13 +169,16 @@ public class JMS2CDIExtension implements Extension {
@RequestScoped
public static class RequestAutoContextDestruction extends AutoContextDestruction {
+ private static final long serialVersionUID = 1L;
}
@TransactionScoped
public static class TransactionAutoContextDestruction extends AutoContextDestruction {
+ private static final long serialVersionUID = 1L;
}
public static class Key implements Serializable {
+ private static final long serialVersionUID = 1L;
private volatile ConnectionFactory connectionFactoryInstance;
private final String connectionFactory;
private final String username;
@@ -249,6 +252,7 @@ public class JMS2CDIExtension implements Extension {
}
public static class InternalJMSContext implements JMSContext, Serializable {
+ private static final long serialVersionUID = 1L;
private final Key key;
private final RequestAutoContextDestruction requestStorage;
private final TransactionAutoContextDestruction transactionStorage;
@@ -260,7 +264,7 @@ public class JMS2CDIExtension implements Extension {
}
private synchronized JMSContext context() {
- if (inTx()) {
+ if (JMS2.inTx()) {
return findOrCreateContext(transactionStorage);
}
return findOrCreateContext(requestStorage);
@@ -275,14 +279,6 @@ public class JMS2CDIExtension implements Extension {
return jmsContext;
}
- private boolean inTx() {
- try {
- return OpenEJB.getTransactionManager().getTransaction() != null;
- } catch (SystemException e) {
- return false;
- }
- }
-
// plain delegation now
@Override