You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@labs.apache.org by pp...@apache.org on 2007/12/25 03:20:28 UTC

svn commit: r606752 - in /labs/fluid/slice/src: main/java/org/apache/openjpa/slice/ main/java/org/apache/openjpa/slice/jdbc/ main/resources/org/apache/openjpa/slice/jdbc/ test/java/org/apache/openjpa/slice/ test/resources/META-INF/

Author: ppoddar
Date: Mon Dec 24 18:20:24 2007
New Revision: 606752

URL: http://svn.apache.org/viewvc?rev=606752&view=rev
Log:
per-slice Configuration is now supported

Modified:
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
    labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
    labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
    labs/fluid/slice/src/test/resources/META-INF/persistence.xml

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java Mon Dec 24 18:20:24 2007
@@ -18,23 +18,39 @@
  */
 package org.apache.openjpa.slice;
 
+import java.util.Set;
 import org.apache.openjpa.conf.OpenJPAConfiguration;
 
 /**
- * A configuration for multiple datastores.
+ * A configuration for multiple data stores, each referred as <em>slice</em>.
+ * This configuration allows each underlying slices be configured with their
+ * own specific configuration properties such as JDBC Driver or connection
+ * user/password etc. 
+ * This configuration also extends by adding a {@link DistributionPolicy 
+ * DistributionPolicy} that governs how new instances be distributed
+ * among the slices.
  * 
  * @author Pinaki Poddar 
  *
  */
 public interface DistributedConfiguration extends OpenJPAConfiguration {
 	/**
-	 * Gets the connection URLs in the same order as specified in the 
-	 * configuration.
+	 * Gets the name of the available slices. This list is determined by the
+	 * configuration properties.
+	 * A configuration property <code>slice.XYZ</code> will be registered as
+	 * a slice named <code>XYZ</code>. 
 	 */
-	String[] getConnectionURLs();
-
+	Set<String> getSliceNames();
+	
+	/**
+	 * Gets the configuration for a given slice.
+	 * 
+	 */
+	OpenJPAConfiguration getSlice(String sliceName);
+	
 	/**
-	 * Gets the policy that distributes the instances.
+	 * Gets the policy that governs how new instances will be distributed across
+	 * the available slices.
 	 */
 	DistributionPolicy getDistributionPolicyInstance();
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/DistributionPolicy.java Mon Dec 24 18:20:24 2007
@@ -18,24 +18,27 @@
  */
 package org.apache.openjpa.slice;
 
+import java.util.Set;
+
 
 /**
- * Policy to select one of the physical datastore in which a persistent instance 
- * will be stored.
+ * Policy to select one of the physical datastore referred as <em>slice</em>
+ * in which a persistent instance will be stored.
  *  
  * @author Pinaki Poddar 
  *
  */
 public interface DistributionPolicy {
 	/**
-	 * Gets the index of the database where a given instance will be stored.
+	 * Gets the name of the slice where a given instance will be stored.
 	 *  
 	 * @param pc The newly persistent or to-be-merged object. 
-	 * @param urls the list of Connection URLs in the same order as specified 
-	 * in the configuration.
+	 * @param slices name of the configured slices.
+	 * @param a opaque context for future use.
 	 * 
-	 * @return an index in the given list. It is an error to return a value 
-	 * that is out of bound of the given array.
+	 * @return identifier of the slice. This name must match one of the
+	 * configured slice names. 
+	 * @see DistributedConfiguration#getSliceNames()
 	 */
-	int distribute(Object pc, String[] urls);
+	String distribute(Object pc, Set<String> slices, Object context);
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java Mon Dec 24 18:20:24 2007
@@ -20,10 +20,12 @@
 
 import java.security.AccessController;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.openjpa.conf.OpenJPAProductDerivation;
 import org.apache.openjpa.lib.conf.AbstractProductDerivation;
 import org.apache.openjpa.lib.conf.Configuration;
+import org.apache.openjpa.lib.conf.Configurations;
 import org.apache.openjpa.lib.util.J2DoPrivHelper;
 import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
 import org.apache.openjpa.slice.jdbc.DistributedJDBCConfigurationImpl;
@@ -57,5 +59,4 @@
 		AccessController.doPrivileged(J2DoPrivHelper
 				.getClassLoaderAction(DistributedJDBCBrokerFactory.class));
 	}
-
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java Mon Dec 24 18:20:24 2007
@@ -27,30 +27,31 @@
  * A factory for distributed JDBC datastores.
  * 
  * @author Pinaki Poddar
- *
+ * 
  */
 @SuppressWarnings("serial")
 public class DistributedJDBCBrokerFactory extends JDBCBrokerFactory {
 	private DistributedJDBCConfiguration conf;
 
-    /**
-     * Factory method for constructing a factory from properties. Invoked from
-     * {@link Bootstrap#newBrokerFactory}.
-     */
-    public static DistributedJDBCBrokerFactory newInstance(ConfigurationProvider cp) {
-    	DistributedJDBCConfigurationImpl conf = 
-    		new DistributedJDBCConfigurationImpl();
-    	cp.setInto(conf);
-        return new DistributedJDBCBrokerFactory(conf);
-    }
+	/**
+	 * Factory method for constructing a factory from properties. Invoked from
+	 * {@link Bootstrap#newBrokerFactory}.
+	 */
+	public static DistributedJDBCBrokerFactory newInstance(
+			ConfigurationProvider cp) {
+		DistributedJDBCConfigurationImpl conf =
+				new DistributedJDBCConfigurationImpl(cp);
+		cp.setInto(conf);
+		return new DistributedJDBCBrokerFactory(conf);
+	}
+
+	public DistributedJDBCBrokerFactory(DistributedJDBCConfiguration conf) {
+		super(conf);
+		this.conf = conf;
+	}
 
-    public DistributedJDBCBrokerFactory(DistributedJDBCConfiguration conf) {
-        super(conf);
-        this.conf = conf;
-   }
-    
 	@Override
 	protected StoreManager newStoreManager() {
-        return new DistributedStoreManager(conf);
-    }
+		return new DistributedStoreManager(conf);
+	}
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java Mon Dec 24 18:20:24 2007
@@ -30,5 +30,4 @@
  */
 public interface DistributedJDBCConfiguration extends JDBCConfiguration, 
 	Iterable<JDBCConfiguration>, DistributedConfiguration {
-
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java Mon Dec 24 18:20:24 2007
@@ -22,10 +22,15 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.TreeMap;
 
 import javax.sql.DataSource;
 
@@ -35,6 +40,7 @@
 import org.apache.openjpa.jdbc.schema.DataSourceFactory;
 import org.apache.openjpa.kernel.BrokerImpl;
 import org.apache.openjpa.lib.conf.BooleanValue;
+import org.apache.openjpa.lib.conf.ConfigurationProvider;
 import org.apache.openjpa.lib.conf.Configurations;
 import org.apache.openjpa.lib.conf.PluginValue;
 import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
@@ -46,157 +52,258 @@
 /**
  * Implements a distributed configuration of JDBCStoreManagers.
  * 
- * @author Pinaki Poddar 
- *
+ * @author Pinaki Poddar
+ * 
  */
-public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl 
-	implements DistributedJDBCConfiguration {
-	
-	private final List<JDBCConfiguration> children = 
-		new ArrayList<JDBCConfiguration>();
-	private String[] urls;
-	private DecoratingDataSource dataSource;
-	
+public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
+		implements DistributedJDBCConfiguration {
+
+	private final Map<String, JDBCConfiguration> _slices =
+			new TreeMap<String, JDBCConfiguration>();
+	private DecoratingDataSource virtualDataSource;
+
 	protected BooleanValue lenient;
 	protected PluginValue distributionPolicyPlugin;
-	protected DistributionPolicy distributionPolicy;
-	
-	private Log _log;
-	private static Localizer _loc = 
-		Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
-	
-	public DistributedJDBCConfigurationImpl() {
+
+	private static Localizer _loc =
+			Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
+	public static final String PREFIX_SLICE = "slice.";
+
+	/**
+	 * Configure itself as well as underlying slices.
+	 * 
+	 */
+	public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
 		super(true, false);
-		_log = getLog(LOG_RUNTIME);
 		distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
 		distributionPolicyPlugin.setDynamic(true);
 		lenient = addBoolean("Lenient");
+		setSlices(cp.getProperties());
 	}
-	
+
+	/**
+	 * Iterator for underlying slice configurations.
+	 */
 	public Iterator<JDBCConfiguration> iterator() {
-		return children.iterator();
+		return _slices.values().iterator();
 	}
-	
+
 	/**
-	 * Gets the URLs of all the underlying physical datastores.
+	 * Gets the name of the configured slices.
 	 */
-	public String[] getConnectionURLs() {
-		if (urls == null) {
-		    urls = parseURLs();
-		    Map parent = toProperties(true);
-		    Configurations.removeProperty("DistributionPolicy", parent);
-		    Configurations.removeProperty("Lenient", parent);
-		    for (String url:urls) {
-		    	JDBCConfigurationImpl child = new JDBCConfigurationImpl();
-		    	child.fromProperties(parent);
-		    	child.setConnectionURL(url);
-		    	child.setBrokerFactory(BrokerImpl.class.getName());
-		    	children.add(child);
-		    }
-		}
-		return urls;
+	public Set<String> getSliceNames() {
+		return _slices.keySet();
+	}
+
+	/**
+	 * Get the configuration for given slice.
+	 */
+	public JDBCConfiguration getSlice(String slice) {
+		JDBCConfiguration result = _slices.get(slice);
+		if (result == null)
+			throw new UserException(_loc.get("slice-not-found", slice, _slices
+					.keySet()));
+		return result;
 	}
-	
+
 	public DistributionPolicy getDistributionPolicyInstance() {
-		if (distributionPolicy == null) {
-			Object val = distributionPolicyPlugin.instantiate(
-					DistributionPolicy.class, this, true);
-			distributionPolicy = (DistributionPolicy)val;
+		if (distributionPolicyPlugin.get() == null) {
+			distributionPolicyPlugin.instantiate(DistributionPolicy.class,
+					this, true);
 		}
-		return distributionPolicy;
+		return (DistributionPolicy) distributionPolicyPlugin.get();
 	}
-	
+
 	public void setDistributionPolicyInstance(String val) {
 		distributionPolicyPlugin.set(val);
 	}
 
-	
-    public Object getConnectionFactory() {
-        if (dataSource == null) {
-        	getConnectionURLs();
-            DistributedDataSource ds = createDistributedDataStore();
-            dataSource = DataSourceFactory.installDBDictionary
-                (getDBDictionaryInstance(), ds, this, false);
-        }
-        return dataSource;
-    }
-    
-    /**
-     * Create a virtual DistributedDataSource as a composite of individual 
-     * physical data sources as per configuration, optionally ignoring 
-     * any non-reachable sources.
-     */
-    private DistributedDataSource createDistributedDataStore() {
-    	List<DataSource> dataSources = new ArrayList<DataSource>();
-    	boolean isLenient = lenient.get();
-    	List<JDBCConfiguration> badConfs = new ArrayList<JDBCConfiguration>();
-    	for (JDBCConfiguration conf:children) {
-    		String url = conf.getConnectionURL();
-    		if (_log.isInfoEnabled())
-    			_log.info(_loc.get("connect", url));
-    		DataSource ds = DataSourceFactory.newDataSource(conf, false);
-    		if (verifyDataSource(isLenient, badConfs, conf, ds)) {
-    			dataSources.add(ds);
-    		}
-    	}
-    	removeBadConnection(badConfs);
-    	return new DistributedDataSource(dataSources);
-    }
-    
-    private boolean verifyDataSource(boolean isLenient, List<JDBCConfiguration>
-    	badConfs, JDBCConfiguration conf, DataSource ds) {
+	public Object getConnectionFactory() {
+		if (virtualDataSource == null) {
+			DistributedDataSource ds = createDistributedDataStore();
+			virtualDataSource =
+					DataSourceFactory.installDBDictionary(
+							getDBDictionaryInstance(), ds, this, false);
+		}
+		return virtualDataSource;
+	}
+
+	/**
+	 * Create a virtual DistributedDataSource as a composite of individual
+	 * slices as per configuration, optionally ignoring slices that can not be
+	 * connected.
+	 */
+	private DistributedDataSource createDistributedDataStore() {
+		Log log = getLog(LOG_RUNTIME);
+		List<DataSource> dataSources = new ArrayList<DataSource>();
+		boolean isLenient = lenient.get();
+		List<String> badSlices = new ArrayList<String>();
+		for (String slice : _slices.keySet()) {
+			JDBCConfiguration conf = _slices.get(slice);
+			String url = conf.getConnectionURL();
+			if (log.isInfoEnabled())
+				log.info(_loc.get("slice-connect", slice, url));
+			try {
+				DataSource ds = DataSourceFactory.newDataSource(conf, false);
+				DecoratingDataSource dds = new DecoratingDataSource(ds);
+				ds =
+						DataSourceFactory.installDBDictionary(conf
+								.getDBDictionaryInstance(), dds, conf, false);
+				if (verifyDataSource(isLenient, badSlices, slice, ds)) {
+					dataSources.add(ds);
+				}
+			} catch (Exception ex) {
+				handleBadConnection(isLenient, badSlices, slice, ex);
+			}
+		}
+		removeBadSlices(badSlices);
+		return new DistributedDataSource(dataSources);
+	}
+
+	/**
+	 * Verify that a connection can be established to the given slice. If
+	 * connection can not be established then process the bad slice in
+	 * {@link #handleBadConnection(boolean, List, JDBCConfiguration)} method.
+	 */
+	private boolean verifyDataSource(boolean isLenient, List<String> badSlices,
+			String badSlice, DataSource ds) {
 		Connection con = null;
 		try {
 			con = ds.getConnection();
 			if (con == null) {
-				handleBadConnection(isLenient, badConfs, conf);
+				handleBadConnection(isLenient, badSlices, badSlice, null);
 				return false;
 			}
 			return true;
 		} catch (SQLException ex) {
-			handleBadConnection(isLenient, badConfs, conf);
+			handleBadConnection(isLenient, badSlices, badSlice, ex);
 			return false;
 		} finally {
-			if (con != null) 
-			try {
-				con.close();
-			} catch (SQLException ex) {
-				
-			}
+			if (con != null)
+				try {
+					con.close();
+				} catch (SQLException ex) {
+					// ignore
+				}
 		}
-    }
-    
-    private void removeBadConnection(List<JDBCConfiguration> badConfs) {
-    	if (badConfs == null || badConfs.isEmpty())
-    		return;
-    	children.removeAll(badConfs);
-    	List<String> badURLs = new ArrayList<String>(badConfs.size());
-    	for (JDBCConfiguration bad:badConfs)
-    		badURLs.add(bad.getConnectionURL());
-    	List<String> allURLs =  Arrays.asList(getConnectionURLs());
-    	urls = new String[allURLs.size()-badURLs.size()];
-    	int i = 0;
-    	for (String url:allURLs)
-    		if (!badURLs.contains(url))
-    			urls[i++] = url;
-    }
-    
-    
-    private void handleBadConnection(boolean isLenient, 
-    		List<JDBCConfiguration> badConfs, JDBCConfiguration conf) {
+	}
+
+	/**
+	 * Remove the given set of bad slices.
+	 * 
+	 */
+	private void removeBadSlices(List<String> badSlices) {
+		if (badSlices == null)
+			return;
+		for (String bad : badSlices)
+			_slices.remove(bad);
+	}
+
+	/**
+	 * Either throw a user exception or add the configuration to the given list,
+	 * based on <code>isLenient</code>.
+	 */
+	private void handleBadConnection(boolean isLenient, List<String> badSlices,
+			String slice, Throwable ex) {
+		JDBCConfiguration conf = _slices.get(slice);
 		String url = conf.getConnectionURL();
+		Log log = getLog(LOG_RUNTIME);
 		if (isLenient) {
-			_log.error(_loc.get("connect-error-lenient", url));
-			badConfs.add(conf);
-		} else
-			throw new UserException(_loc.get("connect-error", url));
-    }
-
-    private String[] parseURLs() {
-    	String url = getConnectionURL();
-    	if (url == null || url.trim().length() == 0)
-    		throw new UserException("no-url");
-    	String regex = "\\|";
-    	return url.split(regex);
-    }
+			if (ex != null) {
+				log.warn(_loc.get("slice-connect-known-warn", slice, url, 
+						ex.getCause()));
+			} else {
+				log.warn(_loc.get("slice-connect-warn", slice, url));
+			}
+			badSlices.add(slice);
+		} else if (ex != null) {
+			throw new UserException(_loc.get("slice-connect-known-error",
+					slice, url, ex), ex.getCause());
+		} else {
+			throw new UserException(_loc.get("slice-connect-error", slice, url));
+		}
+	}
+
+	/**
+	 * Create JDBCConfigurations for slices from the given properties.
+	 */
+	void setSlices(Map p) {
+		Set<String> slices = findSlices(p);
+		Log log = getLog(LOG_RUNTIME);
+		if (slices.isEmpty()) {
+			throw new UserException(_loc.get("slice-none-configured"));
+		} else if (log != null && log.isInfoEnabled()) {
+			log.info(_loc.get("slice-available", slices));
+		}
+		for (String key : slices) {
+			JDBCConfiguration slice = new JDBCConfigurationImpl();
+			slice.fromProperties(replaceKeys(p, prefix(key), "openjpa."));
+			_slices.put(key, slice);
+			if (log != null && log.isInfoEnabled())
+				log.info(_loc.get("slice-configuration", key, slice
+						.toProperties(false)));
+		}
+	}
+
+	String prefix(String key) {
+		return PREFIX_SLICE + key + ".";
+	}
+
+	Set<String> findSlices(Map p) {
+		Set<String> slices = new HashSet<String>();
+		for (Object o : p.keySet()) {
+			if (o instanceof String && o.toString().startsWith(PREFIX_SLICE)) {
+				String sliceName =
+						chopTail(chopHead(o.toString(), PREFIX_SLICE), ".");
+				if (!isKnownProperty(sliceName))
+					slices.add(sliceName);
+			}
+		}
+		return slices;
+	}
+
+	boolean isKnownProperty(String property) {
+		return distributionPolicyPlugin.getProperty().equals(property)
+				|| lenient.getProperty().equals(property);
+	}
+
+	static String chopHead(String s, String head) {
+		if (s.startsWith(head))
+			return s.substring(head.length());
+		return s;
+	}
+
+	static String chopTail(String s, String tail) {
+		int i = s.lastIndexOf(tail);
+		if (i == -1)
+			return s;
+		return s.substring(0, i);
+	}
+
+	/**
+	 * Replace the key that has the given prefix in the given <code>old</code>
+	 * map, by new key with new prefix. Also throw away entries that belong to
+	 * other slices.
+	 */
+	Map replaceKeys(Map old, String oldPrefix, String newPrefix) {
+		Map result = new HashMap();
+		for (Object o : old.keySet()) {
+			String key = o.toString();
+			Object value = old.get(o);
+			if (key.startsWith(oldPrefix)) {
+				String newKey = newPrefix + chopHead(key, oldPrefix);
+				result.put(newKey, value);
+				// System.err.println("Replaced [" + key + "] with " + newKey +
+				// " value=" + value);
+			} else if (!key.startsWith(PREFIX_SLICE)) { // not other slice
+				// properties
+				result.put(key, value); // keep them as it is
+			} else {
+				// System.err.println("Ignored [" + key + "] with " +
+				// oldPrefix);
+			}
+		}
+		return result;
+	}
+
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java Mon Dec 24 18:20:24 2007
@@ -19,11 +19,12 @@
 package org.apache.openjpa.slice.jdbc;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -57,17 +58,25 @@
 import org.apache.openjpa.util.UserException;
 
 /**
- * A Store manager for multiple physical databases.
- * 
+ * A Store manager for multiple physical databases referred as <em>slice</em>.
+ * The actions are delegated to the underlying slices. The actions are executed
+ * in parallel threads whenever possible such as flushing or query. 
+ * <br> 
+ * The major limitation of this implementation is lack of two-phase commit 
+ * protocol. The distributed commit is indirectly supported by the context
+ * i.e. the Broker which registers as participant to external transaction 
+ * manager, when available, as in a Application Container environment.
+ * <br>
+ * However, if invoked in local transaction then this implementation does not
+ * act like a transaction manager to its underlying slices.
  * 
  * @author Pinaki Poddar
  * 
  */
 public class DistributedStoreManager extends JDBCStoreManager {
-	private final JDBCStoreManager[] _children;
-	private final JDBCStoreManager _master;
+	private final Map<String,JDBCStoreManager> _slices;
+	private JDBCStoreManager _master;
 	private final DistributedJDBCConfiguration _conf;
-	private final Integer[] _storeIds;
 	private static final Localizer _loc = 
 		Localizer.forPackage(DistributedStoreManager.class);
 	private static ExecutorService threadPool = Executors.newCachedThreadPool();
@@ -82,14 +91,12 @@
 	 */
 	public DistributedStoreManager(DistributedJDBCConfiguration conf) {
 		_conf = conf;
-		_children = new JDBCStoreManager[conf.getConnectionURLs().length];
-		_storeIds = new Integer[_children.length];
-		for (int i = 0; i < _children.length; i++) {
+		_slices = new HashMap<String,JDBCStoreManager>();
+		for (String slice:conf.getSliceNames()) {
 			JDBCStoreManager child = new JDBCStoreManager();
-			_children[i] = child;
-			_storeIds[i] = i;
+			_slices.put(slice, child);
+			_master = (_master == null) ? child : _master;
 		}
-		_master = _children[0];
 	}
 
 	public DistributedJDBCConfiguration getConfiguration() {
@@ -103,73 +110,66 @@
 	 * additional connection info is used to estimate for the existing
 	 * instances.
 	 */
-	protected int getSliceIndex(OpenJPAStateManager sm, Object info) {
-		boolean hasIndex = hasSliceIndex(sm);
+	protected String findSlice(OpenJPAStateManager sm, Object info) {
+		boolean hasIndex = hasSlice(sm);
 		if (hasIndex)
-			return (Integer) sm.getImplData();
-		if (sm.isNew()) {
-			return assignSliceIndex(sm);
-		} else {
-			int i = estimateSliceIndex(sm, info);
-			if (i < 0)
-				return assignSliceIndex(sm);
-			else
-				return i;
-		}
+			return sm.getImplData().toString();
+		String slice = estimateSlice(sm, info);
+		if (slice == null) 
+			return assignSlice(sm);
+		return slice;
 	}
 
-	private boolean hasSliceIndex(OpenJPAStateManager sm) {
-		Object index = sm.getImplData();
-		return index != null;
+	private boolean hasSlice(OpenJPAStateManager sm) {
+		return sm.getImplData() != null;
 	}
 
-	private int assignSliceIndex(OpenJPAStateManager sm) {
+	private String assignSlice(OpenJPAStateManager sm) {
 		PersistenceCapable pc = sm.getPersistenceCapable();
-		int newi = _conf.getDistributionPolicyInstance().distribute(pc,
-				_conf.getConnectionURLs());
-		if (newi<0 || newi>=_storeIds.length) {
-			String[] urls = _conf.getConnectionURLs();
+		String slice = _conf.getDistributionPolicyInstance().distribute(pc, 
+				_slices.keySet(), getContext());
+		if (!_slices.containsKey(slice)) {
 			throw new UserException(_loc.get("bad-policy-slice", 
 				new Object[]{
 				_conf.getDistributionPolicyInstance().getClass().getName(), 
-				newi, sm.getPersistenceCapable(), 
-				urls.length-1, Arrays.toString(urls)}));
+				slice, sm.getPersistenceCapable(), 
+				_slices.keySet()}));
 		}
-		sm.setImplData(_storeIds[newi], true);
-		return newi;
+		sm.setImplData(slice, true);
+		return slice;
 	}
 
 	/**
 	 * The additional edata is used, if possible, to find the StoreManager
 	 * managing the given StateManager. If the additional data is unavailable
-	 * then return a negative value.
+	 * then return null.
 	 * 
 	 */
-	private int estimateSliceIndex(OpenJPAStateManager sm, Object edata) {
+	private String estimateSlice(OpenJPAStateManager sm, Object edata) {
 		if (edata == null || !(edata instanceof ConnectionInfo))
-			return -1;
+			return null;
 
 		Result result = ((ConnectionInfo) edata).result;
 		if (result instanceof ResultSetResult) {
 			JDBCStore store = ((ResultSetResult) result).getStore();
-			for (int i = 0; i < _children.length; i++) {
-				if (_children[i] == store) {
-					sm.setImplData(_storeIds[i], true);
-					return i;
+			for (String slice:_slices.keySet()) {
+				if (_slices.get(slice) == store) {
+					sm.setImplData(slice, true);
+					return slice;
 				}
 			}
 		}
-		return -1;
+		return null;
 	}
 
 	/**
 	 * Selects a child StoreManager where the given instance resides.
 	 */
 	private StoreManager selectStore(OpenJPAStateManager sm, Object edata) {
-		int i = getSliceIndex(sm, edata);
-		if (i >= 0 && i < _children.length)
-			return _children[i];
-		throw new InternalException(_loc.get("wrong-slice", i, sm));
+		String slice = findSlice(sm, edata);
+		if (_slices.containsKey(slice)) 
+			return _slices.get(slice);
+		throw new InternalException(_loc.get("wrong-slice", slice, sm));
 	}
 
 	public boolean assignField(OpenJPAStateManager sm, int field,
@@ -187,29 +187,29 @@
 	}
 
 	public void begin() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.begin();
 	}
 
 	public void beginOptimistic() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.beginOptimistic();
 	}
 
 	public boolean cancelAll() {
 		boolean ret = true;
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			ret = child.cancelAll() & ret;
 		return ret;
 	}
 
 	public void close() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.close();
 	}
 
 	public void commit() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.commit();
 	}
 
@@ -223,18 +223,18 @@
 
 	public ResultObjectProvider executeExtent(ClassMetaData meta,
 			boolean subclasses, FetchConfiguration fetch) {
-		ResultObjectProvider[] tmp = new ResultObjectProvider[_children.length];
+		ResultObjectProvider[] tmp = new ResultObjectProvider[_slices.size()];
 		int i = 0;
-		for (StoreManager child : _children) {
+		for (StoreManager child : _slices.values()) {
 			tmp[i++] = child.executeExtent(meta, subclasses, fetch);
 		}
 		return new MergedResultObjectProvider(tmp);
 	}
 
 	public boolean exists(OpenJPAStateManager sm, Object edata) {
-		for (int i=0; i<_children.length; i++) {
-			if (_children[i].exists(sm, edata)) {
-				sm.setImplData(_storeIds[i], true);
+		for (String slice:_slices.keySet()) {
+			if (_slices.get(slice).exists(sm, edata)) {
+				sm.setImplData(slice, true);
 				return true;
 			}
 		}
@@ -248,23 +248,23 @@
 	@SuppressWarnings("unchecked")
 	public Collection flush(Collection sms) {
 		Collection exceptions = new ArrayList();
-		List<List<OpenJPAStateManager>> subsets = 
-			new ArrayList<List<OpenJPAStateManager>>();
-		for (int i=0; i<_children.length; i++)
-			subsets.add(new ArrayList<OpenJPAStateManager>());
+		Map<String,List<OpenJPAStateManager>> subsets = 
+			new HashMap<String,List<OpenJPAStateManager>>();
+		for (String slice:_slices.keySet())
+			subsets.put(slice, new ArrayList<OpenJPAStateManager>());
 		for (Object x : sms) {
 			OpenJPAStateManager sm = (OpenJPAStateManager) x;
-			int i = getSliceIndex(sm, null);
-			subsets.get(i).add(sm);
+			String slice = findSlice(sm, null);
+			subsets.get(slice).add(sm);
 		}
 		List<Future<Collection>> futures = new ArrayList<Future<Collection>>(); 
-		int i = 0; 
-		for (JDBCStoreManager store:_children) {
-			List<OpenJPAStateManager> toBeFlushed = subsets.get(i++);
+		 
+		for (String slice:_slices.keySet()) {
+			List<OpenJPAStateManager> toBeFlushed = subsets.get(slice);
 			if (toBeFlushed.isEmpty()) 
 				continue;
 			Flusher flusher = new Flusher();
-			flusher.store   = store;
+			flusher.store   = _slices.get(slice);
 			flusher.toFlush = toBeFlushed;
 			futures.add(threadPool.submit(flusher));
 		}
@@ -307,14 +307,14 @@
 	public boolean initialize(OpenJPAStateManager sm, PCState state,
 			FetchConfiguration fetch, Object edata) {
 		if (edata instanceof ConnectionInfo) {
-			int slice = getSliceIndex(sm, (ConnectionInfo) edata);
-			if (slice >= 0)
-				return _children[slice].initialize(sm, state, fetch, edata);
+			String slice = findSlice(sm, (ConnectionInfo) edata);
+			if (slice != null)
+				return _slices.get(slice).initialize(sm, state, fetch, edata);
 		}
 		// not a part of Query result load. Look into the slices till found
-		for (int i = 0; i < _children.length; i++) {
-			if (_children[i].initialize(sm, state, fetch, edata)) {
-				sm.setImplData(_storeIds[i], true);
+		for (String slice:_slices.keySet()) {
+			if (_slices.get(slice).initialize(sm, state, fetch, edata)) {
+				sm.setImplData(slice, true);
 				return true;
 			}
 		}
@@ -346,30 +346,30 @@
 	public StoreQuery newQuery(String language) {
 		ExpressionParser parser = QueryLanguages.parserForLanguage(language);
 		DistributedStoreQuery ret = new DistributedStoreQuery(this, parser);
-		for (JDBCStoreManager child : _children) {
+		for (JDBCStoreManager child : _slices.values()) {
 			ret.add(child.newQuery(language));
 		}
 		return ret;
 	}
 
 	public void releaseConnection() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.releaseConnection();
 
 	}
 
 	public void retainConnection() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.retainConnection();
 	}
 
 	public void rollback() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.rollback();
 	}
 
 	public void rollbackOptimistic() {
-		for (StoreManager child : _children)
+		for (StoreManager child : _slices.values())
 			child.rollbackOptimistic();
 	}
 
@@ -379,7 +379,7 @@
 	public void setContext(StoreContext ctx) {
 		super.setContext(ctx);
 		Iterator<JDBCConfiguration> confs = _conf.iterator();
-		for (JDBCStoreManager child : _children)
+		for (JDBCStoreManager child : _slices.values())
 			child.setContext(ctx, confs.next());
 	}
 

Modified: labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties (original)
+++ labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties Mon Dec 24 18:20:24 2007
@@ -1,12 +1,20 @@
+slice-not-found: No slice named "{0}" can be found. Available slices are "{1}"
+slice-none-configured: No slice has been configured. Specify slice.XYZ as \
+	property name to register a slice named XYZ.
+slice-configuration: Slice "{0}" configured with "{1}"
+slice-available: Detected slices "{0}" in configuration.
 no-url: No ConnectionURL property has been specified.
 wrong-url: URL "{0}" is invalid as database URL.
 wrong-slice: Wrong slice "{0}" for "{1}"
-connect: Connecting to "{0}"
-connect-error-lenient: Failed to connect to "{0}". This database will be \
-	ignored as Lenient property is set to true.
-connect-error: Failed to connect to "{0}"
-bad-policy-slice:Distribution policy "{0}" has returned invalid slice index \
-	"{1}" for "{2}". The valid slice index is between 0 and {3} both \
-	inclusive representing index of active databases "{4}". This can happen \
-	when one or more of the originally configured databases are not available \
+slice-connect: Connecting to slice "{0}" at URL "{1}"
+slice-connect-warn: Failed to connect to slice "{0}". Slice "{0}" will be \
+	ignored as configuration is set as lenient.
+slice-connect-known-warn: Failed to connect to slice "{0}" due to "{2}. \
+	Slice "{0}" will be ignored as configuration is set as lenient.
+slice-connect-error: Failed to connect to slice "{0}" at URL "{1}"
+slice-connect-known-error: Failed to connect to slice "{0}" at URL "{1} due to \
+	{2}"
+bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
+	"{1}" for "{2}". The valid slices are {3}. This error may happen \
+	when one or more of the originally configured slices are unavailable \
 	and Lenient property is set to true.

Modified: labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ labs/fluid/slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Mon Dec 24 18:20:24 2007
@@ -34,24 +34,13 @@
 
 	public void setUp() throws Exception {
 		super.setUp(PObject.class, Person.class, Address.class);
+		assertNotNull("No unit for " + persistenceUnitName, emf);
 	}
 	
 	public void tearDown() throws Exception {
 		// do not invoke super -- it deletes the data
 	}
 	
-	/**
-	 * Tests that user-level configurations are set.
-	 */
-	public void testConfig() {
-		assertTrue(emf.getConfiguration() instanceof DistributedConfiguration);
-		DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration();
-		assertTrue(conf.getConnectionURLs().length>1);
-		String brokerFactory = conf.getBrokerFactory();
-		assertTrue(brokerFactory.equals("slice") || 
-				   brokerFactory.equals(DistributedJDBCBrokerFactory.class.getName()));
-		assertNotNull(conf.getDistributionPolicyInstance());
-	}
 	
 	PObject persist() {
 		EntityManager em = emf.createEntityManager();
@@ -117,7 +106,7 @@
 		em.clear();
 		
 		em = emf.createEntityManager();
-		List<Person> persons = em.createQuery("SELECT p FROM Person p WHERE p.name=?1").
+		List<Person> persons = em.createQuery("SELECT p FROM Person p WHERE p.name=?1 ORDER BY p.address.city").
 			setParameter(1, "A").getResultList();
 		List<Address> addresses = em.createQuery("SELECT a FROM Address a").getResultList();
 		for (Address pc:addresses) {

Modified: labs/fluid/slice/src/test/resources/META-INF/persistence.xml
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/resources/META-INF/persistence.xml?rev=606752&r1=606751&r2=606752&view=diff
==============================================================================
--- labs/fluid/slice/src/test/resources/META-INF/persistence.xml (original)
+++ labs/fluid/slice/src/test/resources/META-INF/persistence.xml Mon Dec 24 18:20:24 2007
@@ -1,5 +1,19 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <persistence xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="1.0" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd">
+   <persistence-unit name="per-slice">
+      <properties>
+         <property name="openjpa.BrokerFactory" value="slice"/>
+         <property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
+         <property name="slice.One.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
+         <property name="slice.Two.ConnectionURL" value="jdbc:mysql://localhost/slice2"/>
+         <property name="slice.Three.ConnectionURL" value="jdbc:mysql://localhost/slice3"/>
+         <property name="slice.Three.ConnectionDriverName" value="x.y.z"/>
+         
+         <property name="slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
+         <property name="slice.Lenient" value="false"/>
+      </properties>
+   </persistence-unit>
+   
    <persistence-unit name="slice">
         <class>domain.PObject</class>
         <class>domain.Person</class>
@@ -7,7 +21,8 @@
 		<properties>
             <property name="openjpa.BrokerFactory" value="slice"/>
             <property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
-            <property name="openjpa.ConnectionURL" value="jdbc:mysql://localhost/slice1|jdbc:mysql://localhost/slice2"/>
+            <property name="slice.One.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
+            <property name="slice.Two.ConnectionURL" value="jdbc:mysql://localhost/slice2"/>
             <property name="openjpa.ConnectionUserName" value="root"/>
             <property name="openjpa.ConnectionPassword" value="hello"/>
             <property name="slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
@@ -22,7 +37,7 @@
       	</properties>
    </persistence-unit>
    
-      <persistence-unit name="slice1">
+      <!-- persistence-unit name="slice1">
         <properties>
             <property name="openjpa.ConnectionDriverName" value="com.mysql.jdbc.Driver"/>
             <property name="openjpa.ConnectionURL" value="jdbc:mysql://localhost/slice1"/>
@@ -40,7 +55,7 @@
             <property name="openjpa.ConnectionPassword" value="hello"/>
             <property name="openjpa.Connection2URL" value="jdbc:mysql://localhost/slice2"/>
         </properties>
-   </persistence-unit>
+   </persistence-unit -->
    
    
 </persistence>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org