You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openjpa.apache.org by pp...@apache.org on 2010/05/05 01:01:51 UTC
svn commit: r941084 - in /openjpa/trunk/openjpa-slice/src:
main/java/org/apache/openjpa/slice/ main/java/org/apache/openjpa/slice/jdbc/
test/java/org/apache/openjpa/slice/
Author: ppoddar
Date: Tue May 4 23:01:51 2010
New Revision: 941084
URL: http://svn.apache.org/viewvc?rev=941084&view=rev
Log:
OPENJPA-1648,OPENJPA-1649,OPENJPA-1650: Replace fixed thread pools by cached thread pool.
Stop creating thread pools on every flush/query.
Streamline property parsing/processing to create slice configurations.
Retain addSlice() on Persistence Unit and remove from Persistence Context and Configuration.
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBroker.java Tue May 4 23:01:51 2010
@@ -18,29 +18,17 @@
*/
package org.apache.openjpa.slice;
-import java.util.Map;
-
import org.apache.openjpa.kernel.Broker;
/**
- * Extension to Broker to allow dynamically add/remove slices.
+ * Extension to Broker to allow access to virtual data store.
*
* @author Pinaki Poddar
*
*/
public interface DistributedBroker extends Broker {
/**
- * Adds the given slice with the given properties. This newly added slice
- * will participate in the current and subsequent transaction.
- *
- * @param name logical name of the to be added slice. Must be different from
- * any currently available slices.
- * @see DistributedConfiguration#getAvailableSliceNames()
- *
- * @param properties key-value pair of configuration for the slice to be
- * added. The keys must have openjpa.slice.<name>.* as prefix.
- *
- * @see DistributedConfiguration#addSlice(String, Map)
+ * Gets the distributed store manager used by this receiver.
*/
- Slice addSlice(String name, Map properties);
+ DistributedStoreManager getDistributedStoreManager();
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java Tue May 4 23:01:51 2010
@@ -44,8 +44,7 @@ public class DistributedBrokerImpl exten
private transient DistributedConfiguration _conf;
private final ReentrantSliceLock _lock;
- private static final Localizer _loc =
- Localizer.forPackage(DistributedBrokerImpl.class);
+ private static final Localizer _loc = Localizer.forPackage(DistributedBrokerImpl.class);
public DistributedBrokerImpl() {
super();
@@ -60,15 +59,7 @@ public class DistributedBrokerImpl exten
}
public DistributedStoreManager getDistributedStoreManager() {
- return (DistributedStoreManager)getStoreManager().
- getInnermostDelegate();
- }
-
- public Slice addSlice(String name, Map properties) {
- Slice slice = ((DistributedBrokerFactory)getBrokerFactory()).addSlice(
- name, properties);
- getDistributedStoreManager().addSlice(slice);
- return slice;
+ return (DistributedStoreManager)getStoreManager().getInnermostDelegate();
}
/**
@@ -81,8 +72,7 @@ public class DistributedBrokerImpl exten
* been assigned before.
*/
@Override
- public OpenJPAStateManager persist(Object pc, Object id, boolean explicit,
- OpCallbacks call) {
+ public OpenJPAStateManager persist(Object pc, Object id, boolean explicit, OpCallbacks call) {
OpenJPAStateManager sm = getStateManager(pc);
SliceInfo info = null;
boolean replicated = SliceImplHelper.isReplicated(pc,
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedConfiguration.java Tue May 4 23:01:51 2010
@@ -19,7 +19,6 @@
package org.apache.openjpa.slice;
import java.util.List;
-import java.util.Map;
import org.apache.openjpa.conf.OpenJPAConfiguration;
@@ -118,10 +117,4 @@ public interface DistributedConfiguratio
* replicated instances will be replicated across the available slices.
*/
void setReplicationPolicy(String policy);
- /**
- * Adds a new Slice of the given name and given properties.
- * The given properties must have keys with prefix openjpa.slice.<name>.*
- * where <name> is the new slice to be added.
- */
- Slice addSlice(String name, Map properties);
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/Slice.java Tue May 4 23:01:51 2010
@@ -24,11 +24,12 @@ import org.apache.openjpa.conf.OpenJPACo
/**
* Represents a database slice of immutable logical name, a configuration and
- * status.
+ * status. A Slice is uniquely identified by its logical name.
*
* @author Pinaki Poddar
*
*/
+@SuppressWarnings("serial")
public class Slice implements Comparable<Slice>,Serializable {
public enum Status {
NOT_INITIALIZED,
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java Tue May 4 23:01:51 2010
@@ -19,10 +19,8 @@
package org.apache.openjpa.slice;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* A thread to execute operation against each database slice.
@@ -32,7 +30,8 @@ import java.util.concurrent.TimeUnit;
*/
public class SliceThread extends Thread {
private final Thread _parent;
-
+ private static ExecutorService _pool;
+
public SliceThread(String name, Thread parent, Runnable r) {
super(r, name);
_parent = parent;
@@ -52,23 +51,23 @@ public class SliceThread extends Thread
}
/**
- * Create a pool of given size.
- * The thread factory is specialized to create SliceThread which gets
- * preferential treatment for locking.
+ * Create a cached pool of <em>slice</em> threads.
+ * The thread factory creates specialized threads for preferential locking treatment.
*
*/
- public static ExecutorService newPool(int size) {
- return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), new SliceThreadFactory());
+ public static ExecutorService getPool() {
+ if (_pool == null) {
+ _pool = Executors.newCachedThreadPool(new SliceThreadFactory());
+ }
+ return _pool;
}
- static class SliceThreadFactory implements ThreadFactory {
+ private static class SliceThreadFactory implements ThreadFactory {
int n = 0;
public Thread newThread(Runnable r) {
Thread parent = Thread.currentThread();
return new SliceThread(parent.getName()+"-slice-"+n++, parent, r);
}
}
-
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCBrokerFactory.java Tue May 4 23:01:51 2010
@@ -19,17 +19,21 @@
package org.apache.openjpa.slice.jdbc;
import java.security.AccessController;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.openjpa.conf.OpenJPAVersion;
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.jdbc.kernel.JDBCBrokerFactory;
import org.apache.openjpa.kernel.Bootstrap;
-import org.apache.openjpa.kernel.StoreManager;
+import org.apache.openjpa.kernel.Broker;
import org.apache.openjpa.lib.conf.ConfigurationProvider;
import org.apache.openjpa.lib.util.J2DoPrivHelper;
import org.apache.openjpa.lib.util.Localizer;
+import org.apache.openjpa.slice.DistributedBroker;
import org.apache.openjpa.slice.DistributedBrokerFactory;
+import org.apache.openjpa.slice.DistributedBrokerImpl;
import org.apache.openjpa.slice.Slice;
/**
@@ -41,16 +45,14 @@ import org.apache.openjpa.slice.Slice;
@SuppressWarnings("serial")
public class DistributedJDBCBrokerFactory extends JDBCBrokerFactory
implements DistributedBrokerFactory {
- private static final Localizer _loc =
- Localizer.forPackage(DistributedJDBCBrokerFactory.class);
+ private static final Localizer _loc = Localizer.forPackage(DistributedJDBCBrokerFactory.class);
+
/**
* Factory method for constructing a factory from properties. Invoked from
* {@link Bootstrap#newBrokerFactory}.
*/
- public static DistributedJDBCBrokerFactory newInstance(
- ConfigurationProvider cp) {
- DistributedJDBCConfigurationImpl conf =
- new DistributedJDBCConfigurationImpl(cp);
+ public static DistributedJDBCBrokerFactory newInstance(ConfigurationProvider cp) {
+ DistributedJDBCConfigurationImpl conf = new DistributedJDBCConfigurationImpl();
cp.setInto(conf);
return new DistributedJDBCBrokerFactory(conf);
}
@@ -60,10 +62,9 @@ public class DistributedJDBCBrokerFactor
* Invoked from {@link Bootstrap#getBrokerFactory}.
*/
public static JDBCBrokerFactory getInstance(ConfigurationProvider cp) {
- Map properties = cp.getProperties();
+ Map<String,Object> properties = cp.getProperties();
Object key = toPoolKey(properties);
- DistributedJDBCBrokerFactory factory =
- (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
+ DistributedJDBCBrokerFactory factory = (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
if (factory != null)
return factory;
@@ -75,17 +76,14 @@ public class DistributedJDBCBrokerFactor
/**
* Factory method for constructing a factory from a configuration.
*/
- public static synchronized JDBCBrokerFactory getInstance(
- JDBCConfiguration conf) {
- Map properties = conf.toProperties(false);
+ public static synchronized JDBCBrokerFactory getInstance(DistributedJDBCConfiguration conf) {
+ Map<String,Object> properties = conf.toProperties(false);
Object key = toPoolKey(properties);
- DistributedJDBCBrokerFactory factory =
- (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
+ DistributedJDBCBrokerFactory factory = (DistributedJDBCBrokerFactory) getPooledFactoryForKey(key);
if (factory != null)
return factory;
- factory = new DistributedJDBCBrokerFactory(
- (DistributedJDBCConfiguration) conf);
+ factory = new DistributedJDBCBrokerFactory(conf);
pool(key, factory);
return factory;
}
@@ -100,11 +98,13 @@ public class DistributedJDBCBrokerFactor
}
public Slice addSlice(String name, Map properties) {
- Slice slice = getConfiguration().addSlice(name, properties);
- ClassLoader loader = AccessController.doPrivileged(
- J2DoPrivHelper.getContextClassLoaderAction());
- synchronizeMappings(loader, (JDBCConfiguration)slice.
- getConfiguration());
+ Slice slice = ((DistributedJDBCConfigurationImpl)getConfiguration()).addSlice(name, properties);
+ ClassLoader loader = AccessController.doPrivileged(J2DoPrivHelper.getContextClassLoaderAction());
+ synchronizeMappings(loader, (JDBCConfiguration)slice.getConfiguration());
+ Collection<Broker> brokers = getOpenBrokers();
+ for (Broker broker : brokers) {
+ ((DistributedBroker)broker).getDistributedStoreManager().addSlice(slice);
+ }
return slice;
}
@@ -112,7 +112,19 @@ public class DistributedJDBCBrokerFactor
protected DistributedJDBCStoreManager newStoreManager() {
return new DistributedJDBCStoreManager(getConfiguration());
}
-
+
+ @Override
+ public DistributedBroker newBroker() {
+ return new DistributedBrokerImpl();
+ }
+
+ protected void synchronizeMappings(ClassLoader loader) {
+ List<Slice> slices = getConfiguration().getSlices(Slice.Status.ACTIVE);
+ for (Slice slice : slices) {
+ synchronizeMappings(loader, (JDBCConfiguration) slice.getConfiguration());
+ }
+ }
+
@Override
protected Object getFactoryInitializationBanner() {
return _loc.get("factory-init", OpenJPAVersion.VERSION_NUMBER);
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java Tue May 4 23:01:51 2010
@@ -34,6 +34,6 @@ public interface DistributedJDBCConfigur
/**
* Gets the master slice.
*/
- Slice getMaster();
+ Slice getMasterSlice();
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java Tue May 4 23:01:51 2010
@@ -52,7 +52,7 @@ import org.apache.openjpa.slice.Slice;
import org.apache.openjpa.util.UserException;
/**
- * Implements a distributed configuration of JDBCStoreManagers.
+ * A specialized configuration that embodies a set of Slice configurations.
* The original configuration properties are analyzed to create a set of
* Slice specific properties with defaulting rules.
*
@@ -73,63 +73,49 @@ public class DistributedJDBCConfiguratio
public PluginValue distributionPolicyPlugin;
public PluginValue replicationPolicyPlugin;
- protected Log log;
- protected String unit;
-
public static final String DOT = ".";
public static final String REGEX_DOT = "\\.";
- public static final String PREFIX_SLICE = ProductDerivation.PREFIX_SLICE +
- DOT;
+ public static final String PREFIX_SLICE = ProductDerivation.PREFIX_SLICE + DOT;
public static final String PREFIX_OPENJPA = "openjpa.";
- private static Localizer _loc =
- Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
+ private static Localizer _loc = Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
- public DistributedJDBCConfigurationImpl() {
- super();
- }
-
/**
- * Configure itself as well as underlying slices.
- *
+ * Create a configuration and declare the plug-ins.
*/
- public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
- super(true, false);
- Map p = cp.getProperties();
- log = getConfigurationLog();
- unit = getPersistenceUnitName(p);
- setDiagnosticContext(this, unit);
-
+ public DistributedJDBCConfigurationImpl() {
+ super(true, // load derivations
+ false); // load globals
brokerPlugin.setString(DistributedBrokerImpl.class.getName());
- distributionPolicyPlugin = addPlugin(
- PREFIX_SLICE + "DistributionPolicy", true);
- distributionPolicyPlugin.setAlias("random",
- DistributionPolicy.Default.class.getName());
+ distributionPolicyPlugin = addPlugin(PREFIX_SLICE + "DistributionPolicy", true);
+ distributionPolicyPlugin.setAlias("random", DistributionPolicy.Default.class.getName());
distributionPolicyPlugin.setDefault("random");
distributionPolicyPlugin.setDynamic(true);
- replicationPolicyPlugin = addPlugin(
- PREFIX_SLICE + "ReplicationPolicy", true);
- replicationPolicyPlugin.setAlias("all",
- ReplicationPolicy.Default.class.getName());
+ replicationPolicyPlugin = addPlugin(PREFIX_SLICE + "ReplicationPolicy", true);
+ replicationPolicyPlugin.setAlias("all", ReplicationPolicy.Default.class.getName());
replicationPolicyPlugin.setDefault("all");
replicationPolicyPlugin.setDynamic(true);
lenientPlugin = addBoolean(PREFIX_SLICE + "Lenient");
+ lenientPlugin.setDefault("true");
- masterPlugin = addString(PREFIX_SLICE + "Master");
-
- namesPlugin = addStringList(PREFIX_SLICE + "Names");
-
- setSlices(p);
+ masterPlugin = addString(PREFIX_SLICE + "Master");
+ namesPlugin = addStringList(PREFIX_SLICE + "Names");
}
- private String getPersistenceUnitName(Map p) {
- Object unit = p.get(PREFIX_OPENJPA + id.getProperty());
- return (unit == null) ? "?" : unit.toString();
+ /**
+ * Configure itself as well as underlying slices.
+ *
+ */
+ public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
+ this();
+ cp.setInto(this);
+ setDiagnosticContext(this);
}
- private void setDiagnosticContext(OpenJPAConfiguration conf, String unit) {
+ private void setDiagnosticContext(OpenJPAConfiguration conf) {
+ String unit = conf.getId();
LogFactory logFactory = conf.getLogFactory();
if (logFactory instanceof LogFactoryImpl) {
((LogFactoryImpl)logFactory).setDiagnosticContext(unit);
@@ -153,7 +139,7 @@ public class DistributedJDBCConfiguratio
*/
public List<String> getAvailableSliceNames() {
List<String> result = new ArrayList<String>();
- for (Slice slice:_slices)
+ for (Slice slice : _slices)
result.add(slice.getName());
return result;
}
@@ -163,7 +149,7 @@ public class DistributedJDBCConfiguratio
*/
public List<Slice> getSlices(Slice.Status...statuses) {
if (statuses == null)
- return Collections.unmodifiableList(_slices);
+ return _slices == null ? Collections.EMPTY_LIST : Collections.unmodifiableList(_slices);
List<Slice> result = new ArrayList<Slice>();
for (Slice slice:_slices) {
for (Slice.Status status:statuses)
@@ -173,25 +159,19 @@ public class DistributedJDBCConfiguratio
return result;
}
- /**
- * Gets the master slice.
- */
- public Slice getMaster() {
- return _master;
- }
public Slice getSlice(String name) {
return getSlice(name, false);
}
/**
- * Get the configuration for given slice.
+ * Get the Slice of the given name.
*
* @param mustExist if true an exception if raised if the given slice name
- * is not an active slice.
+ * is not a valid slice.
*/
public Slice getSlice(String name, boolean mustExist) {
- for (Slice slice:_slices)
+ for (Slice slice : _slices)
if (slice.getName().equals(name))
return slice;
if (mustExist) {
@@ -261,6 +241,29 @@ public class DistributedJDBCConfiguratio
public boolean isLenient() {
return lenientPlugin.get();
}
+
+ public void setLenient(boolean lenient) {
+ lenientPlugin.set(lenient);
+ }
+
+ public void setMaster(String master) {
+ masterPlugin.set(master);
+ }
+
+ /**
+ * Gets the master slice.
+ */
+ public Slice getMasterSlice() {
+ if (_master == null) {
+ String value = masterPlugin.get();
+ if (value == null) {
+ _master = _slices.get(0);
+ } else {
+ _master = getSlice(value, true);
+ }
+ }
+ return _master;
+ }
/**
* Create a virtual DistributedDataSource as a composite of individual
@@ -355,48 +358,37 @@ public class DistributedJDBCConfiguratio
* Either throw a user exception or add the configuration to the given list,
* based on <code>isLenient</code>.
*/
- private void handleBadConnection(boolean isLenient, Slice slice,
- Throwable ex) {
+ private void handleBadConnection(boolean isLenient, Slice slice, Throwable ex) {
OpenJPAConfiguration conf = slice.getConfiguration();
String url = conf.getConnectionURL();
- Log log = getLog(LOG_RUNTIME);
+ Log log = conf.getConfigurationLog();
if (isLenient) {
if (ex != null) {
- log.warn(_loc.get("slice-connect-known-warn", slice, url, ex
- .getCause()));
+ log.warn(_loc.get("slice-connect-known-warn", slice, url, ex.getCause()));
} else {
log.warn(_loc.get("slice-connect-warn", slice, url));
}
} else if (ex != null) {
- throw new UserException(_loc.get("slice-connect-known-error",
- slice, url, ex), ex.getCause());
+ throw new UserException(_loc.get("slice-connect-known-error", slice, url, ex), ex.getCause());
} else {
- throw new UserException(_loc.get("slice-connect-error", slice,
- url));
+ throw new UserException(_loc.get("slice-connect-error", slice, url));
}
}
/**
- * Create individual slices with configurations from the given properties.
+ * Create a new Slice of given name and given properties.
+ *
+ * @param key name of the slice to be created
+ * @param original a set of properties.
+ * @return a newly configured slice
*/
- void setSlices(Map original) {
- List<String> sliceNames = findSlices(original);
- if (sliceNames.isEmpty()) {
- throw new UserException(_loc.get("slice-none-configured"));
- }
- for (String key : sliceNames) {
- Slice slice = newSlice(key, original);
- _slices.add(slice);
- }
- setMaster(original);
- }
-
protected Slice newSlice(String key, Map original) {
JDBCConfiguration child = new JDBCConfigurationImpl();
child.fromProperties(createSliceProperties(original, key));
- child.setId(unit+DOT+key);
- setDiagnosticContext(child, unit+DOT+key);
+ child.setId(getId()+DOT+key);
+ setDiagnosticContext(child);
Slice slice = new Slice(key, child);
+ Log log = getConfigurationLog();
if (log.isTraceEnabled())
log.trace(_loc.get("slice-configuration", key, child
.toProperties(false)));
@@ -470,21 +462,21 @@ public class DistributedJDBCConfiguratio
return s;
return s.substring(0, i);
}
-
+
/**
* Creates given <code>slice</code> specific configuration properties from
* given <code>original</code> key-value map. The rules are
- * <LI> if key begins with <code>"slice.XXX."</code> where
+ * <LI> if key begins with <code>"openjpa.slice.XXX."</code> where
* <code>XXX</code> is the given slice name, then replace
- * <code>"slice.XXX.</code> with <code>openjpa.</code>.
- * <LI>if key begins with <code>"slice."</code> but not with
- * <code>"slice.XXX."</code>, the ignore i.e. any property of other
+ * <code>"openjpa.slice.XXX."</code> with <code>"openjpa."</code>.
+ * <LI>if key begins with <code>"openjpa.slice."</code> but not with
+ * <code>"openjpa.slice.XXX."</code>, then ignore i.e. any property of other
* slices or global slice property e.g.
- * <code>slice.DistributionPolicy</code>
- * <code>if key starts with <code>"openjpa."</code> and a corresponding
- * <code>"slice.XXX."</code> property does not exist, then use this as
+ * <code>openjpa.slice.DistributionPolicy</code>
+ * <li>if key starts with <code>"openjpa."</code> and a corresponding
+ * <code>"openjpa.slice.XXX."</code> property does not exist, then use this as
* default property
- * <code>property with any other prefix is simply copied
+ * <li>property with any other prefix is simply copied
*
*/
Map createSliceProperties(Map original, String slice) {
@@ -510,30 +502,8 @@ public class DistributedJDBCConfiguratio
}
return result;
}
-
- /**
- * Determine the master slice.
- */
- private void setMaster(Map original) {
- String key = masterPlugin.getProperty();
- Object masterSlice = original.get(key);
- Log log = getConfigurationLog();
- List<Slice> activeSlices = getSlices(null);
- if (masterSlice == null) {
- _master = activeSlices.get(0);
- if (log.isWarnEnabled())
- log.warn(_loc.get("no-master-slice", key, _master));
- return;
- }
- for (Slice slice:activeSlices)
- if (slice.getName().equals(masterSlice))
- _master = slice;
- if (_master == null) {
- _master = activeSlices.get(0);
- }
- }
- public Slice addSlice(String name, Map newProps) {
+ Slice addSlice(String name, Map newProps) {
String prefix = PREFIX_SLICE + DOT + name + DOT;
for (Object key : newProps.keySet()) {
if (!String.class.isInstance(key)
@@ -548,11 +518,33 @@ public class DistributedJDBCConfiguratio
slice = newSlice(name, original);
_slices.add(slice);
try {
- virtualDataSource.addDataSource(createDataSource(slice));
+ getConnectionFactory().addDataSource(createDataSource(slice));
} catch (Exception ex) {
handleBadConnection(false, slice, ex);
return null;
}
return slice;
}
+
+ /**
+ * Given the properties, creates a set of individual configurations.
+ */
+ @Override
+ public void fromProperties(Map original) {
+ super.fromProperties(original);
+ setDiagnosticContext(this);
+ List<String> sliceNames = findSlices(original);
+ for (String name : sliceNames) {
+ Slice slice = newSlice(name, original);
+ _slices.add(slice);
+ }
+ }
+
+ @Override
+ public DecoratingDataSource createConnectionFactory() {
+ if (virtualDataSource == null) {
+ virtualDataSource = createDistributedDataStore();
+ }
+ return virtualDataSource;
+ }
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java Tue May 4 23:01:51 2010
@@ -80,8 +80,7 @@ class DistributedJDBCStoreManager extend
private final List<SliceStoreManager> _slices;
private JDBCStoreManager _master;
private final DistributedJDBCConfiguration _conf;
- private static final Localizer _loc =
- Localizer.forPackage(DistributedJDBCStoreManager.class);
+ private static final Localizer _loc = Localizer.forPackage(DistributedJDBCStoreManager.class);
private static final Constructor<ClientConnection> clientConnectionImpl;
private static final Constructor<RefCountConnection> refCountConnectionImpl;
@@ -108,13 +107,14 @@ class DistributedJDBCStoreManager extend
super();
_conf = conf;
_slices = new ArrayList<SliceStoreManager>();
- List<String> sliceNames = conf.getActiveSliceNames();
- for (String name : sliceNames) {
- SliceStoreManager slice =
- new SliceStoreManager(conf.getSlice(name));
- _slices.add(slice);
- if (slice.getName().equals(_conf.getMaster().getName()))
- _master = slice;
+ List<Slice> slices = conf.getSlices(Slice.Status.ACTIVE);
+ Slice masterSlice = conf.getMasterSlice();
+ for (Slice slice : slices) {
+ SliceStoreManager store = new SliceStoreManager(slice);
+ _slices.add(store);
+ if (slice == masterSlice) {
+ _master = store;
+ }
}
}
@@ -128,8 +128,7 @@ class DistributedJDBCStoreManager extend
public SliceStoreManager addSlice(Slice slice) {
SliceStoreManager result = new SliceStoreManager(slice);
- result.setContext(getContext(),
- (JDBCConfiguration)slice.getConfiguration());
+ result.setContext(getContext(), (JDBCConfiguration)slice.getConfiguration());
_slices.add(result);
return result;
}
@@ -274,7 +273,7 @@ class DistributedJDBCStoreManager extend
Map<String, StateManagerSet> subsets = bin(sms, null);
Collection<StateManagerSet> remaining =
new ArrayList<StateManagerSet>(subsets.values());
- ExecutorService threadPool = SliceThread.newPool(_slices.size());
+ ExecutorService threadPool = SliceThread.getPool();
for (int i = 0; i < _slices.size(); i++) {
SliceStoreManager slice = _slices.get(i);
StateManagerSet subset = subsets.get(slice.getName());
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java Tue May 4 23:01:51 2010
@@ -125,8 +125,7 @@ class DistributedStoreQuery extends JDBC
List<SliceStoreManager> targets = findTargets();
QueryContext ctx = q.getContext();
boolean isReplicated = containsReplicated(ctx);
- ExecutorService threadPool = SliceThread.newPool(
- owner._queries.size());
+ ExecutorService threadPool = SliceThread.getPool();
for (int i = 0; i < owner._queries.size(); i++) {
// if replicated, then execute only on single slice
if (isReplicated && !usedExecutors.isEmpty()) {
@@ -206,7 +205,7 @@ class DistributedStoreQuery extends JDBC
Iterator<StoreQuery> qs = owner._queries.iterator();
List<Future<Number>> futures = null;
int result = 0;
- ExecutorService threadPool = SliceThread.newPool(executors.size());
+ ExecutorService threadPool = SliceThread.getPool();
for (Executor ex : executors) {
if (futures == null)
futures = new ArrayList<Future<Number>>();
@@ -234,7 +233,7 @@ class DistributedStoreQuery extends JDBC
Iterator<StoreQuery> qs = owner._queries.iterator();
List<Future<Number>> futures = null;
int result = 0;
- ExecutorService threadPool = SliceThread.newPool(executors.size());
+ ExecutorService threadPool = SliceThread.getPool();
for (Executor ex : executors) {
if (futures == null)
futures = new ArrayList<Future<Number>>();
Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/PObject.java Tue May 4 23:01:51 2010
@@ -18,6 +18,8 @@
*/
package org.apache.openjpa.slice;
+import java.util.concurrent.atomic.AtomicLong;
+
import javax.persistence.Entity;
import javax.persistence.Id;
@@ -28,10 +30,10 @@ public class PObject {
private int value;
- private static long idCounter = System.currentTimeMillis();
+ private static AtomicLong idCounter = new AtomicLong(System.currentTimeMillis());
public PObject() {
- id = ++idCounter;
+ id = idCounter.addAndGet(1);
}
public long getId() {
Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Tue May 4 23:01:51 2010
@@ -317,22 +317,22 @@ public class TestBasic extends SliceTest
}
public void testDynamicSlice() {
- DistributedConfiguration conf =
- (DistributedConfiguration)emf.getConfiguration();
+ DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration();
conf.setDistributionPolicyInstance(new DistributionPolicy() {
public String distribute(Object pc, List<String> slices,
Object context) {
if (PObject.class.isInstance(pc)) {
PObject o = (PObject)pc;
if (o.getValue() > 50) {
- DistributedBroker broker = (DistributedBroker)context;
+ DistributedBrokerFactory bf = (DistributedBrokerFactory)
+ ((DistributedBroker)context).getBrokerFactory();
Map newProps = new HashMap();
newProps.put("openjpa.slice.newslice.ConnectionURL",
"jdbc:derby:target/database/newslice;create=true");
newProps.put(
"openjpa.slice.newslice.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
- broker.addSlice("newslice", newProps);
+ bf.addSlice("newslice", newProps);
return "newslice";
} else {
return slices.get(o.getValue()%slices.size());
Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestConfiguration.java Tue May 4 23:01:51 2010
@@ -64,20 +64,18 @@ public class TestConfiguration extends S
}
public void testDynamicConfiguration() {
- DistributedJDBCConfiguration conf =
- (DistributedJDBCConfiguration) emf.getConfiguration();
+ DistributedJDBCConfiguration conf = (DistributedJDBCConfiguration) emf.getConfiguration();
List<String> slices = conf.getAvailableSliceNames();
assertTrue(slices.contains("One"));
assertTrue(slices.contains("Two"));
assertTrue(slices.contains("Three"));
- BrokerFactory bf = ((EntityManagerFactoryImpl) emf).getBrokerFactory();
- DistributedBroker broker = (DistributedBroker)bf.newBroker();
+ DistributedBrokerFactory bf = (DistributedBrokerFactory)((EntityManagerFactoryImpl) emf).getBrokerFactory();
Map newProps = new HashMap();
newProps.put("openjpa.slice.newslice.ConnectionURL",
"jdbc:derby:target/database/newslice;create=true");
newProps.put("openjpa.slice.newslice.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver");
- broker.addSlice("newslice", newProps);
+ bf.addSlice("newslice", newProps);
assertTrue(conf.getActiveSliceNames().contains("newslice"));
Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java?rev=941084&r1=941083&r2=941084&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java Tue May 4 23:01:51 2010
@@ -213,6 +213,34 @@ public class TestQueryMultiThreaded exte
waitForTermination();
em.getTransaction().rollback();
}
+
+ public void testHeavyLoad() { // starts 1000 threads; each persists 10 PObjects in its own transaction
+     Thread[] threads = new Thread[1000];
+     for (int i = 0; i < threads.length; i++) {
+         Runnable r = new Runnable() {
+             public void run() {
+                 EntityManager em = emf.createEntityManager();
+                 em.getTransaction().begin();
+                 for (int j = 0; j < 10; j ++) {
+                     PObject pc = new PObject();
+                     pc.setValue((int)(System.currentTimeMillis()%10)); // take the remainder on the long so the value stays in 0..9
+                     em.persist(pc);
+                 }
+                 em.getTransaction().commit();
+             }
+         };
+         threads[i] = new Thread(r);
+         threads[i].start();
+     }
+     for (Thread t : threads) { // wait for every worker before the test method returns
+         try {
+             t.join();
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt(); // restore the interrupt status instead of dropping it
+             fail("Interrupted while waiting for " + t.getName());
+         }
+     }
+ }
public void testHint() {
final List<String> targets = new ArrayList<String>();