You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openjpa.apache.org by pp...@apache.org on 2010/05/05 01:01:51 UTC

svn commit: r941084 - in /openjpa/trunk/openjpa-slice/src: main/java/org/apache/openjpa/slice/ main/java/org/apache/openjpa/slice/jdbc/ test/java/org/apache/openjpa/slice/

Author: ppoddar
Date: Tue May  4 23:01:51 2010
New Revision: 941084

URL: http://svn.apache.org/viewvc?rev=941084&view=rev
Log:
OPENJPA-1648, OPENJPA-1649, OPENJPA-1650: Replace fixed thread pools with a cached thread pool.
Stop creating thread pools on every flush/query. 
Streamline property parsing/processing to create slice configurations.
Retain addSlice() on the Persistence Unit and remove it from the Persistence Context and Configuration.

Modified:
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java Tue May  4 23:01:51 2010
@@ -18,29 +18,17 @@
  */
 package org.apache.openjpa.slice;
 
-import java.util.Map;
-
 import org.apache.openjpa.kernel.Broker;
 
 /**
- * Extension to Broker to allow dynamically add/remove slices.
+ * Extension to Broker to allow access to virtual data store.
  * 
  * @author Pinaki Poddar
  *
  */
 public interface DistributedBroker extends Broker {
     /**
-     * Adds the given slice with the given properties. This newly added slice
-     * will participate in the current and subsequent transaction.
-     * 
-     * @param name logical name of the to be added slice. Must be different from
-     * any currently available slices.
-     * @see DistributedConfiguration#getAvailableSliceNames()
-     * 
-     * @param properties key-value pair of configuration for the slice to be
-     * added. The keys must have openjpa.slice.<name>.* as prefix.
-     * 
-     * @see DistributedConfiguration#addSlice(String, Map)
+     * Gets the distributed store manager used by this receiver.
      */
-    Slice addSlice(String name, Map properties);
+    DistributedStoreManager getDistributedStoreManager();
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java Tue May  4 23:01:51 2010
@@ -44,8 +44,7 @@ public class DistributedBrokerImpl exten
 	private transient DistributedConfiguration _conf;
 	private final ReentrantSliceLock _lock;
 	
-	private static final Localizer _loc =
-			Localizer.forPackage(DistributedBrokerImpl.class);
+	private static final Localizer _loc = Localizer.forPackage(DistributedBrokerImpl.class);
 
 	public DistributedBrokerImpl() {
 	    super();
@@ -60,15 +59,7 @@ public class DistributedBrokerImpl exten
     }
     
     public DistributedStoreManager getDistributedStoreManager() {
-        return (DistributedStoreManager)getStoreManager().
-                getInnermostDelegate();
-    }
-    
-    public Slice addSlice(String name, Map properties) {
-        Slice slice = ((DistributedBrokerFactory)getBrokerFactory()).addSlice(
-            name, properties);
-        getDistributedStoreManager().addSlice(slice);
-        return slice;
+        return (DistributedStoreManager)getStoreManager().getInnermostDelegate();
     }
     
 	/**
@@ -81,8 +72,7 @@ public class DistributedBrokerImpl exten
 	 * been assigned before.
 	 */
 	@Override
-    public OpenJPAStateManager persist(Object pc, Object id, boolean explicit,
-			OpCallbacks call) {
+    public OpenJPAStateManager persist(Object pc, Object id, boolean explicit, OpCallbacks call) {
 		OpenJPAStateManager sm = getStateManager(pc);
 		SliceInfo info = null;
         boolean replicated = SliceImplHelper.isReplicated(pc,

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java Tue May  4 23:01:51 2010
@@ -19,7 +19,6 @@
 package org.apache.openjpa.slice;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.openjpa.conf.OpenJPAConfiguration;
 
@@ -118,10 +117,4 @@ public interface DistributedConfiguratio
      * replicated instances will be replicated across the available slices.
      */
     void setReplicationPolicy(String policy);
-	/**
-	 * Adds a new Slice of the given name and given properties.
-     * The given properties must have keys with prefix openjpa.slice.<name>.*
-     * where <name> is the new slice to be added.
-	 */
-    Slice addSlice(String name, Map properties);
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java Tue May  4 23:01:51 2010
@@ -24,11 +24,12 @@ import org.apache.openjpa.conf.OpenJPACo
 
 /**
  * Represents a database slice of immutable logical name, a configuration and
- * status.
+ * status. A Slice is uniquely identified by its logical name.
  * 
  * @author Pinaki Poddar 
  *
  */
+@SuppressWarnings("serial")
 public class Slice implements Comparable<Slice>,Serializable {
     public enum Status {
         NOT_INITIALIZED, 

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java Tue May  4 23:01:51 2010
@@ -19,10 +19,8 @@
 package org.apache.openjpa.slice;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A thread to execute operation against each database slice.
@@ -32,7 +30,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class SliceThread extends Thread {
     private final Thread _parent;
-
+    private static ExecutorService _pool;
+    
     public SliceThread(String name, Thread parent, Runnable r) {
         super(r, name);
         _parent = parent;
@@ -52,23 +51,23 @@ public class SliceThread extends Thread 
     }
     
     /** 
-     * Create a pool of given size.
-     * The thread factory is specialized to create SliceThread which gets
-     * preferential treatment for locking.
+     * Create a cached pool of <em>slice</em> threads.
+     * The thread factory creates specialized threads for preferential locking treatment.
      * 
      */
 
-    public static ExecutorService newPool(int size) {
-        return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS, 
-            new SynchronousQueue<Runnable>(), new SliceThreadFactory());
+    public static ExecutorService getPool() {
+        if (_pool == null) {
+            _pool = Executors.newCachedThreadPool(new SliceThreadFactory());
+        }
+        return _pool;
     }
     
-    static class SliceThreadFactory implements ThreadFactory {
+    private static class SliceThreadFactory implements ThreadFactory {
         int n = 0;
         public Thread newThread(Runnable r) {
             Thread parent = Thread.currentThread();
             return new SliceThread(parent.getName()+"-slice-"+n++, parent, r);
         }
     }
-
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java Tue May  4 23:01:51 2010
@@ -19,17 +19,21 @@
 package org.apache.openjpa.slice.jdbc;
 
 import java.security.AccessController;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.openjpa.conf.OpenJPAVersion;
 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
 import org.apache.openjpa.jdbc.kernel.JDBCBrokerFactory;
 import org.apache.openjpa.kernel.Bootstrap;
-import org.apache.openjpa.kernel.StoreManager;
+import org.apache.openjpa.kernel.Broker;
 import org.apache.openjpa.lib.conf.ConfigurationProvider;
 import org.apache.openjpa.lib.util.J2DoPrivHelper;
 import org.apache.openjpa.lib.util.Localizer;
+import org.apache.openjpa.slice.DistributedBroker;
 import org.apache.openjpa.slice.DistributedBrokerFactory;
+import org.apache.openjpa.slice.DistributedBrokerImpl;
 import org.apache.openjpa.slice.Slice;
 
 /**
@@ -41,16 +45,14 @@ import org.apache.openjpa.slice.Slice;
 @SuppressWarnings("serial")
 public class DistributedJDBCBrokerFactory extends JDBCBrokerFactory 
     implements DistributedBrokerFactory {
-	private static final Localizer _loc = 
-	    Localizer.forPackage(DistributedJDBCBrokerFactory.class);
+	private static final Localizer _loc = Localizer.forPackage(DistributedJDBCBrokerFactory.class);
+	
 	/**
      * Factory method for constructing a factory from properties. Invoked from
 	 * {@link Bootstrap#newBrokerFactory}.
 	 */
-	public static DistributedJDBCBrokerFactory newInstance(
-			ConfigurationProvider cp) {
-		DistributedJDBCConfigurationImpl conf =
-				new DistributedJDBCConfigurationImpl(cp);
+	public static DistributedJDBCBrokerFactory newInstance(ConfigurationProvider cp) {
+		DistributedJDBCConfigurationImpl conf =	new DistributedJDBCConfigurationImpl();
 		cp.setInto(conf);
 		return new DistributedJDBCBrokerFactory(conf);
 	}
@@ -60,10 +62,9 @@ public class DistributedJDBCBrokerFactor
 	 * Invoked from {@link Bootstrap#getBrokerFactory}.
 	 */
 	public static JDBCBrokerFactory getInstance(ConfigurationProvider cp) {
-	    Map properties = cp.getProperties();
+	    Map<String,Object> properties = cp.getProperties();
 	    Object key = toPoolKey(properties);
-		DistributedJDBCBrokerFactory factory =
-                (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
+		DistributedJDBCBrokerFactory factory = (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
 		if (factory != null)
 			return factory;
 
@@ -75,17 +76,14 @@ public class DistributedJDBCBrokerFactor
 	/**
 	 * Factory method for constructing a factory from a configuration.
 	 */
-	public static synchronized JDBCBrokerFactory getInstance(
-			JDBCConfiguration conf) {
-	    Map properties = conf.toProperties(false);
+	public static synchronized JDBCBrokerFactory getInstance(DistributedJDBCConfiguration conf) {
+	    Map<String,Object> properties = conf.toProperties(false);
 	    Object key = toPoolKey(properties);
-		DistributedJDBCBrokerFactory factory =
-                (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
+		DistributedJDBCBrokerFactory factory = (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
 		if (factory != null)
 			return factory;
 
-		factory = new DistributedJDBCBrokerFactory(
-		        (DistributedJDBCConfiguration) conf);
+		factory = new DistributedJDBCBrokerFactory(conf);
 		pool(key, factory);
 		return factory;
 	}
@@ -100,11 +98,13 @@ public class DistributedJDBCBrokerFactor
 	}
 	
 	public Slice addSlice(String name, Map properties) {
-	    Slice slice = getConfiguration().addSlice(name, properties);
-        ClassLoader loader = AccessController.doPrivileged(
-            J2DoPrivHelper.getContextClassLoaderAction());
-        synchronizeMappings(loader, (JDBCConfiguration)slice.
-                getConfiguration());
+	    Slice slice = ((DistributedJDBCConfigurationImpl)getConfiguration()).addSlice(name, properties);
+        ClassLoader loader = AccessController.doPrivileged(J2DoPrivHelper.getContextClassLoaderAction());
+        synchronizeMappings(loader, (JDBCConfiguration)slice.getConfiguration());
+        Collection<Broker> brokers = getOpenBrokers();
+        for (Broker broker : brokers) {
+            ((DistributedBroker)broker).getDistributedStoreManager().addSlice(slice);
+        }
 	    return slice;
 	}
 
@@ -112,7 +112,19 @@ public class DistributedJDBCBrokerFactor
 	protected DistributedJDBCStoreManager newStoreManager() {
 		return new DistributedJDBCStoreManager(getConfiguration());
 	}
-	
+    
+    @Override
+    public DistributedBroker newBroker() {
+        return new DistributedBrokerImpl();
+    }
+    
+    protected void synchronizeMappings(ClassLoader loader) {
+        List<Slice> slices = getConfiguration().getSlices(Slice.Status.ACTIVE);
+        for (Slice slice : slices) {
+            synchronizeMappings(loader, (JDBCConfiguration) slice.getConfiguration());
+        }
+    }
+
     @Override
     protected Object getFactoryInitializationBanner() {
         return _loc.get("factory-init", OpenJPAVersion.VERSION_NUMBER);

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java Tue May  4 23:01:51 2010
@@ -34,6 +34,6 @@ public interface DistributedJDBCConfigur
     /**
      * Gets the master slice.
      */
-    Slice getMaster();
+    Slice getMasterSlice();
 
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java Tue May  4 23:01:51 2010
@@ -52,7 +52,7 @@ import org.apache.openjpa.slice.Slice;
 import org.apache.openjpa.util.UserException;
 
 /**
- * Implements a distributed configuration of JDBCStoreManagers.
+ * A specialized configuration embodies a set of Slice configurations.
  * The original configuration properties are analyzed to create a set of
  * Slice specific properties with defaulting rules. 
  * 
@@ -73,63 +73,49 @@ public class DistributedJDBCConfiguratio
     public PluginValue distributionPolicyPlugin;
     public PluginValue replicationPolicyPlugin;
     
-    protected Log log;
-    protected String unit;
-    
     public static final String DOT = ".";
     public static final String REGEX_DOT = "\\.";
-    public static final String PREFIX_SLICE = ProductDerivation.PREFIX_SLICE + 
-    	DOT;
+    public static final String PREFIX_SLICE = ProductDerivation.PREFIX_SLICE + DOT;
     public static final String PREFIX_OPENJPA = "openjpa.";
-    private static Localizer _loc =
-            Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
+    private static Localizer _loc = Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
 
-    public DistributedJDBCConfigurationImpl() {
-        super();
-    }
-    
     /**
-     * Configure itself as well as underlying slices.
-     * 
+     * Create a configuration and declare the plug-ins.
      */
-    public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
-        super(true, false);
-        Map p = cp.getProperties();
-        log = getConfigurationLog();
-        unit = getPersistenceUnitName(p);
-        setDiagnosticContext(this, unit);
-        
+    public DistributedJDBCConfigurationImpl() {
+        super(true,   // load derivations
+              false); // load globals
         brokerPlugin.setString(DistributedBrokerImpl.class.getName());
         
-        distributionPolicyPlugin = addPlugin(
-            PREFIX_SLICE + "DistributionPolicy", true);
-        distributionPolicyPlugin.setAlias("random", 
-        	DistributionPolicy.Default.class.getName());
+        distributionPolicyPlugin = addPlugin(PREFIX_SLICE + "DistributionPolicy", true);
+        distributionPolicyPlugin.setAlias("random", DistributionPolicy.Default.class.getName());
         distributionPolicyPlugin.setDefault("random");
         distributionPolicyPlugin.setDynamic(true);
         
-        replicationPolicyPlugin = addPlugin(
-            PREFIX_SLICE + "ReplicationPolicy", true);
-        replicationPolicyPlugin.setAlias("all", 
-            ReplicationPolicy.Default.class.getName());
+        replicationPolicyPlugin = addPlugin(PREFIX_SLICE + "ReplicationPolicy", true);
+        replicationPolicyPlugin.setAlias("all", ReplicationPolicy.Default.class.getName());
         replicationPolicyPlugin.setDefault("all");
         replicationPolicyPlugin.setDynamic(true);
         
         lenientPlugin = addBoolean(PREFIX_SLICE + "Lenient");
+        lenientPlugin.setDefault("true");
         
-        masterPlugin = addString(PREFIX_SLICE + "Master");
-        
-        namesPlugin = addStringList(PREFIX_SLICE + "Names");
-        
-        setSlices(p);
+        masterPlugin  = addString(PREFIX_SLICE + "Master");
+        namesPlugin   = addStringList(PREFIX_SLICE + "Names");
     }
     
-    private String getPersistenceUnitName(Map p) {
-        Object unit = p.get(PREFIX_OPENJPA + id.getProperty());
-        return (unit == null) ? "?" : unit.toString();
+    /**
+     * Configure itself as well as underlying slices.
+     * 
+     */
+    public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
+        this();
+        cp.setInto(this);
+        setDiagnosticContext(this);
     }
     
-    private void setDiagnosticContext(OpenJPAConfiguration conf, String unit) {
+    private void setDiagnosticContext(OpenJPAConfiguration conf) {
+        String unit = conf.getId();
         LogFactory logFactory = conf.getLogFactory();
         if (logFactory instanceof LogFactoryImpl) {
             ((LogFactoryImpl)logFactory).setDiagnosticContext(unit);
@@ -153,7 +139,7 @@ public class DistributedJDBCConfiguratio
      */
     public List<String> getAvailableSliceNames() {
         List<String> result = new ArrayList<String>();
-        for (Slice slice:_slices)
+        for (Slice slice : _slices)
             result.add(slice.getName());
         return result;
     }
@@ -163,7 +149,7 @@ public class DistributedJDBCConfiguratio
      */
     public List<Slice> getSlices(Slice.Status...statuses) {
         if (statuses == null)
-            return Collections.unmodifiableList(_slices);
+            return _slices == null ? Collections.EMPTY_LIST : Collections.unmodifiableList(_slices);
         List<Slice> result = new ArrayList<Slice>();
         for (Slice slice:_slices) {
             for (Slice.Status status:statuses)
@@ -173,25 +159,19 @@ public class DistributedJDBCConfiguratio
         return result;
     }
     
-    /**
-     * Gets the master slice. 
-     */
-    public Slice getMaster() {
-        return _master;
-    }
 
     public Slice getSlice(String name) {
         return getSlice(name, false);
     }
     
     /**
-     * Get the configuration for given slice.
+     * Get the Slice of the given slice.
      * 
      * @param mustExist if true an exception if raised if the given slice name
-     * is not an active slice.
+     * is not a valid slice.
      */
     public Slice getSlice(String name, boolean mustExist) {
-        for (Slice slice:_slices)
+        for (Slice slice : _slices)
             if (slice.getName().equals(name))
                 return slice;
         if (mustExist) {
@@ -261,6 +241,29 @@ public class DistributedJDBCConfiguratio
     public boolean isLenient() {
         return lenientPlugin.get();
     }
+    
+    public void setLenient(boolean lenient) {
+        lenientPlugin.set(lenient);
+    }
+
+    public void setMaster(String master) {
+        masterPlugin.set(master);
+    }
+    
+    /**
+     * Gets the master slice.
+     */
+    public Slice getMasterSlice() {
+        if (_master == null) {
+            String value = masterPlugin.get();
+            if (value == null) {
+                _master = _slices.get(0);
+            } else {
+                _master = getSlice(value, true);
+            }
+        }
+        return _master;
+    }
 
     /**
      * Create a virtual DistributedDataSource as a composite of individual
@@ -355,48 +358,37 @@ public class DistributedJDBCConfiguratio
      * Either throw a user exception or add the configuration to the given list,
      * based on <code>isLenient</code>.
      */
-    private void handleBadConnection(boolean isLenient, Slice slice,
-            Throwable ex) {
+    private void handleBadConnection(boolean isLenient, Slice slice, Throwable ex) {
         OpenJPAConfiguration conf = slice.getConfiguration();
         String url = conf.getConnectionURL();
-        Log log = getLog(LOG_RUNTIME);
+        Log log = conf.getConfigurationLog();
         if (isLenient) {
             if (ex != null) {
-                log.warn(_loc.get("slice-connect-known-warn", slice, url, ex
-                        .getCause()));
+                log.warn(_loc.get("slice-connect-known-warn", slice, url, ex.getCause()));
             } else {
                 log.warn(_loc.get("slice-connect-warn", slice, url));
             }
         } else if (ex != null) {
-            throw new UserException(_loc.get("slice-connect-known-error",
-                    slice, url, ex), ex.getCause());
+            throw new UserException(_loc.get("slice-connect-known-error", slice, url, ex), ex.getCause());
         } else {
-            throw new UserException(_loc.get("slice-connect-error", slice,
-                    url));
+            throw new UserException(_loc.get("slice-connect-error", slice, url));
         }
     }
 
     /**
-     * Create individual slices with configurations from the given properties.
+     * Create a new Slice of given name and given properties.
+     * 
+     * @param key name of the slice to be created
+     * @param original a set of properties.
+     * @return a newly configured slice
      */
-    void setSlices(Map original) {
-        List<String> sliceNames = findSlices(original);
-        if (sliceNames.isEmpty()) {
-            throw new UserException(_loc.get("slice-none-configured"));
-        } 
-        for (String key : sliceNames) {
-            Slice slice = newSlice(key, original);
-            _slices.add(slice);
-        }
-        setMaster(original);
-    }
-    
     protected Slice newSlice(String key, Map original) {
         JDBCConfiguration child = new JDBCConfigurationImpl();
         child.fromProperties(createSliceProperties(original, key));
-        child.setId(unit+DOT+key);
-        setDiagnosticContext(child, unit+DOT+key);
+        child.setId(getId()+DOT+key);
+        setDiagnosticContext(child);
         Slice slice = new Slice(key, child);
+        Log log = getConfigurationLog();
         if (log.isTraceEnabled())
             log.trace(_loc.get("slice-configuration", key, child
                     .toProperties(false)));
@@ -470,21 +462,21 @@ public class DistributedJDBCConfiguratio
             return s;
         return s.substring(0, i);
     }
-
+    
     /**
      * Creates given <code>slice</code> specific configuration properties from
      * given <code>original</code> key-value map. The rules are
-     * <LI> if key begins with <code>"slice.XXX."</code> where
+     * <LI> if key begins with <code>"openjpa.slice.XXX."</code> where
      * <code>XXX</code> is the given slice name, then replace
-     * <code>"slice.XXX.</code> with <code>openjpa.</code>.
-     * <LI>if key begins with <code>"slice."</code> but not with
-     * <code>"slice.XXX."</code>, the ignore i.e. any property of other
+     * <code>"openjpa.slice.XXX.</code> with <code>openjpa.</code>.
+     * <LI>if key begins with <code>"openjpa.slice."</code> but not with
+     * <code>"openjpa.slice.XXX."</code>, then ignore i.e. any property of other
      * slices or global slice property e.g.
-     * <code>slice.DistributionPolicy</code>
-     * <code>if key starts with <code>"openjpa."</code> and a corresponding
-     * <code>"slice.XXX."</code> property does not exist, then use this as
+     * <code>openjpa.slice.DistributionPolicy</code>
+     * <li>if key starts with <code>"openjpa."</code> and a corresponding
+     * <code>"openjpa.slice.XXX."</code> property does not exist, then use this as
      * default property
-     * <code>property with any other prefix is simply copied
+     * <li>property with any other prefix is simply copied
      *
      */
     Map createSliceProperties(Map original, String slice) {
@@ -510,30 +502,8 @@ public class DistributedJDBCConfiguratio
         }
         return result;
     }
-
-    /**
-     * Determine the master slice.
-     */
-    private void setMaster(Map original) {
-        String key = masterPlugin.getProperty();
-        Object masterSlice = original.get(key);
-        Log log = getConfigurationLog();
-        List<Slice> activeSlices = getSlices(null);
-        if (masterSlice == null) {
-            _master = activeSlices.get(0);
-            if (log.isWarnEnabled())
-                log.warn(_loc.get("no-master-slice", key, _master));
-            return;
-        }
-        for (Slice slice:activeSlices)
-            if (slice.getName().equals(masterSlice))
-                _master = slice;
-        if (_master == null) {
-            _master = activeSlices.get(0);
-        }
-    }
     
-    public Slice addSlice(String name, Map newProps) {
+    Slice addSlice(String name, Map newProps) {
         String prefix = PREFIX_SLICE + DOT + name + DOT;
         for (Object key : newProps.keySet()) {
             if (!String.class.isInstance(key) 
@@ -548,11 +518,33 @@ public class DistributedJDBCConfiguratio
          slice = newSlice(name, original);
         _slices.add(slice);
         try {
-            virtualDataSource.addDataSource(createDataSource(slice));
+            getConnectionFactory().addDataSource(createDataSource(slice));
         } catch (Exception ex) {
             handleBadConnection(false, slice, ex);
             return null;
         }
         return slice;
     }
+    
+    /**
+     * Given the properties, creates a set of individual configurations.
+     */
+    @Override
+    public void fromProperties(Map original) {
+        super.fromProperties(original);
+        setDiagnosticContext(this);
+        List<String> sliceNames = findSlices(original);
+        for (String name : sliceNames) {
+            Slice slice = newSlice(name, original);
+            _slices.add(slice);
+        }
+    }
+       
+    @Override
+    public DecoratingDataSource createConnectionFactory() {
+        if (virtualDataSource == null) {
+            virtualDataSource = createDistributedDataStore();
+        }
+        return virtualDataSource;
+    }
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java Tue May  4 23:01:51 2010
@@ -80,8 +80,7 @@ class DistributedJDBCStoreManager extend
     private final List<SliceStoreManager> _slices;
     private JDBCStoreManager _master;
     private final DistributedJDBCConfiguration _conf;
-    private static final Localizer _loc =
-            Localizer.forPackage(DistributedJDBCStoreManager.class);
+    private static final Localizer _loc = Localizer.forPackage(DistributedJDBCStoreManager.class);
 
     private static final Constructor<ClientConnection> clientConnectionImpl;
     private static final Constructor<RefCountConnection> refCountConnectionImpl;
@@ -108,13 +107,14 @@ class DistributedJDBCStoreManager extend
         super();
         _conf = conf;
         _slices = new ArrayList<SliceStoreManager>();
-        List<String> sliceNames = conf.getActiveSliceNames();
-        for (String name : sliceNames) {
-            SliceStoreManager slice =
-                    new SliceStoreManager(conf.getSlice(name));
-            _slices.add(slice);
-            if (slice.getName().equals(_conf.getMaster().getName()))
-                _master = slice;
+        List<Slice> slices = conf.getSlices(Slice.Status.ACTIVE);
+        Slice masterSlice = conf.getMasterSlice();
+        for (Slice slice : slices) {
+            SliceStoreManager store = new SliceStoreManager(slice);
+            _slices.add(store);
+            if (slice == masterSlice) {
+                _master = store;
+            }
         }
     }
 
@@ -128,8 +128,7 @@ class DistributedJDBCStoreManager extend
     
     public SliceStoreManager addSlice(Slice slice) {
         SliceStoreManager result = new SliceStoreManager(slice);
-        result.setContext(getContext(),
-                (JDBCConfiguration)slice.getConfiguration());
+        result.setContext(getContext(), (JDBCConfiguration)slice.getConfiguration());
         _slices.add(result);
         return result;
     }
@@ -274,7 +273,7 @@ class DistributedJDBCStoreManager extend
         Map<String, StateManagerSet> subsets = bin(sms, null);
         Collection<StateManagerSet> remaining = 
             new ArrayList<StateManagerSet>(subsets.values());
-        ExecutorService threadPool = SliceThread.newPool(_slices.size());
+        ExecutorService threadPool = SliceThread.getPool();
         for (int i = 0; i < _slices.size(); i++) {
             SliceStoreManager slice = _slices.get(i);
             StateManagerSet subset = subsets.get(slice.getName());

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java Tue May  4 23:01:51 2010
@@ -125,8 +125,7 @@ class DistributedStoreQuery extends JDBC
 			List<SliceStoreManager> targets = findTargets();
 			QueryContext ctx = q.getContext();
 			boolean isReplicated = containsReplicated(ctx);
-            ExecutorService threadPool = SliceThread.newPool(
-                    owner._queries.size());
+            ExecutorService threadPool = SliceThread.getPool();
 			for (int i = 0; i < owner._queries.size(); i++) {
                 // if replicated, then execute only on single slice
 				if (isReplicated && !usedExecutors.isEmpty()) {
@@ -206,7 +205,7 @@ class DistributedStoreQuery extends JDBC
 			Iterator<StoreQuery> qs = owner._queries.iterator();
 			List<Future<Number>> futures = null;
 			int result = 0;
-            ExecutorService threadPool = SliceThread.newPool(executors.size());
+            ExecutorService threadPool = SliceThread.getPool();
 			for (Executor ex : executors) {
 				if (futures == null)
                     futures = new ArrayList<Future<Number>>();
@@ -234,7 +233,7 @@ class DistributedStoreQuery extends JDBC
 			Iterator<StoreQuery> qs = owner._queries.iterator();
 			List<Future<Number>> futures = null;
 			int result = 0;
-            ExecutorService threadPool = SliceThread.newPool(executors.size());
+            ExecutorService threadPool = SliceThread.getPool();
 			for (Executor ex : executors) {
 				if (futures == null)
                     futures = new ArrayList<Future<Number>>();

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java Tue May  4 23:01:51 2010
@@ -18,6 +18,8 @@
  */
 package org.apache.openjpa.slice;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import javax.persistence.Entity;
 import javax.persistence.Id;
 
@@ -28,10 +30,10 @@ public class PObject {
 	
 	private int value;
 	
-	private static long idCounter = System.currentTimeMillis();
+	private static AtomicLong idCounter = new AtomicLong(System.currentTimeMillis());
 	
 	public PObject() {
-		id = ++idCounter;
+		id = idCounter.addAndGet(1);
 	}
 	
 	public long getId() {

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Tue May  4 23:01:51 2010
@@ -317,22 +317,22 @@ public class TestBasic extends SliceTest
     }
     
     public void testDynamicSlice() {
-        DistributedConfiguration conf =
-            (DistributedConfiguration)emf.getConfiguration();
+        DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration();
         conf.setDistributionPolicyInstance(new DistributionPolicy() {
             public String distribute(Object pc, List<String> slices,
                     Object context) {
                 if (PObject.class.isInstance(pc)) {
                     PObject o = (PObject)pc;
                     if (o.getValue() > 50) {
-                        DistributedBroker broker = (DistributedBroker)context;
+                        DistributedBrokerFactory bf = (DistributedBrokerFactory)
+                            ((DistributedBroker)context).getBrokerFactory();
                         Map newProps = new HashMap();
                         newProps.put("openjpa.slice.newslice.ConnectionURL",
                             "jdbc:derby:target/database/newslice;create=true");
                         newProps.put(
                             "openjpa.slice.newslice.ConnectionDriverName",
                             "org.apache.derby.jdbc.EmbeddedDriver");
-                        broker.addSlice("newslice", newProps);
+                        bf.addSlice("newslice", newProps);
                         return "newslice";
                     } else {
                         return slices.get(o.getValue()%slices.size());

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java Tue May  4 23:01:51 2010
@@ -64,20 +64,18 @@ public class TestConfiguration extends S
     }
     
     public void testDynamicConfiguration() {
-        DistributedJDBCConfiguration conf =
-            (DistributedJDBCConfiguration) emf.getConfiguration();
+        DistributedJDBCConfiguration conf = (DistributedJDBCConfiguration) emf.getConfiguration();
         List<String> slices = conf.getAvailableSliceNames();
         assertTrue(slices.contains("One"));
         assertTrue(slices.contains("Two"));
         assertTrue(slices.contains("Three"));
-        BrokerFactory bf = ((EntityManagerFactoryImpl) emf).getBrokerFactory();
-        DistributedBroker broker = (DistributedBroker)bf.newBroker();
+        DistributedBrokerFactory bf = (DistributedBrokerFactory)((EntityManagerFactoryImpl) emf).getBrokerFactory();
         Map newProps = new HashMap();
         newProps.put("openjpa.slice.newslice.ConnectionURL",
                 "jdbc:derby:target/database/newslice;create=true");
         newProps.put("openjpa.slice.newslice.ConnectionDriverName",
                 "org.apache.derby.jdbc.EmbeddedDriver");
-        broker.addSlice("newslice", newProps);
+        bf.addSlice("newslice", newProps);
         
         assertTrue(conf.getActiveSliceNames().contains("newslice"));
         

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java Tue May  4 23:01:51 2010
@@ -213,6 +213,34 @@ public class TestQueryMultiThreaded exte
         waitForTermination();
         em.getTransaction().rollback();
     }
+    
+    /** Runs 1000 threads that each commit 10 new PObjects on a private EntityManager, then joins them. */
+    public void testHeavyLoad() {
+        Thread[] threads = new Thread[1000];
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(new Runnable() {
+                public void run() {
+                    EntityManager em = emf.createEntityManager();
+                    em.getTransaction().begin();
+                    for (int j = 0; j < 10; j++) {
+                        PObject pc = new PObject();
+                        pc.setValue((int)(System.currentTimeMillis() % 10)); // mod before narrowing: 0..9
+                        em.persist(pc);
+                    }
+                    em.getTransaction().commit();
+                    em.close(); // release this thread's persistence context once its work is committed
+                }
+            });
+            threads[i].start();
+        }
+        for (Thread t : threads) {
+            try {
+                t.join(); // NOTE: an exception thrown inside a worker is not reported to the test here
+            } catch (InterruptedException e) {
+                fail("interrupted while joining worker thread: " + e);
+            }
+        }
+    }
 
     public void testHint() {
         final List<String> targets = new ArrayList<String>();