You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by rm...@apache.org on 2016/06/07 11:10:41 UTC
[2/2] tomee git commit: TOMEE-1831 adding ErrorHandler to
FailoverRouter
TOMEE-1831 adding ErrorHandler to FailoverRouter
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/28a95b71
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/28a95b71
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/28a95b71
Branch: refs/heads/master
Commit: 28a95b71c95bffdcdbee85f7e137948068f3c8d7
Parents: 16cc79b
Author: Romain manni-Bucau <rm...@gmail.com>
Authored: Tue Jun 7 13:10:17 2016 +0200
Committer: Romain manni-Bucau <rm...@gmail.com>
Committed: Tue Jun 7 13:10:17 2016 +0200
----------------------------------------------------------------------
.../resource/jdbc/router/FailOverRouter.java | 273 ++-
.../jdbc/FailOverRouterErrorHandlerTest.java | 2163 ++++++++++++++++++
.../resource/jdbc/FailOverRouterTest.java | 4 +-
3 files changed, 2347 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tomee/blob/28a95b71/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
----------------------------------------------------------------------
diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
index dea3669..0ab5b35 100644
--- a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
+++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/router/FailOverRouter.java
@@ -22,9 +22,9 @@ import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
+import javax.annotation.PostConstruct;
import javax.naming.NamingException;
import javax.sql.DataSource;
-import javax.sql.XADataSource;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.TransactionSynchronizationRegistry;
@@ -35,11 +35,11 @@ import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
public class FailOverRouter extends AbstractRouter {
@@ -47,8 +47,10 @@ public class FailOverRouter extends AbstractRouter {
public static final String DEFAULT_STRATEGY = "default";
- private final AtomicReference<DataSource> facade = new AtomicReference<DataSource>();
- private final Collection<DataSource> dataSources = new CopyOnWriteArrayList<DataSource>();
+ private ErrorHandler errorHandlerRuntime;
+ private Strategy strategyRuntime;
+ private DataSource facade;
+ private final List<DataSourceHolder> dataSources = new CopyOnWriteArrayList<>();
private String delimiter = ",";
private String strategy = DEFAULT_STRATEGY;
@@ -56,39 +58,153 @@ public class FailOverRouter extends AbstractRouter {
@Override
public DataSource getDataSource() {
- return facade.get();
+ return facade;
+ }
+
+ @PostConstruct
+ private void init() {
+ initDataSources();
+ initStrategy();
+ initFacade();
}
public void setDatasourceNames(final String datasourceNames) {
this.datasourceNames = datasourceNames;
- initDataSources();
}
public void setDelimiter(final String delimiter) {
this.delimiter = delimiter;
- initDataSources();
}
public void setStrategy(final String strategy) {
- if (strategy == null) {
- this.strategy = DEFAULT_STRATEGY;
- } else {
- this.strategy = strategy.toLowerCase(Locale.ENGLISH).trim();
+ this.strategy = strategy;
+ }
+
+ public void setStrategyInstance(final Strategy strategy) {
+ this.strategyRuntime = strategy;
+ }
+
+ public void setErrorHandlerInstance(final ErrorHandler errorHandler) {
+ errorHandlerRuntime = errorHandler;
+ }
+
+ public void setErrorHandler(final String errorHandler) {
+ try {
+ errorHandlerRuntime = ErrorHandler.class.cast(
+ Thread.currentThread().getContextClassLoader().loadClass(errorHandler.trim()).newInstance());
+ } catch (final InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private void initStrategy() {
+ switch (strategy) {
+ case "round-robin":
+ strategyRuntime = new Strategy() { // simply rotating the list each time
+ private final AtomicInteger idx = new AtomicInteger(0);
+
+ @Override
+ public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+ final int step;
+ if (strategy.contains("%")) {
+ step = Math.max(1, Integer.parseInt(strategy.substring(strategy.lastIndexOf("%") + 1)));
+ } else {
+ step = 1;
+ }
+
+ final List<DataSourceHolder> ds = new ArrayList<>(list);
+ int currentIdx = 0;
+ for (int i = 0; i < step; i++) {
+ currentIdx = idx.incrementAndGet();
+ }
+ Collections.rotate(ds, 1 + currentIdx % ds.size());
+ return ds;
+ }
+
+ @Override
+ public void used(final DataSourceHolder holder) {
+ // no-op
+ }
+ };
+ break;
+ case "random":
+ strategyRuntime = new Strategy() { // simply rotating the list each time
+ @Override
+ public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+ final List<DataSourceHolder> ds = new ArrayList<DataSourceHolder>(list);
+ Collections.shuffle(ds);
+ return ds;
+ }
+
+ @Override
+ public void used(final DataSourceHolder holder) {
+ // no-op
+ }
+ };
+ break;
+ case "reverse":
+ strategyRuntime = new Strategy() { // simply rotating the list each time
+ private final AtomicInteger idx = new AtomicInteger();
+
+ @Override
+ public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+ final List<DataSourceHolder> ds = new ArrayList<>(list);
+ final int times = idx.incrementAndGet() % ds.size();
+ for (int i = 0; i < times; i++) {
+ Collections.reverse(ds);
+ }
+ return ds;
+ }
+
+ @Override
+ public void used(final DataSourceHolder holder) {
+ // no-op
+ }
+ };
+ break;
+ case DEFAULT_STRATEGY:
+ default:
+ strategyRuntime = new Strategy() { // use the list and save a working item as first one
+ @Override
+ public Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list) {
+ return list;
+ }
+
+ @Override
+ public void used(final DataSourceHolder holder) {
+ if (dataSources.get(0) == holder) { // no lock
+ return;
+ }
+ synchronized (this) {
+ if (dataSources.get(0) == holder) {
+ return;
+ }
+
+ final DataSourceHolder old = dataSources.set(0, holder); // locks
+ if (old != holder) {
+ dataSources.set(dataSources.lastIndexOf(holder), old);
+ }
+ }
+ }
+ };
+ break;
}
- initFacade();
}
private void initDataSources() {
dataSources.clear();
for (final String ds : datasourceNames.split(Pattern.quote(delimiter))) {
try {
- final Object o = getOpenEJBResource(ds.trim());
+ final String name = ds.trim();
+ final Object o = getOpenEJBResource(name);
if (DataSource.class.isInstance(o)) {
LOGGER.debug("Found datasource '" + ds + "'");
- dataSources.add(DataSource.class.cast(o));
+ dataSources.add(new DataSourceHolder(DataSource.class.cast(o), name));
+ } else {
+ throw new IllegalArgumentException(name + " (" + o + ") is not a datasource");
}
} catch (final NamingException error) {
- LOGGER.error("Can't find datasource '" + ds + "'", error);
+ throw new IllegalStateException(error);
}
}
@@ -96,27 +212,16 @@ public class FailOverRouter extends AbstractRouter {
}
private void initFacade() {
- Class<?> clazz = DataSource.class;
- int xads = 0;
- for (final DataSource ds : dataSources) {
- if (XADataSource.class.isInstance(ds)) {
- xads++;
- }
- }
- if (xads > 0 && xads == dataSources.size()) {
- clazz = XADataSource.class;
- }
-
- facade.set(DataSource.class.cast(Proxy.newProxyInstance(
- Thread.currentThread().getContextClassLoader(),
- new Class<?>[]{clazz}, new FacadeHandler(dataSources, strategy))));
+ facade = DataSource.class.cast(Proxy.newProxyInstance(
+ Thread.currentThread().getContextClassLoader(),
+ new Class<?>[]{DataSource.class}, new FacadeHandler(dataSources, strategyRuntime, errorHandlerRuntime)));
}
- public Collection<DataSource> getDataSources() {
+ public Collection<DataSourceHolder> getDataSources() {
return dataSources;
}
- public void updateDataSources(final Collection<DataSource> ds) {
+ public void updateDataSources(final Collection<DataSourceHolder> ds) {
dataSources.clear();
dataSources.addAll(ds);
initFacade();
@@ -124,15 +229,17 @@ public class FailOverRouter extends AbstractRouter {
private static class FacadeHandler implements InvocationHandler {
private static final TransactionSynchronizationRegistry SYNCHRONIZATION_REGISTRY = SystemInstance.get().getComponent(TransactionSynchronizationRegistry.class);
- private static final String DATASOURCE_KEY = "router_datasource_in_use";
- private final Collection<DataSource> delegates;
- private final String strategy;
- private final AtomicInteger currentIdx = new AtomicInteger(0); // used by some strategies
+ private final Collection<DataSourceHolder> delegates;
+ private final Strategy strategy;
+ private final TransactionManager transactionManager;
+ private final ErrorHandler handler;
- public FacadeHandler(final Collection<DataSource> dataSources, final String strategy) {
+ public FacadeHandler(final Collection<DataSourceHolder> dataSources, final Strategy strategy, final ErrorHandler handler) {
this.delegates = dataSources;
this.strategy = strategy;
+ this.handler = handler;
+ this.transactionManager = OpenEJB.getTransactionManager();
}
@Override
@@ -149,33 +256,33 @@ public class FailOverRouter extends AbstractRouter {
}
}
- final TransactionManager txMgr = OpenEJB.getTransactionManager();
- final Transaction transaction = txMgr.getTransaction();
-
+ final Transaction transaction = transactionManager.getTransaction();
if (transaction != null) {
-
- final DataSource currentDs = DataSource.class.cast(SYNCHRONIZATION_REGISTRY.getResource(DATASOURCE_KEY));
+ final DataSource currentDs = DataSource.class.cast(SYNCHRONIZATION_REGISTRY.getResource(FacadeHandler.class.getName()));
if (currentDs != null) {
return method.invoke(currentDs, args);
}
}
int ex = 0;
- final Collection<DataSource> sources = sortFollowingStrategy(strategy, delegates, currentIdx);
+ final Collection<DataSourceHolder> sources = strategy.prepare(delegates);
final int size = sources.size();
Object out = null;
- for (final DataSource ds : sources) {
+ Map<String, Throwable> failed = null;
+ DataSourceHolder used = null;
+ for (final DataSourceHolder ds : sources) {
+ used = ds;
try {
final boolean set = method.getName().startsWith("set");
- if (set) { // set on all datasources because of failover which can happen
- method.invoke(ds, args);
+ if (set) { // should set on all datasources because of failover which can happen but can also be bound to the tx
+ method.invoke(ds.dataSource, args);
} else { // getConnection methods are here
- out = method.invoke(ds, args);
+ out = method.invoke(ds.dataSource, args);
}
if (transaction != null) { // if a tx is in progress save the datasource to use for the tx
- SYNCHRONIZATION_REGISTRY.putResource(DATASOURCE_KEY, ds);
+ SYNCHRONIZATION_REGISTRY.putResource(FacadeHandler.class.getName(), ds.dataSource);
break;
}
@@ -183,64 +290,48 @@ public class FailOverRouter extends AbstractRouter {
break;
}
} catch (final InvocationTargetException ite) {
+ if (handler != null) {
+ if (failed == null) {
+ failed = new HashMap<>();
+ }
+ failed.put(ds.name, ite.getCause());
+ }
+
ex++;
if (ex == size) { // all failed so throw the exception
+ if (failed != null) {
+ handler.onError(failed, null);
+ }
throw ite.getCause();
}
}
}
+ if (failed != null) {
+ handler.onError(failed, used);
+ }
+ strategy.used(used);
return out;
}
}
- private static Collection<DataSource> sortFollowingStrategy(final String strategy, final Collection<DataSource> delegates, final AtomicInteger idx) {
- if (strategy == null) {
- return delegates;
- }
-
- if (DEFAULT_STRATEGY.equals(strategy) || strategy.isEmpty()) {
- return delegates;
- }
-
- //
- // take care next strategies can break multiple calls on the facade
- // it is only intended to be used for connection selection
- //
-
- if ("random".equals(strategy)) {
- final List<DataSource> ds = new ArrayList<DataSource>(delegates);
- Collections.shuffle(ds);
- return ds;
- }
+ public interface ErrorHandler {
+ void onError(final Map<String, Throwable> errorByFailingDataSource, final DataSourceHolder finallyUsedOrNull);
+ }
- if ("reverse".equals(strategy)) {
- final List<DataSource> ds = new ArrayList<DataSource>(delegates);
- final int times = idx.incrementAndGet() % ds.size();
- for (int i = 0; i < times; i++) {
- Collections.reverse(ds);
- }
- return ds;
- }
+ public interface Strategy {
+ Collection<DataSourceHolder> prepare(final Collection<DataSourceHolder> list);
- if (strategy.startsWith("round-robin")) {
- final int step;
- if (strategy.contains("%")) {
- step = Math.max(1, Integer.parseInt(strategy.substring(strategy.lastIndexOf("%") + 1)));
- } else {
- step = 1;
- }
+ void used(DataSourceHolder holder);
+ }
- final List<DataSource> ds = new ArrayList<DataSource>(delegates);
+ public static final class DataSourceHolder {
+ private final DataSource dataSource;
+ private final String name;
- int currentIdx = 0;
- for (int i = 0; i < step; i++) {
- currentIdx = idx.incrementAndGet();
- }
- Collections.rotate(ds, 1 + currentIdx % ds.size());
- return ds;
+ public DataSourceHolder(final DataSource dataSource, final String name) {
+ this.dataSource = dataSource;
+ this.name = name;
}
-
- return delegates;
}
}