You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by pp...@apache.org on 2008/01/02 06:51:42 UTC
svn commit: r608015 - in /labs/fluid/slice/src/main:
java/org/apache/openjpa/slice/jdbc/ resources/org/apache/openjpa/slice/jdbc/
Author: ppoddar
Date: Tue Jan 1 21:51:36 2008
New Revision: 608015
URL: http://svn.apache.org/viewvc?rev=608015&view=rev
Log:
internal global transaction manager
Added:
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/SliceStoreManager.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/XID.java
Modified:
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedDataSource.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedDataSource.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedDataSource.java?rev=608015&r1=608014&r2=608015&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedDataSource.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedDataSource.java Tue Jan 1 21:51:36 2008
@@ -26,6 +26,7 @@
import java.util.List;
import javax.sql.DataSource;
+import javax.sql.XADataSource;
import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
@@ -44,6 +45,22 @@
super(dataSources.get(0));
real = dataSources;
master = dataSources.get(0);
+ }
+
+ Connection getConnection(DataSource ds) throws SQLException {
+ if (ds instanceof DecoratingDataSource)
+ return getConnection(((DecoratingDataSource)ds).getInnermostDelegate());
+ if (ds instanceof XADataSource)
+ return ((XADataSource)ds).getXAConnection().getConnection();
+ return ds.getConnection();
+ }
+
+ Connection getConnection(DataSource ds, String user, String pwd) throws SQLException {
+ if (ds instanceof DecoratingDataSource)
+ return getConnection(((DecoratingDataSource)ds).getInnermostDelegate(), user, pwd);
+ if (ds instanceof XADataSource)
+ return ((XADataSource)ds).getXAConnection(user, pwd).getConnection();
+ return ds.getConnection(user, pwd);
}
public Iterator<DataSource> iterator() {
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=608015&r1=608014&r2=608015&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java Tue Jan 1 21:51:36 2008
@@ -21,15 +21,16 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import javax.sql.DataSource;
+import javax.sql.XADataSource;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.conf.JDBCConfigurationImpl;
@@ -39,6 +40,7 @@
import org.apache.openjpa.lib.conf.PluginValue;
import org.apache.openjpa.lib.conf.StringValue;
import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
+import org.apache.openjpa.lib.jdbc.DelegatingDataSource;
import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.slice.DistributedBrokerImpl;
@@ -52,261 +54,286 @@
*
*/
public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
- implements DistributedJDBCConfiguration {
-
- private final Map<String, JDBCConfiguration> _slices =
- new TreeMap<String, JDBCConfiguration>();
- private DecoratingDataSource virtualDataSource;
-
- protected BooleanValue lenient;
- protected StringValue master;
- protected PluginValue distributionPolicyPlugin;
-
- private static Localizer _loc =
- Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
- public static final String PREFIX_SLICE = "slice.";
-
- /**
- * Configure itself as well as underlying slices.
- *
- */
- public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
- super(true, false);
- brokerPlugin.setString(DistributedBrokerImpl.class.getName());
- distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
- distributionPolicyPlugin.setDynamic(true);
- lenient = addBoolean("Lenient");
- master = addString("Master");
- setSlices(cp.getProperties());
- }
-
- /**
- * Iterator for underlying slice configurations.
- */
- public Iterator<JDBCConfiguration> iterator() {
- return _slices.values().iterator();
- }
-
- /**
- * Gets the name of the configured slices.
- */
- public Set<String> getSliceNames() {
- return _slices.keySet();
- }
-
- /**
- * Get the configuration for given slice.
- */
- public JDBCConfiguration getSlice(String slice) {
- JDBCConfiguration result = _slices.get(slice);
- if (result == null)
- throw new UserException(_loc.get("slice-not-found", slice, _slices
- .keySet()));
- return result;
- }
-
- public DistributionPolicy getDistributionPolicyInstance() {
- if (distributionPolicyPlugin.get() == null) {
- distributionPolicyPlugin.instantiate(DistributionPolicy.class,
- this, true);
- }
- return (DistributionPolicy) distributionPolicyPlugin.get();
- }
-
- public void setDistributionPolicyInstance(String val) {
- distributionPolicyPlugin.set(val);
- }
-
- public Object getConnectionFactory() {
- if (virtualDataSource == null) {
- DistributedDataSource ds = createDistributedDataStore();
- virtualDataSource =
- DataSourceFactory.installDBDictionary(
- getDBDictionaryInstance(), ds, this, false);
- }
- return virtualDataSource;
- }
-
- /**
- * Create a virtual DistributedDataSource as a composite of individual
- * slices as per configuration, optionally ignoring slices that can not be
- * connected.
- */
- private DistributedDataSource createDistributedDataStore() {
- Log log = getLog(LOG_RUNTIME);
- List<DataSource> dataSources = new ArrayList<DataSource>();
- boolean isLenient = lenient.get();
- List<String> badSlices = new ArrayList<String>();
- for (String slice : _slices.keySet()) {
- JDBCConfiguration conf = _slices.get(slice);
- String url = conf.getConnectionURL();
- if (log.isInfoEnabled())
- log.info(_loc.get("slice-connect", slice, url));
- try {
- DataSource ds = DataSourceFactory.newDataSource(conf, false);
- DecoratingDataSource dds = new DecoratingDataSource(ds);
- ds =
- DataSourceFactory.installDBDictionary(conf
- .getDBDictionaryInstance(), dds, conf, false);
- if (verifyDataSource(isLenient, badSlices, slice, ds)) {
- dataSources.add(ds);
- }
- } catch (Exception ex) {
- handleBadConnection(isLenient, badSlices, slice, ex);
- }
- }
- removeBadSlices(badSlices);
- return new DistributedDataSource(dataSources);
- }
-
- /**
- * Verify that a connection can be established to the given slice. If
- * connection can not be established then process the bad slice in
- * {@link #handleBadConnection(boolean, List, JDBCConfiguration)} method.
- */
- private boolean verifyDataSource(boolean isLenient, List<String> badSlices,
- String badSlice, DataSource ds) {
- Connection con = null;
- try {
- con = ds.getConnection();
- if (con == null) {
- handleBadConnection(isLenient, badSlices, badSlice, null);
- return false;
- }
- return true;
- } catch (SQLException ex) {
- handleBadConnection(isLenient, badSlices, badSlice, ex);
- return false;
- } finally {
- if (con != null)
- try {
- con.close();
- } catch (SQLException ex) {
- // ignore
- }
- }
- }
-
- /**
- * Remove the given set of bad slices.
- *
- */
- private void removeBadSlices(List<String> badSlices) {
- if (badSlices == null)
- return;
- for (String bad : badSlices)
- _slices.remove(bad);
- }
-
- /**
- * Either throw a user exception or add the configuration to the given list,
- * based on <code>isLenient</code>.
- */
- private void handleBadConnection(boolean isLenient, List<String> badSlices,
- String slice, Throwable ex) {
- JDBCConfiguration conf = _slices.get(slice);
- String url = conf.getConnectionURL();
- Log log = getLog(LOG_RUNTIME);
- if (isLenient) {
- if (ex != null) {
- log.warn(_loc.get("slice-connect-known-warn", slice, url,
- ex.getCause()));
- } else {
- log.warn(_loc.get("slice-connect-warn", slice, url));
- }
- badSlices.add(slice);
- } else if (ex != null) {
- throw new UserException(_loc.get("slice-connect-known-error",
- slice, url, ex), ex.getCause());
- } else {
- throw new UserException(_loc.get("slice-connect-error", slice, url));
- }
- }
-
- /**
- * Create JDBCConfigurations for slices from the given properties.
- */
- void setSlices(Map p) {
- Set<String> slices = findSlices(p);
- Log log = getLog(LOG_RUNTIME);
- if (slices.isEmpty()) {
- throw new UserException(_loc.get("slice-none-configured"));
- } else if (log != null && log.isInfoEnabled()) {
- log.info(_loc.get("slice-available", slices));
- }
- for (String key : slices) {
- JDBCConfiguration slice = new JDBCConfigurationImpl();
- slice.fromProperties(replaceKeys(p, prefix(key), "openjpa."));
- if (slice.getConnectionURL() == null)
- throw new UserException(_loc.get("slice-no-url", key));
- _slices.put(key, slice);
- if (log != null && log.isInfoEnabled())
- log.info(_loc.get("slice-configuration", key, slice
- .toProperties(false)));
- }
- }
-
- String prefix(String key) {
- return PREFIX_SLICE + key + ".";
- }
-
- Set<String> findSlices(Map p) {
- Set<String> slices = new TreeSet<String>();
- for (Object o : p.keySet()) {
- if (o instanceof String && o.toString().startsWith(PREFIX_SLICE)) {
- String sliceName =
- chopTail(chopHead(o.toString(), PREFIX_SLICE), ".");
- if (!isKnownProperty(sliceName))
- slices.add(sliceName);
- }
- }
- return slices;
- }
-
- boolean isKnownProperty(String property) {
- return distributionPolicyPlugin.getProperty().equals(property)
- || lenient.getProperty().equals(property)
- || master.getProperty().equals(property);
- }
-
-
- static String chopHead(String s, String head) {
- if (s.startsWith(head))
- return s.substring(head.length());
- return s;
- }
-
- static String chopTail(String s, String tail) {
- int i = s.lastIndexOf(tail);
- if (i == -1)
- return s;
- return s.substring(0, i);
- }
-
- /**
- * Replace the key that has the given prefix in the given <code>old</code>
- * map, by new key with new prefix. Also throw away entries that belong to
- * other slices.
- */
- Map replaceKeys(Map old, String oldPrefix, String newPrefix) {
- Map result = new HashMap();
- for (Object o : old.keySet()) {
- String key = o.toString();
- Object value = old.get(o);
- if (key.startsWith(oldPrefix)) {
- String newKey = newPrefix + chopHead(key, oldPrefix);
- result.put(newKey, value);
- // System.err.println("Replaced [" + key + "] with " + newKey +
- // " value=" + value);
- } else if (!key.startsWith(PREFIX_SLICE)) { // not other slice
- // properties
- result.put(key, value); // keep them as it is
- } else {
- // System.err.println("Ignored [" + key + "] with " +
- // oldPrefix);
- }
- }
- return result;
- }
+ implements DistributedJDBCConfiguration {
+ private final Map<String, JDBCConfiguration> _slices =
+ new TreeMap<String, JDBCConfiguration>();
+ private DecoratingDataSource virtualDataSource;
+
+ protected BooleanValue lenient;
+ protected StringValue master;
+ protected PluginValue distributionPolicyPlugin;
+
+ private static Localizer _loc =
+ Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
+ public static final String PREFIX_SLICE = "slice.";
+ public static final String PREFIX_OPENJPA = "openjpa.";
+
+ /**
+ * Configure itself as well as underlying slices.
+ *
+ */
+ public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
+ super(true, false);
+ brokerPlugin.setString(DistributedBrokerImpl.class.getName());
+ distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
+ distributionPolicyPlugin.setDynamic(true);
+ lenient = addBoolean("Lenient");
+ master = addString("Master");
+ setSlices(cp.getProperties());
+ }
+
+ /**
+ * Iterator for underlying slice configurations.
+ */
+ public Iterator<JDBCConfiguration> iterator() {
+ return _slices.values().iterator();
+ }
+
+ /**
+ * Gets the name of the configured slices.
+ */
+ public Set<String> getSliceNames() {
+ return _slices.keySet();
+ }
+
+ /**
+ * Get the configuration for given slice.
+ */
+ public JDBCConfiguration getSlice(String slice) {
+ JDBCConfiguration result = _slices.get(slice);
+ if (result == null)
+ throw new UserException(_loc.get("slice-not-found", slice, _slices
+ .keySet()));
+ return result;
+ }
+
+ public DistributionPolicy getDistributionPolicyInstance() {
+ if (distributionPolicyPlugin.get() == null) {
+ distributionPolicyPlugin.instantiate(DistributionPolicy.class,
+ this, true);
+ }
+ return (DistributionPolicy) distributionPolicyPlugin.get();
+ }
+
+ public void setDistributionPolicyInstance(String val) {
+ distributionPolicyPlugin.set(val);
+ }
+
+ public Object getConnectionFactory() {
+ if (virtualDataSource == null) {
+ DistributedDataSource ds = createDistributedDataStore();
+ virtualDataSource =
+ DataSourceFactory.installDBDictionary(
+ getDBDictionaryInstance(), ds, this, false);
+ }
+ return virtualDataSource;
+ }
+
+ /**
+ * Create a virtual DistributedDataSource as a composite of individual
+ * slices as per configuration, optionally ignoring slices that can not be
+ * connected.
+ */
+ private DistributedDataSource createDistributedDataStore() {
+ List<DataSource> dataSources = new ArrayList<DataSource>();
+ boolean isLenient = lenient.get();
+ boolean isXA = true;
+ List<String> badSlices = new ArrayList<String>();
+ for (String slice : _slices.keySet()) {
+ JDBCConfiguration conf = _slices.get(slice);
+ Log log = conf.getConfigurationLog();
+ String url = getConnectionInfo(conf);
+ if (log.isInfoEnabled())
+ log.info(_loc.get("slice-connect", slice, url));
+ try {
+ DataSource ds = DataSourceFactory.newDataSource(conf, false);
+ DecoratingDataSource dds = new DecoratingDataSource(ds);
+ ds =
+ DataSourceFactory.installDBDictionary(conf
+ .getDBDictionaryInstance(), dds, conf, false);
+ if (verifyDataSource(isLenient, badSlices, slice, ds)) {
+ dataSources.add(ds);
+ isXA &= isXACompliant(ds);
+ }
+ } catch (Exception ex) {
+ handleBadConnection(isLenient, badSlices, slice, ex);
+ }
+ }
+ removeBadSlices(badSlices);
+ DistributedDataSource result = new DistributedDataSource(dataSources);
+ Log log = getConfigurationLog();
+ if (isXA)
+ log.info(_loc.get("slice-xa-enabled", getSliceNames()));
+ else
+ log.warn(_loc.get("slice-xa-disabled", getSliceNames()));
+ return result;
+ }
+
+ String getConnectionInfo(JDBCConfiguration conf) {
+ String result = conf.getConnectionURL();
+ if (result == null) {
+ result = conf.getConnectionDriverName();
+ String props = conf.getConnectionProperties();
+ if (props != null)
+ result += "(" + props + ")";
+ }
+ return result;
+ }
+
+ boolean isXACompliant(DataSource ds) {
+ if (ds instanceof DelegatingDataSource)
+ return ((DelegatingDataSource) ds).getInnermostDelegate() instanceof XADataSource;
+ return ds instanceof XADataSource;
+ }
+
+ /**
+ * Verify that a connection can be established to the given slice. If
+ * connection can not be established then process the bad slice in
+ * {@link #handleBadConnection(boolean, List, JDBCConfiguration)} method.
+ */
+ private boolean verifyDataSource(boolean isLenient, List<String> badSlices,
+ String badSlice, DataSource ds) {
+ Connection con = null;
+ try {
+ con = ds.getConnection();
+ if (con == null) {
+ handleBadConnection(isLenient, badSlices, badSlice, null);
+ return false;
+ }
+ return true;
+ } catch (SQLException ex) {
+ handleBadConnection(isLenient, badSlices, badSlice, ex);
+ return false;
+ } finally {
+ if (con != null)
+ try {
+ con.close();
+ } catch (SQLException ex) {
+ // ignore
+ }
+ }
+ }
+
+ /**
+ * Remove the given set of bad slices.
+ *
+ */
+ private void removeBadSlices(List<String> badSlices) {
+ if (badSlices == null)
+ return;
+ for (String bad : badSlices)
+ _slices.remove(bad);
+ }
+
+ /**
+ * Either throw a user exception or add the configuration to the given list,
+ * based on <code>isLenient</code>.
+ */
+ private void handleBadConnection(boolean isLenient, List<String> badSlices,
+ String slice, Throwable ex) {
+ JDBCConfiguration conf = _slices.get(slice);
+ String url = conf.getConnectionURL();
+ Log log = getLog(LOG_RUNTIME);
+ if (isLenient) {
+ if (ex != null) {
+ log.warn(_loc.get("slice-connect-known-warn", slice, url, ex
+ .getCause()));
+ } else {
+ log.warn(_loc.get("slice-connect-warn", slice, url));
+ }
+ badSlices.add(slice);
+ } else if (ex != null) {
+ throw new UserException(_loc.get("slice-connect-known-error",
+ slice, url, ex), ex.getCause());
+ } else {
+ throw new UserException(_loc.get("slice-connect-error", slice, url));
+ }
+ }
+
+ /**
+ * Create JDBCConfigurations for slices from the given properties.
+ */
+ void setSlices(Map original) {
+ Set<String> slices = findSlices(original);
+ Log log = getConfigurationLog();
+ if (slices.isEmpty()) {
+ throw new UserException(_loc.get("slice-none-configured"));
+ } else if (log != null && log.isInfoEnabled()) {
+ log.info(_loc.get("slice-available", slices));
+ }
+ for (String key : slices) {
+ JDBCConfiguration slice = new JDBCConfigurationImpl();
+ slice.fromProperties(createSliceProperties(original, key));
+ slice.setId(PREFIX_SLICE + key);
+ _slices.put(key, slice);
+ if (log != null && log.isInfoEnabled())
+ log.info(_loc.get("slice-configuration", key, slice
+ .toProperties(false)));
+ }
+ }
+
+ Set<String> findSlices(Map p) {
+ Set<String> slices = new TreeSet<String>();
+ for (Object o : p.keySet()) {
+ String key = o.toString();
+ if (key.startsWith(PREFIX_SLICE) && key.split("\\.").length > 2) {
+ String sliceName =
+ chopTail(chopHead(o.toString(), PREFIX_SLICE), ".");
+ slices.add(sliceName);
+ }
+ }
+ return slices;
+ }
+
+ static String chopHead(String s, String head) {
+ if (s.startsWith(head))
+ return s.substring(head.length());
+ return s;
+ }
+
+ static String chopTail(String s, String tail) {
+ int i = s.lastIndexOf(tail);
+ if (i == -1)
+ return s;
+ return s.substring(0, i);
+ }
+
+ /**
+ * Creates given <code>slice</code> specific configuration properties from
+ * given <code>original</code> key-value map. The rules are
+ * <LI> if key begins with <code>"slice.XXX."</code> where
+ * <code>XXX</code> is the given slice name, then replace
+ * <code>"slice.XXX.</code> with <code>openjpa.</code>.
+ * <LI>if key begins with <code>"slice."</code> but not with
+ * <code>"slice.XXX."</code>, the ignore i.e. any property of other
+ * slices or global slice property e.g.
+ * <code>slice.DistributionPolicy</code>
+ * <code>if key starts with <code>"openjpa."</code> and a corresponding
+ * <code>"slice.XXX."</code> property does not exist, then use this as
+ * default property
+ * <code>property with any other prefix is simply copied
+ *
+ */
+ Map createSliceProperties(Map original, String slice) {
+ Map result = new Properties();
+ String prefix = PREFIX_SLICE + slice + ".";
+ for (Object o : original.keySet()) {
+ String key = o.toString();
+ if (key.startsWith(prefix)) {
+ String newKey = PREFIX_OPENJPA + key.substring(prefix.length());
+ result.put(newKey, original.get(o));
+ } else if (key.startsWith(PREFIX_SLICE)) {
+ // ignore keys that are in 'slice.' namespace but not this slice
+ } else if (key.startsWith(PREFIX_OPENJPA)) {
+ String newKey = prefix + key.substring(PREFIX_OPENJPA.length());
+ if (!original.containsKey(newKey))
+ result.put(key, original.get(o));
+ } else { // keys that are neither "openjpa." nor "slice."
+ // namespace
+ result.put(key, original.get(o));
+ }
+ }
+ return result;
+ }
}
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=608015&r1=608014&r2=608015&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java Tue Jan 1 21:51:36 2008
@@ -18,19 +18,27 @@
*/
package org.apache.openjpa.slice.jdbc;
+import java.sql.Connection;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import javax.sql.XAConnection;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+
+import org.apache.openjpa.conf.OpenJPAConfiguration;
import org.apache.openjpa.enhance.PersistenceCapable;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.kernel.ConnectionInfo;
@@ -47,6 +55,7 @@
import org.apache.openjpa.kernel.StoreManager;
import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.kernel.exps.ExpressionParser;
+import org.apache.openjpa.lib.log.Log;
import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
import org.apache.openjpa.lib.rop.ResultObjectProvider;
import org.apache.openjpa.lib.util.Localizer;
@@ -59,26 +68,22 @@
/**
* A Store manager for multiple physical databases referred as <em>slice</em>.
- * The actions are delegated to the underlying slices. The actions are executed
- * in parallel threads whenever possible such as flushing or query.
- * <br>
- * The major limitation of this implementation is lack of two-phase commit
- * protocol. The distributed commit is indirectly supported by the context
- * i.e. the Broker which registers as participant to external transaction
- * manager, when available, as in a Application Container environment.
- * <br>
- * However, if invoked in local transaction then this implementation does not
- * act like a transaction manager to its underlying slices.
+ * This receiver behaves like a Transaction Manager as it implements two-phase
+ * commit protocol if all the component slices is XA-complaint. The actions are
+ * delegated to the underlying slices. The actions are executed in parallel
+ * threads whenever possible such as flushing or query. <br>
*
* @author Pinaki Poddar
*
*/
class DistributedStoreManager extends JDBCStoreManager {
- private final Map<String,JDBCStoreManager> _slices;
+ private final List<SliceStoreManager> _slices;
private JDBCStoreManager _master;
+ private boolean isXA;
private final DistributedJDBCConfiguration _conf;
- private static final Localizer _loc =
- Localizer.forPackage(DistributedStoreManager.class);
+ private final ThreadLocal<XID> xids = new ThreadLocal<XID>();
+ private static final Localizer _loc =
+ Localizer.forPackage(DistributedStoreManager.class);
private static ExecutorService threadPool = Executors.newCachedThreadPool();
/**
@@ -91,11 +96,11 @@
*/
public DistributedStoreManager(DistributedJDBCConfiguration conf) {
_conf = conf;
- _slices = new HashMap<String,JDBCStoreManager>();
- for (String slice:conf.getSliceNames()) {
- JDBCStoreManager child = new JDBCStoreManager();
- _slices.put(slice, child);
- _master = (_master == null) ? child : _master;
+ _slices = new ArrayList<SliceStoreManager>();
+ for (String name : conf.getSliceNames()) {
+ SliceStoreManager slice = new SliceStoreManager(name);
+ _slices.add(slice);
+ _master = (_master == null) ? slice : _master;
}
}
@@ -110,12 +115,12 @@
* additional connection info is used to estimate for the existing
* instances.
*/
- protected String findSlice(OpenJPAStateManager sm, Object info) {
+ protected String findSliceName(OpenJPAStateManager sm, Object info) {
boolean hasIndex = hasSlice(sm);
if (hasIndex)
return sm.getImplData().toString();
String slice = estimateSlice(sm, info);
- if (slice == null)
+ if (slice == null)
return assignSlice(sm);
return slice;
}
@@ -126,14 +131,13 @@
private String assignSlice(OpenJPAStateManager sm) {
PersistenceCapable pc = sm.getPersistenceCapable();
- String slice = _conf.getDistributionPolicyInstance().distribute(pc,
- _slices.keySet(), getContext());
- if (!_slices.containsKey(slice)) {
- throw new UserException(_loc.get("bad-policy-slice",
- new Object[]{
- _conf.getDistributionPolicyInstance().getClass().getName(),
- slice, sm.getPersistenceCapable(),
- _slices.keySet()}));
+ String slice =
+ _conf.getDistributionPolicyInstance().distribute(pc,
+ _conf.getSliceNames(), getContext());
+ if (!_conf.getSliceNames().contains(slice)) {
+ throw new UserException(_loc.get("bad-policy-slice", new Object[] {
+ _conf.getDistributionPolicyInstance().getClass().getName(),
+ slice, sm.getPersistenceCapable(), _conf.getSliceNames() }));
}
sm.setImplData(slice, true);
return slice;
@@ -152,10 +156,10 @@
Result result = ((ConnectionInfo) edata).result;
if (result instanceof ResultSetResult) {
JDBCStore store = ((ResultSetResult) result).getStore();
- for (String slice:_slices.keySet()) {
- if (_slices.get(slice) == store) {
- sm.setImplData(slice, true);
- return slice;
+ for (SliceStoreManager slice : _slices) {
+ if (slice == store) {
+ sm.setImplData(slice.getName(), true);
+ return slice.getName();
}
}
}
@@ -166,10 +170,11 @@
* Selects a child StoreManager where the given instance resides.
*/
private StoreManager selectStore(OpenJPAStateManager sm, Object edata) {
- String slice = findSlice(sm, edata);
- if (_slices.containsKey(slice))
- return _slices.get(slice);
- throw new InternalException(_loc.get("wrong-slice", slice, sm));
+ String name = findSliceName(sm, edata);
+ SliceStoreManager slice = lookup(name);
+ if (slice == null)
+ throw new InternalException(_loc.get("wrong-slice", name, sm));
+ return slice;
}
public boolean assignField(OpenJPAStateManager sm, int field,
@@ -187,30 +192,130 @@
}
public void begin() {
- for (StoreManager child : _slices.values())
+ if (isXA) {
+ XID xid = getXID(false);
+ Set<XAResource> resources = new HashSet<XAResource>();
+ for (SliceStoreManager slice : _slices) {
+ try {
+ Log log = getLog(slice);
+ XAConnection xcon = slice.getXAConnection();
+ Connection con = slice.getConnection();
+ XAResource rm = xcon.getXAResource();
+ XAResource existing = isSame(rm, resources);
+ XID branch = xid.branch(slice.getName());
+ log.info(_loc.get("two-phase",
+ new Object[] { "start", toString(con),
+ toString(xcon), toString(rm), branch }));
+ if (existing == null) {
+ resources.add(rm);
+ rm.start(branch, XAResource.TMNOFLAGS);
+ } else {
+ rm.start(branch, XAResource.TMJOIN);
+ }
+ } catch (Exception e) {
+ throw new StoreException(e);
+ }
+ }
+ return;
+ }
+ for (SliceStoreManager child : _slices)
child.begin();
}
+ Log getLog(SliceStoreManager slice) {
+ return slice.getConfiguration()
+ .getLog(OpenJPAConfiguration.LOG_RUNTIME);
+ }
+
+ XAResource isSame(XAResource rm, Set<XAResource> others) {
+ for (XAResource other : others)
+ try {
+ if (other.isSameRM(rm))
+ return other;
+ } catch (XAException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
public void beginOptimistic() {
- for (StoreManager child : _slices.values())
- child.beginOptimistic();
+ for (SliceStoreManager slice : _slices)
+ slice.beginOptimistic();
}
public boolean cancelAll() {
boolean ret = true;
- for (StoreManager child : _slices.values())
- ret = child.cancelAll() & ret;
+ for (SliceStoreManager slice : _slices)
+ ret = slice.cancelAll() & ret;
return ret;
}
public void close() {
- for (StoreManager child : _slices.values())
- child.close();
+ for (SliceStoreManager slice : _slices)
+ slice.close();
}
public void commit() {
- for (StoreManager child : _slices.values())
- child.commit();
+ if (isXA) {
+ XID xid = getXID(true);
+ Set<XAResource> resources = new HashSet<XAResource>();
+ try {
+ for (SliceStoreManager slice : _slices) {
+ XAConnection xcon = slice.getXAConnection();
+ XAResource rm = xcon.getXAResource();
+ Connection con = slice.getConnection();
+ Log log = getLog(slice);
+ try {
+ XID branch = xid.branch(slice.getName());
+ XAResource exists = isSame(rm, resources);
+ log.info(_loc.get("two-phase", new Object[] { "end",
+ toString(con), toString(xcon), toString(rm),
+ branch }));
+ if (exists == null) {
+ resources.add(rm);
+ rm.end(branch, XAResource.TMSUCCESS);
+ } else {
+
+ }
+ rm.prepare(branch);
+ } catch (XAException ex) {
+ ex.printStackTrace();
+ throw new StoreException(ex);
+ }
+ }
+ for (SliceStoreManager slice : _slices) {
+ XAResource rm = slice.getXAConnection().getXAResource();
+ XID branch = xid.branch(slice.getName());
+ try {
+ rm.commit(branch, false);
+ } catch (XAException ex) {
+ throw new StoreException(ex);
+ }
+ }
+
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ return;
+ }
+ }
+
+ for (SliceStoreManager slice : _slices)
+ slice.commit();
+ }
+
+ XID getXID(boolean mustExist) {
+ XID xid = xids.get();
+ if (xid == null) {
+ if (mustExist)
+ throw new RuntimeException("No XID with "
+ + Thread.currentThread());
+ byte[] global =
+ Long.toHexString(System.currentTimeMillis()).getBytes();
+ xid = new XID(0, global, new byte[] { 0x1 });
+ xids.set(xid);
+ }
+ return xid;
}
public int compareVersion(OpenJPAStateManager sm, Object v1, Object v2) {
@@ -225,16 +330,16 @@
boolean subclasses, FetchConfiguration fetch) {
ResultObjectProvider[] tmp = new ResultObjectProvider[_slices.size()];
int i = 0;
- for (StoreManager child : _slices.values()) {
- tmp[i++] = child.executeExtent(meta, subclasses, fetch);
+ for (SliceStoreManager slice : _slices) {
+ tmp[i++] = slice.executeExtent(meta, subclasses, fetch);
}
return new MergedResultObjectProvider(tmp);
}
public boolean exists(OpenJPAStateManager sm, Object edata) {
- for (String slice:_slices.keySet()) {
- if (_slices.get(slice).exists(sm, edata)) {
- sm.setImplData(slice, true);
+ for (SliceStoreManager slice : _slices) {
+ if (slice.exists(sm, edata)) {
+ sm.setImplData(slice.getName(), true);
return true;
}
}
@@ -248,27 +353,15 @@
@SuppressWarnings("unchecked")
public Collection flush(Collection sms) {
Collection exceptions = new ArrayList();
- Map<String,List<OpenJPAStateManager>> subsets =
- new HashMap<String,List<OpenJPAStateManager>>();
- for (String slice:_slices.keySet())
- subsets.put(slice, new ArrayList<OpenJPAStateManager>());
- for (Object x : sms) {
- OpenJPAStateManager sm = (OpenJPAStateManager) x;
- String slice = findSlice(sm, null);
- subsets.get(slice).add(sm);
- }
- List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
-
- for (String slice:_slices.keySet()) {
- List<OpenJPAStateManager> toBeFlushed = subsets.get(slice);
- if (toBeFlushed.isEmpty())
+ List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
+ Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
+ for (SliceStoreManager slice : _slices) {
+ List<OpenJPAStateManager> subset = subsets.get(slice.getName());
+ if (subset.isEmpty())
continue;
- Flusher flusher = new Flusher();
- flusher.store = _slices.get(slice);
- flusher.toFlush = toBeFlushed;
- futures.add(threadPool.submit(flusher));
+ futures.add(threadPool.submit(new Flusher(slice, subset)));
}
- for (Future<Collection> future:futures) {
+ for (Future<Collection> future : futures) {
Collection error;
try {
error = future.get();
@@ -284,6 +377,20 @@
return exceptions;
}
+ private Map<String, List<OpenJPAStateManager>> bin(Collection sms,
+ Object edata) {
+ Map<String, List<OpenJPAStateManager>> subsets =
+ new HashMap<String, List<OpenJPAStateManager>>();
+ for (SliceStoreManager slice : _slices)
+ subsets.put(slice.getName(), new ArrayList<OpenJPAStateManager>());
+ for (Object x : sms) {
+ OpenJPAStateManager sm = (OpenJPAStateManager) x;
+ String slice = findSliceName(sm, edata);
+ subsets.get(slice).add(sm);
+ }
+ return subsets;
+ }
+
public Object getClientConnection() {
throw new UnsupportedOperationException();
}
@@ -307,13 +414,13 @@
public boolean initialize(OpenJPAStateManager sm, PCState state,
FetchConfiguration fetch, Object edata) {
if (edata instanceof ConnectionInfo) {
- String slice = findSlice(sm, (ConnectionInfo) edata);
+ String slice = findSliceName(sm, (ConnectionInfo) edata);
if (slice != null)
- return _slices.get(slice).initialize(sm, state, fetch, edata);
+ return lookup(slice).initialize(sm, state, fetch, edata);
}
// not a part of Query result load. Look into the slices till found
- for (String slice:_slices.keySet()) {
- if (_slices.get(slice).initialize(sm, state, fetch, edata)) {
+ for (SliceStoreManager slice : _slices) {
+ if (slice.initialize(sm, state, fetch, edata)) {
sm.setImplData(slice, true);
return true;
}
@@ -329,7 +436,17 @@
public Collection loadAll(Collection sms, PCState state, int load,
FetchConfiguration fetch, Object edata) {
- throw new UnsupportedOperationException();
+ Map<String, List<OpenJPAStateManager>> subsets = bin(sms, edata);
+ Collection result = new ArrayList();
+ for (SliceStoreManager slice : _slices) {
+ List<OpenJPAStateManager> subset = subsets.get(slice.getName());
+ if (subset.isEmpty())
+ continue;
+ Collection tmp = slice.loadAll(subset, state, load, fetch, edata);
+ if (tmp != null && !tmp.isEmpty())
+ result.addAll(tmp);
+ }
+ return result;
}
public Object newDataStoreId(Object oidVal, ClassMetaData meta) {
@@ -346,31 +463,31 @@
public StoreQuery newQuery(String language) {
ExpressionParser parser = QueryLanguages.parserForLanguage(language);
DistributedStoreQuery ret = new DistributedStoreQuery(this, parser);
- for (JDBCStoreManager child : _slices.values()) {
- ret.add(child.newQuery(language));
+ for (SliceStoreManager slice : _slices) {
+ ret.add(slice.newQuery(language));
}
return ret;
}
public void releaseConnection() {
- for (StoreManager child : _slices.values())
- child.releaseConnection();
+ for (SliceStoreManager slice : _slices)
+ slice.releaseConnection();
}
public void retainConnection() {
- for (StoreManager child : _slices.values())
- child.retainConnection();
+ for (SliceStoreManager slice : _slices)
+ slice.retainConnection();
}
public void rollback() {
- for (StoreManager child : _slices.values())
- child.rollback();
+ for (SliceStoreManager slice : _slices)
+ slice.rollback();
}
public void rollbackOptimistic() {
- for (StoreManager child : _slices.values())
- child.rollbackOptimistic();
+ for (SliceStoreManager slice : _slices)
+ slice.rollbackOptimistic();
}
/**
@@ -379,17 +496,38 @@
public void setContext(StoreContext ctx) {
super.setContext(ctx);
Iterator<JDBCConfiguration> confs = _conf.iterator();
- for (JDBCStoreManager child : _slices.values())
- child.setContext(ctx, confs.next());
+ isXA = true;
+ for (SliceStoreManager slice : _slices) {
+ slice.setContext(ctx, confs.next());
+ isXA &= slice.isXAEnabled();
+ }
+ }
+
+ private SliceStoreManager lookup(String name) {
+ for (SliceStoreManager slice : _slices)
+ if (slice.getName().equals(name))
+ return slice;
+ return null;
}
public boolean syncVersion(OpenJPAStateManager sm, Object edata) {
return selectStore(sm, edata).syncVersion(sm, edata);
}
-
+
+ String toString(Object o) {
+ return o.getClass().getSimpleName() + "@"
+ + Long.toHexString(System.identityHashCode(o));
+ }
+
private static class Flusher implements Callable<Collection> {
- JDBCStoreManager store;
- Collection toFlush;
+ final SliceStoreManager store;
+ final Collection toFlush;
+
+ Flusher(SliceStoreManager store, Collection toFlush) {
+ this.store = store;
+ this.toFlush = toFlush;
+ }
+
public Collection call() throws Exception {
return store.flush(toFlush);
}
Added: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/SliceStoreManager.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/SliceStoreManager.java?rev=608015&view=auto
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/SliceStoreManager.java (added)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/SliceStoreManager.java Tue Jan 1 21:51:36 2008
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import javax.sql.DataSource;
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+
+import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
+import org.apache.openjpa.lib.jdbc.DelegatingDataSource;
+import org.apache.openjpa.lib.util.Localizer;
+import org.apache.openjpa.util.InternalException;
+
+/**
+ * A specialized JDBCStoreManager for XA-complaint DataSource.
+ * If the configured DataSource is not XA-complaint, behaves as the super
+ * implementation.
+ *
+ * @author Pinaki Poddar
+ *
+ */
+public class SliceStoreManager extends JDBCStoreManager {
+ private final String _name;
+ private Boolean isXAEnabled;
+ private XAConnection xcon;
+
+ private static final Localizer _loc =
+ Localizer.forPackage(SliceStoreManager.class);
+
+ /**
+ * Construct with immutable logical name of the slice.
+ */
+ public SliceStoreManager(String name) {
+ _name = name;
+ }
+
+ /**
+ * Gets the name of the slice.
+ */
+ public String getName() {
+ return _name;
+ }
+
+ /**
+ * Gets the connection via XAConnection if the datasource is XA-complaint.
+ * Otherwise, behaves exactly as the super implementation.
+ */
+ @Override
+ protected RefCountConnection connectInternal() throws SQLException {
+ if (!isXAEnabled)
+ return super.connectInternal();
+ XADataSource xds = getXADataSource();
+ xcon = xds.getXAConnection();
+ Connection con = xcon.getConnection();
+ return new RefCountConnection(con);
+ }
+
+ /**
+ * Gets the XAConnection if connected and XA-complaint. Otherwise null.
+ */
+ public XAConnection getXAConnection() {
+ return xcon;
+ }
+
+ private XADataSource getXADataSource() {
+ if (!isXAEnabled())
+ throw new InternalException(_loc.get("slice-not-xa", this));
+ return (XADataSource)getInnerDataSource();
+ }
+
+ /**
+ * Affirms if the configured DataSource is XA-complaint.
+ * Can return null if the context has not been set yet.
+ */
+ public boolean isXAEnabled() {
+ if (isXAEnabled == null) {
+ isXAEnabled = getInnerDataSource() instanceof XADataSource;
+ }
+ return isXAEnabled.booleanValue();
+ }
+
+ private DataSource getInnerDataSource() {
+ DataSource parent = super.getDataSource();
+ DataSource real = (parent instanceof DelegatingDataSource) ?
+ ((DelegatingDataSource)parent).getInnermostDelegate()
+ : parent;
+ return real;
+ }
+
+ public String toString() {
+ return _name;
+ }
+}
Added: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/XID.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/XID.java?rev=608015&view=auto
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/XID.java (added)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/XID.java Tue Jan 1 21:51:36 2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Internally used Global Transaction Identifier for two-phase distributed
+ * commit protocol.
+ *
+ * @author Pinaki Poddar
+ *
+ */
+class XID implements Xid {
+ private final int format;
+ private final byte[] global;
+ private final byte[] branch;
+
+ public XID(int format, byte[] global, byte[] branch) {
+ super();
+ this.format = format;
+ this.global = global;
+ this.branch = branch;
+ }
+
+ public byte[] getBranchQualifier() {
+ return branch;
+ }
+
+ public int getFormatId() {
+ return format;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return global;
+ }
+
+ XID branch(String branch) {
+ return new XID(format, global, branch.getBytes());
+ }
+
+ public String toString() {
+ return new String(global) + ":" + new String(branch);
+ }
+
+ public boolean equals(Object other) {
+ if (other instanceof XID) {
+ XID that = (XID) other;
+ return format == that.format && equals(global, that.global)
+ && equals(branch, that.branch);
+ }
+ return false;
+ }
+
+ boolean equals(byte[] a, byte[] b) {
+ if (a == null && b == null)
+ return true;
+ if (a == null || b == null)
+ return false;
+ return new String(a).equals(new String(b));
+ }
+
+}
Modified: labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties?rev=608015&r1=608014&r2=608015&view=diff
==============================================================================
--- labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties (original)
+++ labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties Tue Jan 1 21:51:36 2008
@@ -22,4 +22,13 @@
bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
"{1}" for "{2}". The valid slices are {3}. This error may happen \
when one or more of the originally configured slices are unavailable \
- and Lenient property is set to true.
\ No newline at end of file
+ and Lenient property is set to true.
+slice-xa-enabled: All slices "{0}" is XA-complaint and hence store transaction \
+ will use a two-phase commit protocol even if the persistent unit is \
+ configured for non-JTA transaction.
+slice-xa-disabled: Not all slices "{0}" is XA-complaint and hence store \
+ transaction will not use a two-phase commit protocol. If persistent unit \
+ is configured for JTA transaction then the slices will participate in \
+ global transaction but otherwise the atomic nature of commit across all \
+ slices is not guaranteed.
+two-phase: "{3}".{0}"(xid=[{4}]] Connection={1} XAConnection={2}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org