You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jl...@apache.org on 2016/12/03 16:56:29 UTC
[35/50] tomee git commit: TOMEE-1900 better xa pooling handling
TOMEE-1900 better xa pooling handling
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/b4bd095d
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/b4bd095d
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/b4bd095d
Branch: refs/heads/tomee-1.7.x
Commit: b4bd095d71dce43eeae756ec56f7a037c2791eff
Parents: 280808f
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Thu Aug 11 13:59:59 2016 +0200
Committer: Jonathan Gallimore <jo...@jrg.me.uk>
Committed: Thu Aug 11 13:59:17 2016 +0100
----------------------------------------------------------------------
.../jdbc/managed/local/ManagedConnection.java | 55 +++++----
.../tomee/jdbc/TomcatXADataSourceTest.java | 113 +++++++++++++++++++
2 files changed, 145 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tomee/blob/b4bd095d/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
index 5b03dd5..c73250a 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
@@ -101,9 +101,12 @@ public class ManagedConnection implements InvocationHandler {
return null;
}
- closeConnection(xaConnection, delegate);
+ closeConnection(true);
return null;
}
+ if ("isClosed".equals(mtdName) && closed) {
+ return true;
+ }
if (delegate == null) {
newConnection();
}
@@ -115,27 +118,29 @@ public class ManagedConnection implements InvocationHandler {
if (!currentTransaction.equals(transaction)) {
throw new SQLException("Connection can not be used while enlisted in another transaction");
}
- return invokeUnderTransaction(delegate, method, args);
+ return invokeUnderTransaction(method, args);
}
// get the already bound connection to the current transaction or enlist this one in the tx
- if (isUnderTransaction(transaction.getStatus())) {
+ final int transactionStatus = transaction.getStatus();
+ if (isUnderTransaction(transactionStatus)) {
Connection connection = Connection.class.cast(registry.getResource(key));
if (connection == null && delegate == null) {
newConnection();
- connection = delegate;
- registry.putResource(key, delegate);
currentTransaction = transaction;
try {
- transaction.enlistResource(getXAResource());
+ if (!transaction.enlistResource(getXAResource())) {
+ throw new SQLException("Unable to enlist connection in transaction: enlistResource returns 'false'.");
+ }
} catch (final RollbackException ignored) {
// no-op
} catch (final SystemException e) {
throw new SQLException("Unable to enlist connection the transaction", e);
}
- transaction.registerSynchronization(new ClosingSynchronization(xaConnection, delegate));
+ registry.putResource(key, delegate);
+ transaction.registerSynchronization(new ClosingSynchronization());
try {
setAutoCommit(false);
@@ -152,7 +157,14 @@ public class ManagedConnection implements InvocationHandler {
delegate = connection;
}
- return invokeUnderTransaction(connection, method, args);
+ return invokeUnderTransaction(method, args);
+ }
+
+ if ("isClosed".equals(mtdName) && closed) {
+ return true;
+ }
+ if ("close".equals(mtdName)) { // let it be handled by the ClosingSynchronisation since we have a tx there
+ return close();
}
// we shouldn't come here, tempted to just throw an exception
@@ -171,8 +183,8 @@ public class ManagedConnection implements InvocationHandler {
(key.user == null ? XADataSource.class.cast(key.ds).getXAConnection() : XADataSource.class.cast(key.ds).getXAConnection(key.user, key.pwd));
if (XAConnection.class.isInstance(connection)) {
xaConnection = XAConnection.class.cast(connection);
- delegate = xaConnection.getConnection();
xaResource = xaConnection.getXAResource();
+ delegate = xaConnection.getConnection();
} else {
delegate = Connection.class.cast(connection);
xaResource = new LocalXAResource(delegate);
@@ -194,7 +206,7 @@ public class ManagedConnection implements InvocationHandler {
}
}
- private Object invokeUnderTransaction(final Connection delegate, final Method method, final Object[] args) throws Exception {
+ private Object invokeUnderTransaction(final Method method, final Object[] args) throws Exception {
final String mtdName = method.getName();
if ("setAutoCommit".equals(mtdName)
|| "commit".equals(mtdName)
@@ -227,15 +239,7 @@ public class ManagedConnection implements InvocationHandler {
return new SQLException("can't call " + mtdName + " when the connection is JtaManaged");
}
- private static class ClosingSynchronization implements Synchronization {
- private final XAConnection xaConnection;
- private final Connection connection;
-
- public ClosingSynchronization(final XAConnection xaConnection, final Connection delegate) {
- this.xaConnection = xaConnection;
- this.connection = delegate;
- }
-
+ private class ClosingSynchronization implements Synchronization {
@Override
public void beforeCompletion() {
// no-op
@@ -243,19 +247,24 @@ public class ManagedConnection implements InvocationHandler {
@Override
public void afterCompletion(final int status) {
- closeConnection(xaConnection, connection);
+ closeConnection(true);
}
}
- private static void closeConnection(final XAConnection xaConnection, final Connection connection) {
+ private void closeConnection(final boolean force) {
+ if (!force && closed) {
+ return;
+ }
try {
if (xaConnection != null) { // handles the underlying connection
xaConnection.close();
- } else if (connection != null && !connection.isClosed()) {
- connection.close();
+ } else if (delegate != null && !delegate.isClosed()) {
+ delegate.close();
}
} catch (final SQLException e) {
// no-op
+ } finally {
+ close(); // set the flag
}
}
http://git-wip-us.apache.org/repos/asf/tomee/blob/b4bd095d/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
----------------------------------------------------------------------
diff --git a/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java b/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
index 96739d0..56a95ce 100644
--- a/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
+++ b/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.tomee.jdbc;
+import org.apache.openejb.OpenEJB;
import org.apache.openejb.jee.EjbJar;
import org.apache.openejb.junit.ApplicationComposer;
import org.apache.openejb.resource.jdbc.managed.local.ManagedDataSource;
+import org.apache.openejb.testing.Classes;
import org.apache.openejb.testing.Configuration;
import org.apache.openejb.testing.Module;
import org.apache.openejb.testng.PropertiesBuilder;
@@ -28,24 +30,34 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.Singleton;
import javax.sql.DataSource;
+import javax.transaction.Synchronization;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@RunWith(ApplicationComposer.class)
public class TomcatXADataSourceTest {
@Resource(name = "xadb")
private DataSource ds;
+ @EJB
+ private TxP tx;
+
@Module
+ @Classes(TxP.class)
public EjbJar mandatory() {
return new EjbJar();
}
@@ -115,5 +127,106 @@ public class TomcatXADataSourceTest {
assertEquals(0, tds.getActive());
assertEquals(25, tds.getIdle());
}
+ // in tx - closing in tx
+ for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time
+ for (int i = 0; i < 25; i++) {
+ tx.run(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Connection c = null;
+ for (int i = 0; i < 25; i++) {
+ final Connection connection = ds.getConnection();
+ connection.getMetaData(); // trigger connection retrieving otherwise nothing is done (pool is not used)
+ if (c != null) {
+ assertEquals(c.unwrap(Connection.class), connection.unwrap(Connection.class));
+ } else {
+ c = connection;
+ }
+ }
+ c.close(); // ensure we handle properly eager close invocations
+ } catch (final SQLException sql) {
+ fail(sql.getMessage());
+ }
+ }
+ });
+ }
+ assertEquals(0, tds.getActive());
+ assertEquals(25, tds.getIdle());
+ }
+
+
+ // in tx - closing after tx
+ for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time
+ for (int i = 0; i < 25; i++) {
+ final AtomicReference<Connection> ref = new AtomicReference<Connection>();
+ tx.run(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Connection c = null;
+ for (int i = 0; i < 25; i++) {
+ final Connection connection = ds.getConnection();
+ connection.getMetaData(); // trigger connection retrieving otherwise nothing is done (pool is not used)
+ if (c != null) {
+ assertEquals(c.unwrap(Connection.class), connection.unwrap(Connection.class));
+ } else {
+ c = connection;
+ ref.set(c);
+ }
+ }
+ } catch (final SQLException sql) {
+ fail(sql.getMessage());
+ }
+ }
+ });
+ assertTrue(ref.get().isClosed()); // closed with tx
+ ref.get().close();
+ assertTrue(ref.get().isClosed());
+ }
+ assertEquals(0, tds.getActive());
+ assertEquals(25, tds.getIdle());
+ }
+
+ // in tx - closing in commit
+ for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time
+ for (int i = 0; i < 25; i++) {
+ tx.run(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final Connection ref = ds.getConnection();
+ ref.getMetaData();
+ OpenEJB.getTransactionManager().getTransaction().registerSynchronization(new Synchronization() {
+ @Override
+ public void beforeCompletion() {
+ // no-op
+ }
+
+ @Override
+ public void afterCompletion(final int status) { // JPA does it
+ try {
+ ref.close();
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+ });
+ } catch (final Exception sql) {
+ fail(sql.getMessage());
+ }
+ }
+ });
+ }
+ assertEquals(0, tds.getActive());
+ assertEquals(25, tds.getIdle());
+ }
+ }
+
+ @Singleton
+ public static class TxP {
+ public void run(final Runnable r) {
+ r.run();
+ }
}
}