You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by pp...@apache.org on 2007/12/25 03:20:28 UTC
svn commit: r606752 - in /labs/fluid/slice/src:
main/java/org/apache/openjpa/slice/ main/java/org/apache/openjpa/slice/jdbc/
main/resources/org/apache/openjpa/slice/jdbc/
test/java/org/apache/openjpa/slice/ test/resources/META-INF/
Author: ppoddar
Date: Mon Dec 24 18:20:24 2007
New Revision: 606752
URL: http://svn.apache.org/viewvc?rev=606752&view=rev
Log:
per-slice Configuration is now supported
Modified:
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
labs/fluid/slice/src/test/resources/META-INF/persistence.xml
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java Mon Dec 24 18:20:24 2007
@@ -18,23 +18,39 @@
*/
package org.apache.openjpa.slice;
+import java.util.Set;
import org.apache.openjpa.conf.OpenJPAConfiguration;
/**
- * A configuration for multiple datastores.
+ * A configuration for multiple data stores, each referred to as a <em>slice</em>.
+ * This configuration allows each underlying slice to be configured with its
+ * own specific configuration properties such as JDBC Driver or connection
+ * user/password etc.
+ * This configuration also adds a {@link DistributionPolicy
+ * DistributionPolicy} that governs how new instances are distributed
+ * among the slices.
*
* @author Pinaki Poddar
*
*/
public interface DistributedConfiguration extends OpenJPAConfiguration {
/**
- * Gets the connection URLs in the same order as specified in the
- * configuration.
+ * Gets the names of the available slices. This list is determined by the
+ * configuration properties.
+ * A configuration property <code>slice.XYZ</code> will be registered as
+ * a slice named <code>XYZ</code>.
*/
- String[] getConnectionURLs();
-
+ Set<String> getSliceNames();
+
+ /**
+ * Gets the configuration for a given slice.
+ *
+ */
+ OpenJPAConfiguration getSlice(String sliceName);
+
/**
- * Gets the policy that distributes the instances.
+ * Gets the policy that governs how new instances will be distributed across
+ * the available slices.
*/
DistributionPolicy getDistributionPolicyInstance();
}
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java Mon Dec 24 18:20:24 2007
@@ -18,24 +18,27 @@
*/
package org.apache.openjpa.slice;
+import java.util.Set;
+
/**
- * Policy to select one of the physical datastore in which a persistent instance
- * will be stored.
+ * Policy to select one of the physical datastores, each referred to as a
+ * <em>slice</em>, in which a persistent instance will be stored.
*
* @author Pinaki Poddar
*
*/
public interface DistributionPolicy {
/**
- * Gets the index of the database where a given instance will be stored.
+ * Gets the name of the slice where a given instance will be stored.
*
* @param pc The newly persistent or to-be-merged object.
- * @param urls the list of Connection URLs in the same order as specified
- * in the configuration.
+ * @param slices names of the configured slices.
+ * @param context an opaque context for future use.
*
- * @return an index in the given list. It is an error to return a value
- * that is out of bound of the given array.
+ * @return identifier of the slice. This name must match one of the
+ * configured slice names.
+ * @see DistributedConfiguration#getSliceNames()
*/
- int distribute(Object pc, String[] urls);
+ String distribute(Object pc, Set<String> slices, Object context);
}
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java Mon Dec 24 18:20:24 2007
@@ -20,10 +20,12 @@
import java.security.AccessController;
import java.util.Map;
+import java.util.Properties;
import org.apache.openjpa.conf.OpenJPAProductDerivation;
import org.apache.openjpa.lib.conf.AbstractProductDerivation;
import org.apache.openjpa.lib.conf.Configuration;
+import org.apache.openjpa.lib.conf.Configurations;
import org.apache.openjpa.lib.util.J2DoPrivHelper;
import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
import org.apache.openjpa.slice.jdbc.DistributedJDBCConfigurationImpl;
@@ -57,5 +59,4 @@
AccessController.doPrivileged(J2DoPrivHelper
.getClassLoaderAction(DistributedJDBCBrokerFactory.class));
}
-
}
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java Mon Dec 24 18:20:24 2007
@@ -27,30 +27,31 @@
* A factory for distributed JDBC datastores.
*
* @author Pinaki Poddar
- *
+ *
*/
@SuppressWarnings("serial")
public class DistributedJDBCBrokerFactory extends JDBCBrokerFactory {
private DistributedJDBCConfiguration conf;
- /**
- * Factory method for constructing a factory from properties. Invoked from
- * {@link Bootstrap#newBrokerFactory}.
- */
- public static DistributedJDBCBrokerFactory newInstance(ConfigurationProvider cp) {
- DistributedJDBCConfigurationImpl conf =
- new DistributedJDBCConfigurationImpl();
- cp.setInto(conf);
- return new DistributedJDBCBrokerFactory(conf);
- }
+ /**
+ * Factory method for constructing a factory from properties. Invoked from
+ * {@link Bootstrap#newBrokerFactory}.
+ */
+ public static DistributedJDBCBrokerFactory newInstance(
+ ConfigurationProvider cp) {
+ DistributedJDBCConfigurationImpl conf =
+ new DistributedJDBCConfigurationImpl(cp);
+ cp.setInto(conf);
+ return new DistributedJDBCBrokerFactory(conf);
+ }
+
+ public DistributedJDBCBrokerFactory(DistributedJDBCConfiguration conf) {
+ super(conf);
+ this.conf = conf;
+ }
- public DistributedJDBCBrokerFactory(DistributedJDBCConfiguration conf) {
- super(conf);
- this.conf = conf;
- }
-
@Override
protected StoreManager newStoreManager() {
- return new DistributedStoreManager(conf);
- }
+ return new DistributedStoreManager(conf);
+ }
}
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java Mon Dec 24 18:20:24 2007
@@ -30,5 +30,4 @@
*/
public interface DistributedJDBCConfiguration extends JDBCConfiguration,
Iterable<JDBCConfiguration>, DistributedConfiguration {
-
}
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java Mon Dec 24 18:20:24 2007
@@ -22,10 +22,15 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
import java.util.StringTokenizer;
+import java.util.TreeMap;
import javax.sql.DataSource;
@@ -35,6 +40,7 @@
import org.apache.openjpa.jdbc.schema.DataSourceFactory;
import org.apache.openjpa.kernel.BrokerImpl;
import org.apache.openjpa.lib.conf.BooleanValue;
+import org.apache.openjpa.lib.conf.ConfigurationProvider;
import org.apache.openjpa.lib.conf.Configurations;
import org.apache.openjpa.lib.conf.PluginValue;
import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
@@ -46,157 +52,258 @@
/**
* Implements a distributed configuration of JDBCStoreManagers.
*
- * @author Pinaki Poddar
- *
+ * @author Pinaki Poddar
+ *
*/
-public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
- implements DistributedJDBCConfiguration {
-
- private final List<JDBCConfiguration> children =
- new ArrayList<JDBCConfiguration>();
- private String[] urls;
- private DecoratingDataSource dataSource;
-
+public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
+ implements DistributedJDBCConfiguration {
+
+ private final Map<String, JDBCConfiguration> _slices =
+ new TreeMap<String, JDBCConfiguration>();
+ private DecoratingDataSource virtualDataSource;
+
protected BooleanValue lenient;
protected PluginValue distributionPolicyPlugin;
- protected DistributionPolicy distributionPolicy;
-
- private Log _log;
- private static Localizer _loc =
- Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
-
- public DistributedJDBCConfigurationImpl() {
+
+ private static Localizer _loc =
+ Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
+ public static final String PREFIX_SLICE = "slice.";
+
+ /**
+ * Configure itself as well as underlying slices.
+ *
+ */
+ public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
super(true, false);
- _log = getLog(LOG_RUNTIME);
distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
distributionPolicyPlugin.setDynamic(true);
lenient = addBoolean("Lenient");
+ setSlices(cp.getProperties());
}
-
+
+ /**
+ * Iterator for underlying slice configurations.
+ */
public Iterator<JDBCConfiguration> iterator() {
- return children.iterator();
+ return _slices.values().iterator();
}
-
+
/**
- * Gets the URLs of all the underlying physical datastores.
+ * Gets the names of the configured slices.
*/
- public String[] getConnectionURLs() {
- if (urls == null) {
- urls = parseURLs();
- Map parent = toProperties(true);
- Configurations.removeProperty("DistributionPolicy", parent);
- Configurations.removeProperty("Lenient", parent);
- for (String url:urls) {
- JDBCConfigurationImpl child = new JDBCConfigurationImpl();
- child.fromProperties(parent);
- child.setConnectionURL(url);
- child.setBrokerFactory(BrokerImpl.class.getName());
- children.add(child);
- }
- }
- return urls;
+ public Set<String> getSliceNames() {
+ return _slices.keySet();
+ }
+
+ /**
+ * Get the configuration for given slice.
+ */
+ public JDBCConfiguration getSlice(String slice) {
+ JDBCConfiguration result = _slices.get(slice);
+ if (result == null)
+ throw new UserException(_loc.get("slice-not-found", slice, _slices
+ .keySet()));
+ return result;
}
-
+
public DistributionPolicy getDistributionPolicyInstance() {
- if (distributionPolicy == null) {
- Object val = distributionPolicyPlugin.instantiate(
- DistributionPolicy.class, this, true);
- distributionPolicy = (DistributionPolicy)val;
+ if (distributionPolicyPlugin.get() == null) {
+ distributionPolicyPlugin.instantiate(DistributionPolicy.class,
+ this, true);
}
- return distributionPolicy;
+ return (DistributionPolicy) distributionPolicyPlugin.get();
}
-
+
public void setDistributionPolicyInstance(String val) {
distributionPolicyPlugin.set(val);
}
-
- public Object getConnectionFactory() {
- if (dataSource == null) {
- getConnectionURLs();
- DistributedDataSource ds = createDistributedDataStore();
- dataSource = DataSourceFactory.installDBDictionary
- (getDBDictionaryInstance(), ds, this, false);
- }
- return dataSource;
- }
-
- /**
- * Create a virtual DistributedDataSource as a composite of individual
- * physical data sources as per configuration, optionally ignoring
- * any non-reachable sources.
- */
- private DistributedDataSource createDistributedDataStore() {
- List<DataSource> dataSources = new ArrayList<DataSource>();
- boolean isLenient = lenient.get();
- List<JDBCConfiguration> badConfs = new ArrayList<JDBCConfiguration>();
- for (JDBCConfiguration conf:children) {
- String url = conf.getConnectionURL();
- if (_log.isInfoEnabled())
- _log.info(_loc.get("connect", url));
- DataSource ds = DataSourceFactory.newDataSource(conf, false);
- if (verifyDataSource(isLenient, badConfs, conf, ds)) {
- dataSources.add(ds);
- }
- }
- removeBadConnection(badConfs);
- return new DistributedDataSource(dataSources);
- }
-
- private boolean verifyDataSource(boolean isLenient, List<JDBCConfiguration>
- badConfs, JDBCConfiguration conf, DataSource ds) {
+ public Object getConnectionFactory() {
+ if (virtualDataSource == null) {
+ DistributedDataSource ds = createDistributedDataStore();
+ virtualDataSource =
+ DataSourceFactory.installDBDictionary(
+ getDBDictionaryInstance(), ds, this, false);
+ }
+ return virtualDataSource;
+ }
+
+ /**
+ * Create a virtual DistributedDataSource as a composite of individual
+ * slices as per configuration, optionally ignoring slices that can not be
+ * connected.
+ */
+ private DistributedDataSource createDistributedDataStore() {
+ Log log = getLog(LOG_RUNTIME);
+ List<DataSource> dataSources = new ArrayList<DataSource>();
+ boolean isLenient = lenient.get();
+ List<String> badSlices = new ArrayList<String>();
+ for (String slice : _slices.keySet()) {
+ JDBCConfiguration conf = _slices.get(slice);
+ String url = conf.getConnectionURL();
+ if (log.isInfoEnabled())
+ log.info(_loc.get("slice-connect", slice, url));
+ try {
+ DataSource ds = DataSourceFactory.newDataSource(conf, false);
+ DecoratingDataSource dds = new DecoratingDataSource(ds);
+ ds =
+ DataSourceFactory.installDBDictionary(conf
+ .getDBDictionaryInstance(), dds, conf, false);
+ if (verifyDataSource(isLenient, badSlices, slice, ds)) {
+ dataSources.add(ds);
+ }
+ } catch (Exception ex) {
+ handleBadConnection(isLenient, badSlices, slice, ex);
+ }
+ }
+ removeBadSlices(badSlices);
+ return new DistributedDataSource(dataSources);
+ }
+
+ /**
+ * Verify that a connection can be established to the given slice. If
+ * connection can not be established then process the bad slice in
+ * {@link #handleBadConnection(boolean, List, String, Throwable)} method.
+ */
+ private boolean verifyDataSource(boolean isLenient, List<String> badSlices,
+ String badSlice, DataSource ds) {
Connection con = null;
try {
con = ds.getConnection();
if (con == null) {
- handleBadConnection(isLenient, badConfs, conf);
+ handleBadConnection(isLenient, badSlices, badSlice, null);
return false;
}
return true;
} catch (SQLException ex) {
- handleBadConnection(isLenient, badConfs, conf);
+ handleBadConnection(isLenient, badSlices, badSlice, ex);
return false;
} finally {
- if (con != null)
- try {
- con.close();
- } catch (SQLException ex) {
-
- }
+ if (con != null)
+ try {
+ con.close();
+ } catch (SQLException ex) {
+ // ignore
+ }
}
- }
-
- private void removeBadConnection(List<JDBCConfiguration> badConfs) {
- if (badConfs == null || badConfs.isEmpty())
- return;
- children.removeAll(badConfs);
- List<String> badURLs = new ArrayList<String>(badConfs.size());
- for (JDBCConfiguration bad:badConfs)
- badURLs.add(bad.getConnectionURL());
- List<String> allURLs = Arrays.asList(getConnectionURLs());
- urls = new String[allURLs.size()-badURLs.size()];
- int i = 0;
- for (String url:allURLs)
- if (!badURLs.contains(url))
- urls[i++] = url;
- }
-
-
- private void handleBadConnection(boolean isLenient,
- List<JDBCConfiguration> badConfs, JDBCConfiguration conf) {
+ }
+
+ /**
+ * Remove the given set of bad slices.
+ *
+ */
+ private void removeBadSlices(List<String> badSlices) {
+ if (badSlices == null)
+ return;
+ for (String bad : badSlices)
+ _slices.remove(bad);
+ }
+
+ /**
+ * Either throw a user exception or add the slice name to the given list,
+ * based on <code>isLenient</code>.
+ */
+ private void handleBadConnection(boolean isLenient, List<String> badSlices,
+ String slice, Throwable ex) {
+ JDBCConfiguration conf = _slices.get(slice);
String url = conf.getConnectionURL();
+ Log log = getLog(LOG_RUNTIME);
if (isLenient) {
- _log.error(_loc.get("connect-error-lenient", url));
- badConfs.add(conf);
- } else
- throw new UserException(_loc.get("connect-error", url));
- }
-
- private String[] parseURLs() {
- String url = getConnectionURL();
- if (url == null || url.trim().length() == 0)
- throw new UserException("no-url");
- String regex = "\\|";
- return url.split(regex);
- }
+ if (ex != null) {
+ log.warn(_loc.get("slice-connect-known-warn", slice, url,
+ ex.getCause()));
+ } else {
+ log.warn(_loc.get("slice-connect-warn", slice, url));
+ }
+ badSlices.add(slice);
+ } else if (ex != null) {
+ throw new UserException(_loc.get("slice-connect-known-error",
+ slice, url, ex), ex.getCause());
+ } else {
+ throw new UserException(_loc.get("slice-connect-error", slice, url));
+ }
+ }
+
+ /**
+ * Create JDBCConfigurations for slices from the given properties.
+ */
+ void setSlices(Map p) {
+ Set<String> slices = findSlices(p);
+ Log log = getLog(LOG_RUNTIME);
+ if (slices.isEmpty()) {
+ throw new UserException(_loc.get("slice-none-configured"));
+ } else if (log != null && log.isInfoEnabled()) {
+ log.info(_loc.get("slice-available", slices));
+ }
+ for (String key : slices) {
+ JDBCConfiguration slice = new JDBCConfigurationImpl();
+ slice.fromProperties(replaceKeys(p, prefix(key), "openjpa."));
+ _slices.put(key, slice);
+ if (log != null && log.isInfoEnabled())
+ log.info(_loc.get("slice-configuration", key, slice
+ .toProperties(false)));
+ }
+ }
+
+ String prefix(String key) {
+ return PREFIX_SLICE + key + ".";
+ }
+
+ Set<String> findSlices(Map p) {
+ Set<String> slices = new HashSet<String>();
+ for (Object o : p.keySet()) {
+ if (o instanceof String && o.toString().startsWith(PREFIX_SLICE)) {
+ String sliceName =
+ chopTail(chopHead(o.toString(), PREFIX_SLICE), ".");
+ if (!isKnownProperty(sliceName))
+ slices.add(sliceName);
+ }
+ }
+ return slices;
+ }
+
+ boolean isKnownProperty(String property) {
+ return distributionPolicyPlugin.getProperty().equals(property)
+ || lenient.getProperty().equals(property);
+ }
+
+ static String chopHead(String s, String head) {
+ if (s.startsWith(head))
+ return s.substring(head.length());
+ return s;
+ }
+
+ static String chopTail(String s, String tail) {
+ int i = s.lastIndexOf(tail);
+ if (i == -1)
+ return s;
+ return s.substring(0, i);
+ }
+
+ /**
+ * Replace each key that has the given prefix in the given <code>old</code>
+ * map with a new key carrying the new prefix. Also throw away entries that
+ * belong to other slices.
+ */
+ Map replaceKeys(Map old, String oldPrefix, String newPrefix) {
+ Map result = new HashMap();
+ for (Object o : old.keySet()) {
+ String key = o.toString();
+ Object value = old.get(o);
+ if (key.startsWith(oldPrefix)) {
+ String newKey = newPrefix + chopHead(key, oldPrefix);
+ result.put(newKey, value);
+ // System.err.println("Replaced [" + key + "] with " + newKey +
+ // " value=" + value);
+ } else if (!key.startsWith(PREFIX_SLICE)) { // not other slice
+ // properties
+ result.put(key, value); // keep them as it is
+ } else {
+ // System.err.println("Ignored [" + key + "] with " +
+ // oldPrefix);
+ }
+ }
+ return result;
+ }
+
}
Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java Mon Dec 24 18:20:24 2007
@@ -19,11 +19,12 @@
package org.apache.openjpa.slice.jdbc;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -57,17 +58,25 @@
import org.apache.openjpa.util.UserException;
/**
- * A Store manager for multiple physical databases.
- *
+ * A Store manager for multiple physical databases, each referred to as a <em>slice</em>.
+ * The actions are delegated to the underlying slices. The actions are executed
+ * in parallel threads whenever possible such as flushing or query.
+ * <br>
+ * The major limitation of this implementation is lack of two-phase commit
+ * protocol. The distributed commit is indirectly supported by the context
+ * i.e. the Broker which registers as participant to external transaction
+ * manager, when available, as in an Application Container environment.
+ * <br>
+ * However, if invoked in local transaction then this implementation does not
+ * act like a transaction manager to its underlying slices.
*
* @author Pinaki Poddar
*
*/
public class DistributedStoreManager extends JDBCStoreManager {
- private final JDBCStoreManager[] _children;
- private final JDBCStoreManager _master;
+ private final Map<String,JDBCStoreManager> _slices;
+ private JDBCStoreManager _master;
private final DistributedJDBCConfiguration _conf;
- private final Integer[] _storeIds;
private static final Localizer _loc =
Localizer.forPackage(DistributedStoreManager.class);
private static ExecutorService threadPool = Executors.newCachedThreadPool();
@@ -82,14 +91,12 @@
*/
public DistributedStoreManager(DistributedJDBCConfiguration conf) {
_conf = conf;
- _children = new JDBCStoreManager[conf.getConnectionURLs().length];
- _storeIds = new Integer[_children.length];
- for (int i = 0; i < _children.length; i++) {
+ _slices = new HashMap<String,JDBCStoreManager>();
+ for (String slice:conf.getSliceNames()) {
JDBCStoreManager child = new JDBCStoreManager();
- _children[i] = child;
- _storeIds[i] = i;
+ _slices.put(slice, child);
+ _master = (_master == null) ? child : _master;
}
- _master = _children[0];
}
public DistributedJDBCConfiguration getConfiguration() {
@@ -103,73 +110,66 @@
* additional connection info is used to estimate for the existing
* instances.
*/
- protected int getSliceIndex(OpenJPAStateManager sm, Object info) {
- boolean hasIndex = hasSliceIndex(sm);
+ protected String findSlice(OpenJPAStateManager sm, Object info) {
+ boolean hasIndex = hasSlice(sm);
if (hasIndex)
- return (Integer) sm.getImplData();
- if (sm.isNew()) {
- return assignSliceIndex(sm);
- } else {
- int i = estimateSliceIndex(sm, info);
- if (i < 0)
- return assignSliceIndex(sm);
- else
- return i;
- }
+ return sm.getImplData().toString();
+ String slice = estimateSlice(sm, info);
+ if (slice == null)
+ return assignSlice(sm);
+ return slice;
}
- private boolean hasSliceIndex(OpenJPAStateManager sm) {
- Object index = sm.getImplData();
- return index != null;
+ private boolean hasSlice(OpenJPAStateManager sm) {
+ return sm.getImplData() != null;
}
- private int assignSliceIndex(OpenJPAStateManager sm) {
+ private String assignSlice(OpenJPAStateManager sm) {
PersistenceCapable pc = sm.getPersistenceCapable();
- int newi = _conf.getDistributionPolicyInstance().distribute(pc,
- _conf.getConnectionURLs());
- if (newi<0 || newi>=_storeIds.length) {
- String[] urls = _conf.getConnectionURLs();
+ String slice = _conf.getDistributionPolicyInstance().distribute(pc,
+ _slices.keySet(), getContext());
+ if (!_slices.containsKey(slice)) {
throw new UserException(_loc.get("bad-policy-slice",
new Object[]{
_conf.getDistributionPolicyInstance().getClass().getName(),
- newi, sm.getPersistenceCapable(),
- urls.length-1, Arrays.toString(urls)}));
+ slice, sm.getPersistenceCapable(),
+ _slices.keySet()}));
}
- sm.setImplData(_storeIds[newi], true);
- return newi;
+ sm.setImplData(slice, true);
+ return slice;
}
/**
* The additional edata is used, if possible, to find the StoreManager
* managing the given StateManager. If the additional data is unavailable
- * then return a negative value.
+ * then return null.
*
*/
- private int estimateSliceIndex(OpenJPAStateManager sm, Object edata) {
+ private String estimateSlice(OpenJPAStateManager sm, Object edata) {
if (edata == null || !(edata instanceof ConnectionInfo))
- return -1;
+ return null;
Result result = ((ConnectionInfo) edata).result;
if (result instanceof ResultSetResult) {
JDBCStore store = ((ResultSetResult) result).getStore();
- for (int i = 0; i < _children.length; i++) {
- if (_children[i] == store) {
- sm.setImplData(_storeIds[i], true);
- return i;
+ for (String slice:_slices.keySet()) {
+ if (_slices.get(slice) == store) {
+ sm.setImplData(slice, true);
+ return slice;
}
}
}
- return -1;
+ return null;
}
/**
* Selects a child StoreManager where the given instance resides.
*/
private StoreManager selectStore(OpenJPAStateManager sm, Object edata) {
- int i = getSliceIndex(sm, edata);
- if (i >= 0 && i < _children.length)
- return _children[i];
- throw new InternalException(_loc.get("wrong-slice", i, sm));
+ String slice = findSlice(sm, edata);
+ if (_slices.containsKey(slice))
+ return _slices.get(slice);
+ throw new InternalException(_loc.get("wrong-slice", slice, sm));
}
public boolean assignField(OpenJPAStateManager sm, int field,
@@ -187,29 +187,29 @@
}
public void begin() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.begin();
}
public void beginOptimistic() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.beginOptimistic();
}
public boolean cancelAll() {
boolean ret = true;
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
ret = child.cancelAll() & ret;
return ret;
}
public void close() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.close();
}
public void commit() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.commit();
}
@@ -223,18 +223,18 @@
public ResultObjectProvider executeExtent(ClassMetaData meta,
boolean subclasses, FetchConfiguration fetch) {
- ResultObjectProvider[] tmp = new ResultObjectProvider[_children.length];
+ ResultObjectProvider[] tmp = new ResultObjectProvider[_slices.size()];
int i = 0;
- for (StoreManager child : _children) {
+ for (StoreManager child : _slices.values()) {
tmp[i++] = child.executeExtent(meta, subclasses, fetch);
}
return new MergedResultObjectProvider(tmp);
}
public boolean exists(OpenJPAStateManager sm, Object edata) {
- for (int i=0; i<_children.length; i++) {
- if (_children[i].exists(sm, edata)) {
- sm.setImplData(_storeIds[i], true);
+ for (String slice:_slices.keySet()) {
+ if (_slices.get(slice).exists(sm, edata)) {
+ sm.setImplData(slice, true);
return true;
}
}
@@ -248,23 +248,23 @@
@SuppressWarnings("unchecked")
public Collection flush(Collection sms) {
Collection exceptions = new ArrayList();
- List<List<OpenJPAStateManager>> subsets =
- new ArrayList<List<OpenJPAStateManager>>();
- for (int i=0; i<_children.length; i++)
- subsets.add(new ArrayList<OpenJPAStateManager>());
+ Map<String,List<OpenJPAStateManager>> subsets =
+ new HashMap<String,List<OpenJPAStateManager>>();
+ for (String slice:_slices.keySet())
+ subsets.put(slice, new ArrayList<OpenJPAStateManager>());
for (Object x : sms) {
OpenJPAStateManager sm = (OpenJPAStateManager) x;
- int i = getSliceIndex(sm, null);
- subsets.get(i).add(sm);
+ String slice = findSlice(sm, null);
+ subsets.get(slice).add(sm);
}
List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
- int i = 0;
- for (JDBCStoreManager store:_children) {
- List<OpenJPAStateManager> toBeFlushed = subsets.get(i++);
+
+ for (String slice:_slices.keySet()) {
+ List<OpenJPAStateManager> toBeFlushed = subsets.get(slice);
if (toBeFlushed.isEmpty())
continue;
Flusher flusher = new Flusher();
- flusher.store = store;
+ flusher.store = _slices.get(slice);
flusher.toFlush = toBeFlushed;
futures.add(threadPool.submit(flusher));
}
@@ -307,14 +307,14 @@
public boolean initialize(OpenJPAStateManager sm, PCState state,
FetchConfiguration fetch, Object edata) {
if (edata instanceof ConnectionInfo) {
- int slice = getSliceIndex(sm, (ConnectionInfo) edata);
- if (slice >= 0)
- return _children[slice].initialize(sm, state, fetch, edata);
+ String slice = findSlice(sm, (ConnectionInfo) edata);
+ if (slice != null)
+ return _slices.get(slice).initialize(sm, state, fetch, edata);
}
// not a part of Query result load. Look into the slices till found
- for (int i = 0; i < _children.length; i++) {
- if (_children[i].initialize(sm, state, fetch, edata)) {
- sm.setImplData(_storeIds[i], true);
+ for (String slice:_slices.keySet()) {
+ if (_slices.get(slice).initialize(sm, state, fetch, edata)) {
+ sm.setImplData(slice, true);
return true;
}
}
@@ -346,30 +346,30 @@
public StoreQuery newQuery(String language) {
ExpressionParser parser = QueryLanguages.parserForLanguage(language);
DistributedStoreQuery ret = new DistributedStoreQuery(this, parser);
- for (JDBCStoreManager child : _children) {
+ for (JDBCStoreManager child : _slices.values()) {
ret.add(child.newQuery(language));
}
return ret;
}
public void releaseConnection() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.releaseConnection();
}
public void retainConnection() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.retainConnection();
}
public void rollback() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.rollback();
}
public void rollbackOptimistic() {
- for (StoreManager child : _children)
+ for (StoreManager child : _slices.values())
child.rollbackOptimistic();
}
@@ -379,7 +379,7 @@
public void setContext(StoreContext ctx) {
super.setContext(ctx);
Iterator<JDBCConfiguration> confs = _conf.iterator();
- for (JDBCStoreManager child : _children)
+ for (JDBCStoreManager child : _slices.values())
child.setContext(ctx, confs.next());
}
Modified: labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties (original)
+++ labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties Mon Dec 24 18:20:24 2007
@@ -1,12 +1,20 @@
+slice-not-found: No slice named "{0}" can be found. Available slices are "{1}"
+slice-none-configured: No slice has been configured. Specify slice.XYZ as \
+ property name to register a slice named XYZ.
+slice-configuration: Slice "{0}" configured with "{1}"
+slice-available: Detected slices "{0}" in configuration.
no-url: No ConnectionURL property has been specified.
wrong-url: URL "{0}" is invalid as database URL.
wrong-slice: Wrong slice "{0}" for "{1}"
-connect: Connecting to "{0}"
-connect-error-lenient: Failed to connect to "{0}". This database will be \
- ignored as Lenient property is set to true.
-connect-error: Failed to connect to "{0}"
-bad-policy-slice:Distribution policy "{0}" has returned invalid slice index \
- "{1}" for "{2}". The valid slice index is between 0 and {3} both \
- inclusive representing index of active databases "{4}". This can happen \
- when one or more of the originally configured databases are not available \
+slice-connect: Connecting to slice "{0}" at URL "{1}"
+slice-connect-warn: Failed to connect to slice "{0}". Slice "{0}" will be \
+ ignored as configuration is set as lenient.
+slice-connect-known-warn: Failed to connect to slice "{0}" due to "{2}". \
+ Slice "{0}" will be ignored as configuration is set as lenient.
+slice-connect-error: Failed to connect to slice "{0}" at URL "{1}"
+slice-connect-known-error: Failed to connect to slice "{0}" at URL "{1}" due to \
+ "{2}"
+bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
+ "{1}" for "{2}". The valid slices are {3}. This error may happen \
+ when one or more of the originally configured slices are unavailable \
and Lenient property is set to true.
Modified: labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Mon Dec 24 18:20:24 2007
@@ -34,24 +34,13 @@
public void setUp() throws Exception {
super.setUp(PObject.class, Person.class, Address.class);
+ assertNotNull("No unit for " + persistenceUnitName, emf);
}
public void tearDown() throws Exception {
// do not invoke super -- it deletes the data
}
- /**
- * Tests that user-level configurations are set.
- */
- public void testConfig() {
- assertTrue(emf.getConfiguration() instanceof DistributedConfiguration);
- DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration();
- assertTrue(conf.getConnectionURLs().length>1);
- String brokerFactory = conf.getBrokerFactory();
- assertTrue(brokerFactory.equals("slice") ||
- brokerFactory.equals(DistributedJDBCBrokerFactory.class.getName()));
- assertNotNull(conf.getDistributionPolicyInstance());
- }
PObject persist() {
EntityManager em = emf.createEntityManager();
@@ -117,7 +106,7 @@
em.clear();
em = emf.createEntityManager();
- List<Person> persons = em.createQuery("SELECT p FROM Person p WHERE p.name=?1").
+ List<Person> persons = em.createQuery("SELECT p FROM Person p WHERE p.name=?1 ORDER BY p.address.city").
setParameter(1, "A").getResultList();
List<Address> addresses = em.createQuery("SELECT a FROM Address a").getResultList();
for (Address pc:addresses) {
Modified: labs/fluid/slice/src/test/resources/META-INF/persistence.xml
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/resources/META-INF/persistence.xml?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/test/resources/META-INF/persistence.xml (original)
+++ labs/fluid/slice/src/test/resources/META-INF/persistence.xml Mon Dec 24 18:20:24 2007
@@ -1,5 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="1.0" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">
+ <persistence-unit name="per-slice">
+ <properties>
+ <property name="openjpa.BrokerFactory" value="slice"/>
+ <property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
+ <property name="slice.One.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
+ <property name="slice.Two.ConnectionURL" value="jdbc:mysql://localhost/slice2"/>
+ <property name="slice.Three.ConnectionURL" value="jdbc:mysql://localhost/slice3"/>
+ <property name="slice.Three.ConnectionDriverName" value="x.y.z"/>
+
+ <property name="slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
+ <property name="slice.Lenient" value="false"/>
+ </properties>
+ </persistence-unit>
+
<persistence-unit name="slice">
<class>domain.PObject</class>
<class>domain.Person</class>
@@ -7,7 +21,8 @@
<properties>
<property name="openjpa.BrokerFactory" value="slice"/>
<property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
- <property name="openjpa.ConnectionURL" value="jdbc:mysql://localhost/slice1|jdbc:mysql://localhost/slice2"/>
+ <property name="slice.One.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
+ <property name="slice.Two.ConnectionURL" value="jdbc:mysql://localhost/slice2"/>
<property name="openjpa.ConnectionUserName" value="root"/>
<property name="openjpa.ConnectionPassword" value="hello"/>
<property name="slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
@@ -22,7 +37,7 @@
</properties>
</persistence-unit>
- <persistence-unit name="slice1">
+ <!-- persistence-unit name="slice1">
<properties>
<property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
<property name="openjpa.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
@@ -40,7 +55,7 @@
<property name="openjpa.ConnectionPassword" value="hello"/>
<property name="openjpa.Connection2URL" value="jdbc:mysql://localhost/slice2"/>
</properties>
- </persistence-unit>
+ </persistence-unit -->
</persistence>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org