You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by rm...@apache.org on 2013/04/28 23:00:50 UTC

svn commit: r1476834 - in /tomee/tomee/trunk/container/openejb-core/src: main/java/org/apache/openejb/resource/jdbc/router/ test/java/org/apache/openejb/resource/jdbc/

Author: rmannibucau
Date: Sun Apr 28 21:00:50 2013
New Revision: 1476834

URL: http://svn.apache.org/r1476834
Log:
TOMEE-912 HA datasources + jta handling

Added:
    tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouters.java
    tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/JtaFailOverRouterTest.java
      - copied, changed from r1476817, tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java
Modified:
    tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
    tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouterTest.java
    tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java

Modified: tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java?rev=1476834&r1=1476833&r2=1476834&view=diff
==============================================================================
--- tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java (original)
+++ tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java Sun Apr 28 21:00:50 2013
@@ -16,12 +16,17 @@
  */
 package org.apache.openejb.resource.jdbc.router;
 
+import org.apache.openejb.OpenEJB;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
 
 import javax.naming.NamingException;
 import javax.sql.DataSource;
 import javax.sql.XADataSource;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -29,8 +34,10 @@ import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -117,6 +124,8 @@ public class FailOverRouter extends Abst
     }
 
     private static class FacadeHandler implements InvocationHandler {
+        private final ThreadLocal<Map<Transaction, DataSource>> txDs = new ThreadLocal<Map<Transaction, DataSource>>();
+
         private final Collection<DataSource> delegates;
         private final String strategy;
         private final AtomicInteger currentIdx = new AtomicInteger(0); // used by some strategies
@@ -140,24 +149,85 @@ public class FailOverRouter extends Abst
                 }
             }
 
+            final TransactionManager txMgr = OpenEJB.getTransactionManager();
+            final Transaction transaction = txMgr.getTransaction();
+
+            final Map<Transaction, DataSource> currentDs = txDs.get();
+            if (currentDs != null && currentDs.containsKey(transaction)) {
+                return method.invoke(currentDs.get(transaction), args);
+            } else {
+                txDs.remove();
+            }
+
             int ex = 0;
             final Collection<DataSource> sources = sortFollowingStrategy(strategy, delegates, currentIdx);
+            final int size = sources.size();
+
+            Object out = null;
             for (final DataSource ds : sources) {
                 try {
-                    if (method.getName().startsWith("set")) {
+                    final boolean set = method.getName().startsWith("set");
+                    if (set) { // set on all datasources because of failover which can happen
                         method.invoke(ds, args);
-                    } else { // getConnection are here
-                        return method.invoke(ds, args); // return the first one succeeding
+                    } else { // getConnection methods are here
+                        out = method.invoke(ds, args);
+                    }
+
+                    if (transaction != null) { // if a tx is in progress save the datasource to use for the tx
+                        transaction.registerSynchronization(new CleanUpSynchronization(txDs));
+
+                        final Map<Transaction, DataSource> map;
+                        if (currentDs == null) {
+                            map = new HashMap<Transaction, DataSource>();
+                            txDs.set(map);
+                        } else {
+                            map = currentDs;
+                        }
+
+                        map.put(transaction, ds); // save the ds to use for this transaction
+
+                        break;
+                    }
+
+                    if (!set) { // if no exception and not a set all is done so return out
+                        break;
                     }
                 } catch (final InvocationTargetException ite) {
                     ex++;
-                    if (ex == sources.size()) { // all failed so throw the exception
+                    if (ex == size) { // all failed so throw the exception
                         throw ite.getCause();
                     }
                 }
             }
 
-            return null;
+            return out;
+        }
+    }
+
+    private static class CleanUpSynchronization implements Synchronization {
+        private final ThreadLocal<Map<Transaction, DataSource>> tl;
+
+        private CleanUpSynchronization(final ThreadLocal<Map<Transaction, DataSource>> tl) {
+            this.tl = tl;
+        }
+
+        @Override
+        public void beforeCompletion() {
+            // no-op
+        }
+
+        @Override
+        public void afterCompletion(final int status) {
+            try {
+                final Transaction transaction = OpenEJB.getTransactionManager().getTransaction();
+                final Map<Transaction, DataSource> map = tl.get();
+                map.remove(transaction);
+                if (map.isEmpty()) {
+                    tl.remove();
+                }
+            } catch (final SystemException e) {
+                // no-op
+            }
         }
     }
 

