You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by jl...@apache.org on 2016/12/03 16:56:29 UTC

[35/50] tomee git commit: TOMEE-1900 better xa pooling handling

TOMEE-1900 better xa pooling handling


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/b4bd095d
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/b4bd095d
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/b4bd095d

Branch: refs/heads/tomee-1.7.x
Commit: b4bd095d71dce43eeae756ec56f7a037c2791eff
Parents: 280808f
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Thu Aug 11 13:59:59 2016 +0200
Committer: Jonathan Gallimore <jo...@jrg.me.uk>
Committed: Thu Aug 11 13:59:17 2016 +0100

----------------------------------------------------------------------
 .../jdbc/managed/local/ManagedConnection.java   |  55 +++++----
 .../tomee/jdbc/TomcatXADataSourceTest.java      | 113 +++++++++++++++++++
 2 files changed, 145 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/b4bd095d/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
index 5b03dd5..c73250a 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java
@@ -101,9 +101,12 @@ public class ManagedConnection implements InvocationHandler {
                         return null;
                     }
 
-                    closeConnection(xaConnection, delegate);
+                    closeConnection(true);
                     return null;
                 }
+                if ("isClosed".equals(mtdName) && closed) {
+                    return true;
+                }
                 if (delegate == null) {
                     newConnection();
                 }
@@ -115,27 +118,29 @@ public class ManagedConnection implements InvocationHandler {
                 if (!currentTransaction.equals(transaction)) {
                     throw new SQLException("Connection can not be used while enlisted in another transaction");
                 }
-                return invokeUnderTransaction(delegate, method, args);
+                return invokeUnderTransaction(method, args);
             }
 
             // get the already bound connection to the current transaction or enlist this one in the tx
