You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by rm...@apache.org on 2016/06/07 11:10:41 UTC

[2/2] tomee git commit: TOMEE-1831 adding ErrorHandler to FailoverRouter

TOMEE-1831 adding ErrorHandler to FailoverRouter


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/28a95b71
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/28a95b71
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/28a95b71

Branch: refs/heads/master
Commit: 28a95b71c95bffdcdbee85f7e137948068f3c8d7
Parents: 16cc79b
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Tue Jun 7 13:10:17 2016 +0200
Committer: Romain manni-Bucau <rm...@gmail.com>
Committed: Tue Jun 7 13:10:17 2016 +0200

----------------------------------------------------------------------
 .../resource/jdbc/router/FailOverRouter.java    |  273 ++-
 .../jdbc/FailOverRouterErrorHandlerTest.java    | 2163 ++++++++++++++++++
 .../resource/jdbc/FailOverRouterTest.java       |    4 +-
 3 files changed, 2347 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/28a95b71/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
index dea3669..0ab5b35 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
@@ -22,9 +22,9 @@ import org.apache.openejb.loader.SystemInstance;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
 
+import javax.annotation.PostConstruct;
 import javax.naming.NamingException;
 import javax.sql.DataSource;
-import javax.sql.XADataSource;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.TransactionSynchronizationRegistry;
@@ -35,11 +35,11 @@ import java.lang.reflect.Proxy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 public class FailOverRouter extends AbstractRouter {
@@ -47,8 +47,10 @@ public class FailOverRouter extends AbstractRouter {
 
     public static final String DEFAULT_STRATEGY = "default";
 
-    private final AtomicReference<DataSource> facade = new AtomicReference<DataSource>();
-    private final Collection<DataSource> dataSources = new CopyOnWriteArrayList<DataSource>();
+    private ErrorHandler errorHandlerRuntime;
+    private Strategy strategyRuntime;
+    private DataSource facade;
+    private final List<DataSourceHolder> dataSources = new CopyOnWriteArrayList<>();
 
     private String delimiter = ",";
     private String strategy = DEFAULT_STRATEGY;
@@ -56,39 +58,153 @@ public class FailOverRouter extends AbstractRouter {
 
     @Override
     public DataSource getDataSource() {
-        return facade.get();
+        return facade;
+    }
+
+    @PostConstruct
+    private void init() {
+        initDataSources();
+        initStrategy();
+        initFacade();
     }
 
     public void setDatasourceNames(final String datasourceNames) {
         this.datasourceNames = datasourceNames;
-        initDataSources();
     }
 
     public void setDelimiter(final String delimiter) {
         this.delimiter = delimiter;
-        initDataSources();
     }
 
     public void setStrategy(final String strategy) {
-        if (strategy == null) {
-            this.strategy = DEFAULT_STRATEGY;
-        } else {
-            this.strategy = strategy.toLowerCase(Locale.ENGLISH).trim();
+        this.strategy = strategy;
+    }
+
+    public void setStrategyInstance(final Strategy strategy) {
+        this.strategyRuntime = strategy;
+    }
+
+    public void setErrorHandlerInstance(final ErrorHandler errorHandler) {
+        errorHandlerRuntime = errorHandler;
+    }
+
+    public void setErrorHandler(final String errorHandler) {
+        try {
+            errorHandlerRuntime = ErrorHandler.class.cast(
+                    Thread.currentThread().getContextClassLoader().loadClass(errorHandler.trim()).newInstance());
+        } catch (final InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    private void initStrategy() {
+        switch (strategy) {
+            case "round-robin":
+                strategyRuntime = new Strategy() { // simply rotating the list each time
+                    private final AtomicInteger idx = new AtomicInteger(0);
+
+                    @Override
+                    public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+                        final int step;
+                        if (strategy.contains("%")) {
+                            step = Math.max(1, Integer.parseInt(strategy.substring(strategy.lastIndexOf("%") + 1)));
+                        } else {
+                            step = 1;
+                        }
+
+                        final List<DataSourceHolder> ds = new ArrayList<>(list);
+                        int currentIdx = 0;
+                        for (int i = 0; i < step; i++) {
+                            currentIdx = idx.incrementAndGet();
+                        }
+                        Collections.rotate(ds, 1 + currentIdx % ds.size());
+                        return ds;
+                    }
+
+                    @Override
+                    public void used(final DataSourceHolder holder) {
+                        // no-op
+                    }
+                };
+                break;
+            case "random":
+                strategyRuntime = new Strategy() { // simply shuffling the list each time
+                    @Override
+                    public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+                        final List<DataSourceHolder> ds = new ArrayList<DataSourceHolder>(list);
+                        Collections.shuffle(ds);
+                        return ds;
+                    }
+
+                    @Override
+                    public void used(final DataSourceHolder holder) {
+                        // no-op
+                    }
+                };
+                break;
+            case "reverse":
+                strategyRuntime = new Strategy() { // reversing the list (idx % size) times on each call
+                    private final AtomicInteger idx = new AtomicInteger();
+
+                    @Override
+                    public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+                        final List<DataSourceHolder> ds = new ArrayList<>(list);
+                        final int times = idx.incrementAndGet() % ds.size();
+                        for (int i = 0; i < times; i++) {
+                            Collections.reverse(ds);
+                        }
+                        return ds;
+                    }
+
+                    @Override
+                    public void used(final DataSourceHolder holder) {
+                        // no-op
+                    }
+                };
+                break;
+            case DEFAULT_STRATEGY:
+            default:
+                strategyRuntime = new Strategy() { // use the list and save a working item as first one
+                    @Override
+                    public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+                        return list;
+                    }
+
+                    @Override
+                    public void used(final DataSourceHolder holder) {
+                        if (dataSources.get(0) == holder) { // no lock
+                            return;
+                        }
+                        synchronized (this) {
+                            if (dataSources.get(0) == holder) {
+                                return;
+                            }
+
+                            final DataSourceHolder old = dataSources.set(0, holder); // locks
+                            if (old != holder) {
+                                dataSources.set(dataSources.lastIndexOf(holder), old);
+                            }
+                        }
+                    }
+                };
+                break;
         }
-        initFacade();
     }
 
     private void initDataSources() {
         dataSources.clear();
         for (final String ds : datasourceNames.split(Pattern.quote(delimiter))) {
             try {
-                final Object o = getOpenEJBResource(ds.trim());
+                final String name = ds.trim();
+                final Object o = getOpenEJBResource(name);
                 if (DataSource.class.isInstance(o)) {
                     LOGGER.debug("Found datasource '" + ds + "'");
-                    dataSources.add(DataSource.class.cast(o));
+                    dataSources.add(new DataSourceHolder(DataSource.class.cast(o), name));
+                } else {
+                    throw new IllegalArgumentException(name + " (" + o + ") is not a datasource");
                 }
             } catch (final NamingException error) {
-                LOGGER.error("Can't find datasource '" + ds + "'", error);
+                throw new IllegalStateException(error);
             }
         }
 
@@ -96,27 +212,16 @@ public class FailOverRouter extends AbstractRouter {
     }
 
     private void initFacade() {
-        Class<?> clazz = DataSource.class;
-        int xads = 0;
-        for (final DataSource ds : dataSources) {
-            if (XADataSource.class.isInstance(ds)) {
-                xads++;
-            }
-        }
-        if (xads > 0 && xads == dataSources.size()) {
-            clazz = XADataSource.class;
-        }
-
-        facade.set(DataSource.class.cast(Proxy.newProxyInstance(
-            Thread.currentThread().getContextClassLoader(),
-            new Class<?>[]{clazz}, new FacadeHandler(dataSources, strategy))));
+        facade = DataSource.class.cast(Proxy.newProxyInstance(
+                Thread.currentThread().getContextClassLoader(),
+                new Class<?>[]{DataSource.class}, new FacadeHandler(dataSources, strategyRuntime, errorHandlerRuntime)));
     }
 
-    public Collection<DataSource> getDataSources() {
+    public Collection<DataSourceHolder> getDataSources() {
         return dataSources;
     }
 
-    public void updateDataSources(final Collection<DataSource> ds) {
+    public void updateDataSources(final Collection<DataSourceHolder> ds) {
         dataSources.clear();
         dataSources.addAll(ds);
         initFacade();
@@ -124,15 +229,17 @@ public class FailOverRouter extends AbstractRouter {
 
     private static class FacadeHandler implements InvocationHandler {
         private static final TransactionSynchronizationRegistry SYNCHRONIZATION_REGISTRY = SystemInstance.get().getComponent(TransactionSynchronizationRegistry.class);
-        private static final String DATASOURCE_KEY = "router_datasource_in_use";
 
-        private final Collection<DataSource> delegates;
-        private final String strategy;
-        private final AtomicInteger currentIdx = new AtomicInteger(0); // used by some strategies
+        private final Collection<DataSourceHolder> delegates;
+        private final Strategy strategy;
+        private final TransactionManager transactionManager;
+        private final ErrorHandler handler;
 
-        public FacadeHandler(final Collection<DataSource> dataSources, final String strategy) {
+        public FacadeHandler(final Collection<DataSourceHolder> dataSources, final Strategy strategy, final ErrorHandler handler) {
             this.delegates = dataSources;
             this.strategy = strategy;
+            this.handler = handler;
+            this.transactionManager = OpenEJB.getTransactionManager();
         }
 
         @Override
@@ -149,33 +256,33 @@ public class FailOverRouter extends AbstractRouter {
                 }
             }
 
-            final TransactionManager txMgr = OpenEJB.getTransactionManager();
-            final Transaction transaction = txMgr.getTransaction();
-
+            final Transaction transaction = transactionManager.getTransaction();
             if (transaction != null) {
-
-                final DataSource currentDs = DataSource.class.cast(SYNCHRONIZATION_REGISTRY.getResource(DATASOURCE_KEY));
+                final DataSource currentDs = DataSource.class.cast(SYNCHRONIZATION_REGISTRY.getResource(FacadeHandler.class.getName()));
                 if (currentDs != null) {
                     return method.invoke(currentDs, args);
                 }
             }
 
             int ex = 0;
-            final Collection<DataSource> sources = sortFollowingStrategy(strategy, delegates, currentIdx);
+            final Collection<DataSourceHolder> sources = strategy.prepare(delegates);
             final int size = sources.size();
 
             Object out = null;
-            for (final DataSource ds : sources) {
+            Map<String, Throwable> failed = null;
+            DataSourceHolder used = null;
+            for (final DataSourceHolder ds : sources) {
+                used = ds;
                 try {
                     final boolean set = method.getName().startsWith("set");
-                    if (set) { // set on all datasources because of failover which can happen
-                        method.invoke(ds, args);
+                    if (set) { // setters should be applied to all datasources since failover can happen, but the call can also be bound to the tx
+                        method.invoke(ds.dataSource, args);
                     } else { // getConnection methods are here
-                        out = method.invoke(ds, args);
+                        out = method.invoke(ds.dataSource, args);
                     }
 
                     if (transaction != null) { // if a tx is in progress save the datasource to use for the tx
-                        SYNCHRONIZATION_REGISTRY.putResource(DATASOURCE_KEY, ds);
+                        SYNCHRONIZATION_REGISTRY.putResource(FacadeHandler.class.getName(), ds.dataSource);
                         break;
                     }
 
@@ -183,64 +290,48 @@ public class FailOverRouter extends AbstractRouter {
                         break;
                     }
                 } catch (final InvocationTargetException ite) {
+                    if (handler != null) {
+                        if (failed == null) {
+                            failed = new HashMap<>();
+                        }
+                        failed.put(ds.name, ite.getCause());
+                    }
+
                     ex++;
                     if (ex == size) { // all failed so throw the exception
+                        if (failed != null) {
+                            handler.onError(failed, null);
+                        }
                         throw ite.getCause();
                     }
                 }
             }
 
+            if (failed != null) {
+                handler.onError(failed, used);
+            }
+            strategy.used(used);
             return out;
         }
     }
 
-    private static Collection<DataSource> sortFollowingStrategy(final String strategy, final Collection<DataSource> delegates, final AtomicInteger idx) {
-        if (strategy == null) {
-            return delegates;
-        }
-
-        if (DEFAULT_STRATEGY.equals(strategy) || strategy.isEmpty()) {
-            return delegates;
-        }
-
-        //
-        // take care next strategies can break multiple calls on the facade
-        // it is only intended to be used for connection selection
-        //
-
-        if ("random".equals(strategy)) {
-            final List<DataSource> ds = new ArrayList<DataSource>(delegates);
-            Collections.shuffle(ds);
-            return ds;
-        }
+    public interface ErrorHandler {
+        void onError(final Map<String, Throwable> errorByFailingDataSource, final DataSourceHolder finallyUsedOrNull);
+    }
 
-        if ("reverse".equals(strategy)) {
-            final List<DataSource> ds = new ArrayList<DataSource>(delegates);
-            final int times = idx.incrementAndGet() % ds.size();
-            for (int i = 0; i < times; i++) {
-                Collections.reverse(ds);
-            }
-            return ds;
-        }
+    public interface Strategy {
+        Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list);
 
-        if (strategy.startsWith("round-robin")) {
-            final int step;
-            if (strategy.contains("%")) {
-                step = Math.max(1, Integer.parseInt(strategy.substring(strategy.lastIndexOf("%") + 1)));
-            } else {
-                step = 1;
-            }
+        void used(DataSourceHolder holder);
+    }
 
-            final List<DataSource> ds = new ArrayList<DataSource>(delegates);
+    public static final class DataSourceHolder {
+        private final DataSource dataSource;
+        private final String name;
 
-            int currentIdx = 0;
-            for (int i = 0; i < step; i++) {
-                currentIdx = idx.incrementAndGet();
-            }
-            Collections.rotate(ds, 1 + currentIdx % ds.size());
-            return ds;
+        public DataSourceHolder(final DataSource dataSource, final String name) {
+            this.dataSource = dataSource;
+            this.name = name;
         }
-
-        return delegates;
     }
 }