Modified: tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouterTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouterTest.java?rev=1476834&r1=1476833&r2=1476834&view=diff
==============================================================================
--- tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouterTest.java (original)
+++ tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouterTest.java Sun Apr 28 21:00:50 2013
@@ -29,13 +29,13 @@ import org.junit.runner.RunWith;
 
 import javax.annotation.Resource;
 import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Properties;
 
+import static org.apache.openejb.resource.jdbc.FailOverRouters.datasource;
+import static org.apache.openejb.resource.jdbc.FailOverRouters.url;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -88,21 +88,6 @@ public class FailOverRouterTest {
                 .build();
     }
 
-    private static String url(final Connection c) throws SQLException {
-        final DatabaseMetaData dmd = c.getMetaData();
-        try {
-            return dmd.getURL();
-        } finally {
-            c.close();
-        }
-    }
-
-    private PropertiesBuilder datasource(final PropertiesBuilder propertiesBuilder, final String name) {
-        return propertiesBuilder
-                .property(name, "new://Resource?type=DataSource")
-                .property(name + ".JdbcUrl", "jdbc:hsqldb:mem:" + name);
-    }
-
     @Module
     public IAnnotationFinder finder() { // needed to run the test
         return new AnnotationFinder(new ClassesArchive());

Added: tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouters.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouters.java?rev=1476834&view=auto
==============================================================================
--- tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouters.java (added)
+++ tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/FailOverRouters.java Sun Apr 28 21:00:50 2013
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.jdbc;
+
+import org.apache.openejb.testng.PropertiesBuilder;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+
+public final class FailOverRouters {
+    public static String url(final Connection c) throws SQLException {
+        final DatabaseMetaData dmd = c.getMetaData();
+        try {
+            return dmd.getURL();
+        } finally {
+            c.close();
+        }
+    }
+
+    public static PropertiesBuilder datasource(final PropertiesBuilder propertiesBuilder, final String name) {
+        return propertiesBuilder
+                .property(name, "new://Resource?type=DataSource")
+                .property(name + ".JdbcUrl", "jdbc:hsqldb:mem:" + name);
+    }
+
+    private FailOverRouters() {
+        // no-op
+    }
+}

Copied: tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/JtaFailOverRouterTest.java (from r1476817, tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java)
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/JtaFailOverRouterTest.java?p2=tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/JtaFailOverRouterTest.java&p1=tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java&r1=1476817&r2=1476834&rev=1476834&view=diff
==============================================================================
--- tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java (original)
+++ tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/JtaFailOverRouterTest.java Sun Apr 28 21:00:50 2013
@@ -21,32 +21,32 @@ import org.apache.openejb.resource.jdbc.
 import org.apache.openejb.testing.Configuration;
 import org.apache.openejb.testing.Module;
 import org.apache.openejb.testng.PropertiesBuilder;
-import org.apache.xbean.finder.AnnotationFinder;
-import org.apache.xbean.finder.IAnnotationFinder;
-import org.apache.xbean.finder.archive.ClassesArchive;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.Singleton;
 import javax.sql.DataSource;
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.SQLException;
 import java.util.Properties;
 
+import static org.apache.openejb.resource.jdbc.FailOverRouters.datasource;
+import static org.apache.openejb.resource.jdbc.FailOverRouters.url;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 @RunWith(ApplicationComposer.class)
-public class ReverseFailOverRouterTest {
-    @Resource(name = "routedDs")
-    private DataSource failover;
+public class JtaFailOverRouterTest {
+    @EJB
+    private JtaWrapper wrapper;
 
     @Test
     public void test() throws SQLException {
         int i = 2;
         for (int it = 0; it < 6; it++) {
-            assertEquals("Iteration #" + i, "jdbc:hsqldb:mem:fo" + i, url(failover.getConnection()));
+            wrapper.inTx("jdbc:hsqldb:mem:fo" + i);
             i = 1 + (i % 2);
         }
     }
@@ -54,34 +54,47 @@ public class ReverseFailOverRouterTest {
     @Configuration
     public Properties configuration() {
         return datasource(datasource(new PropertiesBuilder(), "fo1"), "fo2")
-
+                .property("fo1.JtaManaged", "true")
+                .property("fo2.JtaManaged", "true")
                 .property("router", "new://Resource?class-name=" + FailOverRouter.class.getName())
                 .property("router.datasourceNames", "fo1,fo2")
                 .property("router.strategy", "reverse")
-
                 .property("routedDs", "new://Resource?provider=RoutedDataSource&type=DataSource")
                 .property("routedDs.router", "router")
-
                 .build();
     }
 
-    private static String url(final Connection c) throws SQLException {
-        final DatabaseMetaData dmd = c.getMetaData();
-        try {
-            return dmd.getURL();
-        } finally {
-            c.close();
-        }
-    }
-
-    private PropertiesBuilder datasource(final PropertiesBuilder propertiesBuilder, final String name) {
-        return propertiesBuilder
-                .property(name, "new://Resource?type=DataSource")
-                .property(name + ".JdbcUrl", "jdbc:hsqldb:mem:" + name);
+    @Module
+    public Class<?>[] classes() {
+        return new Class<?>[] { JtaWrapper.class };
     }
 
-    @Module
-    public IAnnotationFinder finder() { // needed to run the test
-        return new AnnotationFinder(new ClassesArchive());
+    @Singleton
+    public static class JtaWrapper {
+        @Resource(name = "routedDs")
+        private DataSource ds;
+
+        public void inTx(final String url) {
+            Connection firstConnection = null;
+            try {
+                firstConnection  = ds.getConnection();
+                assertEquals(url, url(firstConnection));
+                for (int i = 0; i < 1; i++) { // 4 is kind of random, > 2 is enough
+                    final Connection anotherConnection = ds.getConnection();
+                    assertEquals(firstConnection, anotherConnection); // in tx so should be the same ds and if the ds is JtaManaged same anotherConnection
+                    assertEquals(url, url(anotherConnection));
+                }
+            } catch (final SQLException e) {
+                fail(e.getMessage());
+            } finally {
+                try {
+                    if (firstConnection != null) {
+                        firstConnection.close();
+                    }
+                } catch (final Exception e) {
+                    // no-op
+                }
+            }
+        }
     }
 }

Modified: tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java?rev=1476834&r1=1476833&r2=1476834&view=diff
==============================================================================
--- tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java (original)
+++ tomee/tomee/trunk/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/ReverseFailOverRouterTest.java Sun Apr 28 21:00:50 2013
@@ -29,13 +29,12 @@ import org.junit.runner.RunWith;
 
 import javax.annotation.Resource;
 import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.SQLException;
 import java.util.Properties;
 
+import static org.apache.openejb.resource.jdbc.FailOverRouters.datasource;
+import static org.apache.openejb.resource.jdbc.FailOverRouters.url;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 @RunWith(ApplicationComposer.class)
 public class ReverseFailOverRouterTest {
@@ -65,21 +64,6 @@ public class ReverseFailOverRouterTest {
                 .build();
     }
 
-    private static String url(final Connection c) throws SQLException {
-        final DatabaseMetaData dmd = c.getMetaData();
-        try {
-            return dmd.getURL();
-        } finally {
-            c.close();
-        }
-    }
-
-    private PropertiesBuilder datasource(final PropertiesBuilder propertiesBuilder, final String name) {
-        return propertiesBuilder
-                .property(name, "new://Resource?type=DataSource")
-                .property(name + ".JdbcUrl", "jdbc:hsqldb:mem:" + name);
-    }
-
     @Module
     public IAnnotationFinder finder() { // needed to run the test
         return new AnnotationFinder(new ClassesArchive());