-            if (isUnderTransaction(transaction.getStatus())) {
+            final int transactionStatus = transaction.getStatus();
+            if (isUnderTransaction(transactionStatus)) {
                 Connection connection = Connection.class.cast(registry.getResource(key));
                 if (connection == null && delegate == null) {
                     newConnection();
-                    connection = delegate;
 
-                    registry.putResource(key, delegate);
                     currentTransaction = transaction;
                     try {
-                        transaction.enlistResource(getXAResource());
+                        if (!transaction.enlistResource(getXAResource())) {
+                            throw new SQLException("Unable to enlist connection in transaction: enlistResource returns 'false'.");
+                        }
                     } catch (final RollbackException ignored) {
                         // no-op
                     } catch (final SystemException e) {
                         throw new SQLException("Unable to enlist connection the transaction", e);
                     }
 
-                    transaction.registerSynchronization(new ClosingSynchronization(xaConnection, delegate));
+                    registry.putResource(key, delegate);
+                    transaction.registerSynchronization(new ClosingSynchronization());
 
                     try {
                         setAutoCommit(false);
@@ -152,7 +157,14 @@ public class ManagedConnection implements InvocationHandler {
                     delegate = connection;
                 }
 
-                return invokeUnderTransaction(connection, method, args);
+                return invokeUnderTransaction(method, args);
+            }
+
+            if ("isClosed".equals(mtdName) && closed) {
+                return true;
+            }
+            if ("close".equals(mtdName)) { // let it be handled by the ClosingSynchronization since we have a tx there
+                return close();
             }
 
             // we shouldn't come here, tempted to just throw an exception
@@ -171,8 +183,8 @@ public class ManagedConnection implements InvocationHandler {
                 (key.user == null ? XADataSource.class.cast(key.ds).getXAConnection() : XADataSource.class.cast(key.ds).getXAConnection(key.user, key.pwd));
         if (XAConnection.class.isInstance(connection)) {
             xaConnection = XAConnection.class.cast(connection);
-            delegate = xaConnection.getConnection();
             xaResource = xaConnection.getXAResource();
+            delegate = xaConnection.getConnection();
         } else {
             delegate = Connection.class.cast(connection);
             xaResource = new LocalXAResource(delegate);
@@ -194,7 +206,7 @@ public class ManagedConnection implements InvocationHandler {
         }
     }
 
-    private Object invokeUnderTransaction(final Connection delegate, final Method method, final Object[] args) throws Exception {
+    private Object invokeUnderTransaction(final Method method, final Object[] args) throws Exception {
         final String mtdName = method.getName();
         if ("setAutoCommit".equals(mtdName)
             || "commit".equals(mtdName)
@@ -227,15 +239,7 @@ public class ManagedConnection implements InvocationHandler {
         return new SQLException("can't call " + mtdName + " when the connection is JtaManaged");
     }
 
-    private static class ClosingSynchronization implements Synchronization {
-        private final XAConnection xaConnection;
-        private final Connection connection;
-
-        public ClosingSynchronization(final XAConnection xaConnection, final Connection delegate) {
-            this.xaConnection = xaConnection;
-            this.connection = delegate;
-        }
-
+    private class ClosingSynchronization implements Synchronization {
         @Override
         public void beforeCompletion() {
             // no-op
@@ -243,19 +247,24 @@ public class ManagedConnection implements InvocationHandler {
 
         @Override
         public void afterCompletion(final int status) {
-            closeConnection(xaConnection, connection);
+            closeConnection(true);
         }
     }
 
-    private static void closeConnection(final XAConnection xaConnection, final Connection connection) {
+    private void closeConnection(final boolean force) {
+        if (!force && closed) {
+            return;
+        }
         try {
             if (xaConnection != null) { // handles the underlying connection
                 xaConnection.close();
-            } else if (connection != null && !connection.isClosed()) {
-                connection.close();
+            } else if (delegate != null && !delegate.isClosed()) {
+                delegate.close();
             }
         } catch (final SQLException e) {
             // no-op
+        } finally {
+            close(); // set the flag
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tomee/blob/b4bd095d/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
----------------------------------------------------------------------
diff --git a/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java b/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
index 96739d0..56a95ce 100644
--- a/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
+++ b/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.tomee.jdbc;
 
+import org.apache.openejb.OpenEJB;
 import org.apache.openejb.jee.EjbJar;
 import org.apache.openejb.junit.ApplicationComposer;
 import org.apache.openejb.resource.jdbc.managed.local.ManagedDataSource;
+import org.apache.openejb.testing.Classes;
 import org.apache.openejb.testing.Configuration;
 import org.apache.openejb.testing.Module;
 import org.apache.openejb.testng.PropertiesBuilder;
@@ -28,24 +30,34 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.Singleton;
 import javax.sql.DataSource;
+import javax.transaction.Synchronization;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(ApplicationComposer.class)
 public class TomcatXADataSourceTest {
     @Resource(name = "xadb")
     private DataSource ds;
 
+    @EJB
+    private TxP tx;
+
     @Module
+    @Classes(TxP.class)
     public EjbJar mandatory() {
         return new EjbJar();
     }
@@ -115,5 +127,106 @@ public class TomcatXADataSourceTest {
             assertEquals(0, tds.getActive());
             assertEquals(25, tds.getIdle());
         }
+        // in tx - closing in tx
+        for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time
+            for (int i = 0; i < 25; i++) {
+                tx.run(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Connection c = null;
+                            for (int i = 0; i < 25; i++) {
+                                final Connection connection = ds.getConnection();
+                                connection.getMetaData(); // trigger connection retrieving otherwise nothing is done (pool is not used)
+                                if (c != null) {
+                                    assertEquals(c.unwrap(Connection.class), connection.unwrap(Connection.class));
+                                } else {
+                                    c = connection;
+                                }
+                            }
+                            c.close(); // ensure we handle properly eager close invocations
+                        } catch (final SQLException sql) {
+                            fail(sql.getMessage());
+                        }
+                    }
+                });
+            }
+            assertEquals(0, tds.getActive());
+            assertEquals(25, tds.getIdle());
+        }
+
+
+        // in tx - closing after tx
+        for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time
+            for (int i = 0; i < 25; i++) {
+                final AtomicReference<Connection> ref = new AtomicReference<Connection>();
+                tx.run(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Connection c = null;
+                            for (int i = 0; i < 25; i++) {
+                                final Connection connection = ds.getConnection();
+                                connection.getMetaData(); // trigger connection retrieving otherwise nothing is done (pool is not used)
+                                if (c != null) {
+                                    assertEquals(c.unwrap(Connection.class), connection.unwrap(Connection.class));
+                                } else {
+                                    c = connection;
+                                    ref.set(c);
+                                }
+                            }
+                        } catch (final SQLException sql) {
+                            fail(sql.getMessage());
+                        }
+                    }
+                });
+                assertTrue(ref.get().isClosed()); // closed with tx
+                ref.get().close();
+                assertTrue(ref.get().isClosed());
+            }
+            assertEquals(0, tds.getActive());
+            assertEquals(25, tds.getIdle());
+        }
+
+        // in tx - closing in commit
+        for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time
+            for (int i = 0; i < 25; i++) {
+                tx.run(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            final Connection ref = ds.getConnection();
+                            ref.getMetaData();
+                            OpenEJB.getTransactionManager().getTransaction().registerSynchronization(new Synchronization() {
+                                @Override
+                                public void beforeCompletion() {
+                                    // no-op
+                                }
+
+                                @Override
+                                public void afterCompletion(final int status) { // JPA does it
+                                    try {
+                                        ref.close();
+                                    } catch (final SQLException e) {
+                                        fail(e.getMessage());
+                                    }
+                                }
+                            });
+                        } catch (final Exception sql) {
+                            fail(sql.getMessage());
+                        }
+                    }
+                });
+            }
+            assertEquals(0, tds.getActive());
+            assertEquals(25, tds.getIdle());
+        }
+    }
+	
+    @Singleton
+    public static class TxP {
+        public void run(final Runnable r) {
+            r.run();
+        }
     }
 }