You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openjpa.apache.org by pp...@apache.org on 2008/12/19 01:08:24 UTC
svn commit: r727864 - in /openjpa/trunk:
openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/
openjpa-kernel/src/main/java/org/apache/openjpa/kernel/
openjpa-persistence/src/main/java/org/apache/openjpa/persistence/
openjpa-persistence/src/main/r...
Author: ppoddar
Date: Thu Dec 18 16:08:23 2008
New Revision: 727864
URL: http://svn.apache.org/viewvc?rev=727864&view=rev
Log:
OPENJPA-825: Introduced internal locking for shared contexts (BrokerImpl/QueryImpl).
Modified:
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java (original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java Thu Dec 18 16:08:23 2008
@@ -84,7 +84,6 @@
public class JDBCStoreQuery
extends ExpressionStoreQuery {
- private boolean _isUnique = false;
private static final Table INVALID = new Table();
// add all standard filter and aggregate listeners to these maps
@@ -112,11 +111,6 @@
_store = store;
}
- @Override
- public void setContext(QueryContext ctx) {
- super.setContext(ctx);
- _isUnique = ctx.isUnique();
- }
/**
* Return the store.
*/
@@ -348,7 +342,7 @@
evaluate(ctx, null, null, exps[i], states[i]);
if (optHint != null)
sel.setExpectedResultCount(optHint.intValue(), true);
- else if (_isUnique)
+ else if (this.ctx.isUnique())
sel.setExpectedResultCount(1, false);
for (int j = 0; j < verts.length; j++) {
selMappings.add(verts[j]);
@@ -430,7 +424,7 @@
long end) {
if (exps.projections.length > 0 || start >= end)
return EagerFetchModes.EAGER_NONE;
- if (end - start == 1 || _isUnique)
+ if (end - start == 1 || ctx.isUnique())
return EagerFetchModes.EAGER_JOIN;
return EagerFetchModes.EAGER_PARALLEL;
}
Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java (original)
+++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java Thu Dec 18 16:08:23 2008
@@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.Status;
@@ -4161,7 +4162,7 @@
///////////////////
public void lock() {
- if (_lock != null)
+ if (_lock != null)
_lock.lock();
}
@@ -4169,6 +4170,24 @@
if (_lock != null)
_lock.unlock();
}
+
+ /**
+ * Creates a locks irrespective of multithreaded support. Used only by
+ * internal implementation to guard access when it spawns its own threads
+ * and user configured the broker for single-threaded access.
+ */
+ public synchronized void startLocking() {
+ if (_lock == null)
+ _lock = new ReentrantLock();
+ }
+
+ /**
+ * Destroys the lock if not multithreaded support.
+ */
+ public synchronized void stopLocking() {
+ if (_lock != null && !getMultithreaded())
+ _lock = null;
+ }
////////////////////
// State management
Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java (original)
+++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java Thu Dec 18 16:08:23 2008
@@ -84,7 +84,7 @@
private transient ClassLoader _loader = null;
// query has its own internal lock
- private final ReentrantLock _lock;
+ private ReentrantLock _lock;
// unparsed state
private Class _class = null;
@@ -138,8 +138,6 @@
if (_broker != null && _broker.getMultithreaded())
_lock = new ReentrantLock();
- else
- _lock = null;
}
/**
@@ -458,8 +456,8 @@
// no explicit setting; default
StoreQuery.Executor ex = compileForExecutor();
if (!ex.isAggregate(_storeQuery))
- return _unique = false;
- return _unique = !ex.hasGrouping(_storeQuery);
+ return false;
+ return !ex.hasGrouping(_storeQuery);
} finally {
unlock();
}
@@ -1553,9 +1551,22 @@
}
public void unlock() {
- if (_lock != null && _lock.isLocked())
+ if (_lock != null)
_lock.unlock();
}
+
+ public synchronized void startLocking() {
+ if (_lock == null) {
+ _lock = new ReentrantLock();
+ }
+ }
+
+ public synchronized void stopLocking() {
+ if (_lock != null && !_broker.getMultithreaded())
+ _lock = null;
+ }
+
+
/////////
// Utils
Modified: openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java (original)
+++ openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java Thu Dec 18 16:08:23 2008
@@ -37,6 +37,8 @@
import javax.persistence.FlushModeType;
import javax.persistence.LockModeType;
+import javax.persistence.NoResultException;
+import javax.persistence.NonUniqueResultException;
import javax.persistence.Query;
import javax.persistence.TemporalType;
@@ -444,15 +446,14 @@
*/
public Object getSingleResult() {
_em.assertNotCloseInvoked();
- // temporarily set query to unique so that a single result is validated
- // and returned; unset again in case the user executes query again
- // via getResultList
- _query.setUnique(true);
- try {
- return execute();
- } finally {
- _query.setUnique(false);
- }
+ List result = getResultList();
+ if (result == null || result.isEmpty())
+ throw new NoResultException(_loc.get("no-result", getQueryString())
+ .getMessage());
+ if (result.size() > 1)
+ throw new NonUniqueResultException(_loc.get("non-unique-result",
+ getQueryString(), result.size()).getMessage());
+ return result.get(0);
}
public int executeUpdate() {
Modified: openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties (original)
+++ openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties Thu Dec 18 16:08:23 2008
@@ -163,4 +163,5 @@
but this parameter is bound to a field of primitive type "{2}".
version-check-error: An error occurred while attempting to determine the \
version of "{0}".
-
\ No newline at end of file
+no-result: Query "{0}" selected no result, but expected unique result.
+non-unique-result: Query "{0}" selected {1} results, but expected unique result.
\ No newline at end of file
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java Thu Dec 18 16:08:23 2008
@@ -91,4 +91,11 @@
}
return true;
}
+
+ /**
+ * A virtual datastore need not be opened.
+ */
+ @Override
+ public void beginStore() {
+ }
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java Thu Dec 18 16:08:23 2008
@@ -23,6 +23,10 @@
import org.apache.openjpa.conf.OpenJPAProductDerivation;
import org.apache.openjpa.lib.conf.AbstractProductDerivation;
import org.apache.openjpa.lib.conf.Configuration;
+import org.apache.openjpa.lib.conf.PluginValue;
+import org.apache.openjpa.lib.conf.Value;
+import org.apache.openjpa.lib.log.Log;
+import org.apache.openjpa.lib.util.Localizer;
import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
import org.apache.openjpa.slice.jdbc.DistributedJDBCConfigurationImpl;
@@ -37,10 +41,12 @@
*/
public class ProductDerivation extends AbstractProductDerivation implements
OpenJPAProductDerivation {
+ private static final Localizer _loc =
+ Localizer.forPackage(ProductDerivation.class);
/**
* Prefix for all Slice-specific configuration properties.
*/
- public static final String PREFIX_SLICE = "openjpa.slice";
+ public static final String PREFIX_SLICE = "openjpa.slice";
/**
* Hint key <code>openjpa.hint.slice.Target </code> to specify a subset of
@@ -74,14 +80,22 @@
DistributedJDBCConfigurationImpl conf =
(DistributedJDBCConfigurationImpl)c;
boolean modified = false;
+ Log log = conf.getConfigurationLog();
if (conf.getDistributionPolicyInstance() == null) {
- conf.distributionPolicyPlugin.setString("random");
+ forceSet(PREFIX_SLICE, conf.distributionPolicyPlugin,"random", log);
modified = true;
}
if (conf.getReplicationPolicyInstance() == null) {
- conf.replicationPolicyPlugin.setString("all");
+ forceSet(PREFIX_SLICE, conf.replicationPolicyPlugin, "all", log);
modified = true;
}
return modified;
}
+
+ void forceSet(String prefix, Value v, String forced, Log log) {
+ v.setString(forced);
+ if (log.isWarnEnabled())
+ log.warn(_loc.get("forced-set-config",
+ prefix+"."+v.getProperty(), forced));
+ }
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java Thu Dec 18 16:08:23 2008
@@ -39,6 +39,7 @@
import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
import org.apache.openjpa.jdbc.sql.Result;
import org.apache.openjpa.jdbc.sql.ResultSetResult;
+import org.apache.openjpa.kernel.BrokerImpl;
import org.apache.openjpa.kernel.FetchConfiguration;
import org.apache.openjpa.kernel.OpenJPAStateManager;
import org.apache.openjpa.kernel.PCState;
@@ -242,7 +243,7 @@
List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
- boolean serialMode = getConfiguration().getMultithreaded();
+ boolean parallel = !getConfiguration().getMultithreaded();
for (SliceStoreManager slice : _slices) {
List<OpenJPAStateManager> subset = subsets.get(slice.getName());
if (subset.isEmpty())
@@ -250,14 +251,14 @@
if (containsReplicated(subset)) {
collectException(slice.flush(subset), exceptions);
} else {
- if (serialMode) {
- collectException(slice.flush(subset), exceptions);
- } else {
+ if (parallel) {
futures.add(threadPool.submit(new Flusher(slice, subset)));
+ } else {
+ collectException(slice.flush(subset), exceptions);
}
}
}
- if (!serialMode) {
+ if (parallel) {
for (Future<Collection> future : futures) {
try {
collectException(future.get(), exceptions);
@@ -459,7 +460,12 @@
}
public Collection call() throws Exception {
- return store.flush(toFlush);
+ ((BrokerImpl)store.getContext()).startLocking();
+ try {
+ return store.flush(toFlush);
+ } finally {
+ ((BrokerImpl)store.getContext()).stopLocking();
+ }
}
}
Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java Thu Dec 18 16:08:23 2008
@@ -28,10 +28,12 @@
import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.kernel.BrokerImpl;
import org.apache.openjpa.kernel.ExpressionStoreQuery;
import org.apache.openjpa.kernel.FetchConfiguration;
import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
import org.apache.openjpa.kernel.QueryContext;
+import org.apache.openjpa.kernel.QueryImpl;
import org.apache.openjpa.kernel.StoreManager;
import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.kernel.exps.ExpressionParser;
@@ -44,272 +46,292 @@
/**
* A query for distributed databases.
*
- * @author Pinaki Poddar
- *
+ * @author Pinaki Poddar
+ *
*/
@SuppressWarnings("serial")
class DistributedStoreQuery extends JDBCStoreQuery {
private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
private ExpressionParser _parser;
- private boolean _serialMode;
-
+
public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
super(store, parser);
_parser = parser;
- _serialMode = store.getContext().getConfiguration().getMultithreaded();
-
}
-
+
void add(StoreQuery q) {
_queries.add(q);
}
-
+
public DistributedStoreManager getDistributedStore() {
- return (DistributedStoreManager)getStore();
+ return (DistributedStoreManager) getStore();
+ }
+
+ public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
+ boolean parallel = !getContext().getStoreContext().getBroker()
+ .getMultithreaded();
+ ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser,
+ ctx.getCompilation(), parallel);
+ for (StoreQuery q : _queries) {
+ ex.addExecutor(q.newDataStoreExecutor(meta, subs));
+ }
+ return ex;
+ }
+
+ public void setContext(QueryContext ctx) {
+ super.setContext(ctx);
+ for (StoreQuery q : _queries)
+ q.setContext(ctx);
+ }
+
+ public ExecutorService getExecutorServiceInstance() {
+ DistributedJDBCConfiguration conf = ((DistributedJDBCConfiguration)
+ getStore().getConfiguration());
+ return conf.getExecutorServiceInstance();
}
-
- public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
- ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser,
- ctx.getCompilation(), _serialMode);
- for (StoreQuery q : _queries) {
- ex.addExecutor(q.newDataStoreExecutor(meta, subs));
- }
- return ex;
- }
-
- public void setContext(QueryContext ctx) {
- super.setContext(ctx);
- for (StoreQuery q : _queries)
- q.setContext(ctx);
- }
-
- public ExecutorService getExecutorServiceInstance() {
- DistributedJDBCConfiguration conf =
- ((DistributedJDBCConfiguration)getStore().getConfiguration());
- return conf.getExecutorServiceInstance();
- }
-
+
/**
* Executes queries on multiple databases.
*
- * @author Pinaki Poddar
- *
+ * @author Pinaki Poddar
+ *
*/
- public static class ParallelExecutor extends
- ExpressionStoreQuery.DataStoreExecutor {
+ public static class ParallelExecutor extends
+ ExpressionStoreQuery.DataStoreExecutor {
private List<Executor> executors = new ArrayList<Executor>();
private DistributedStoreQuery owner = null;
private ExecutorService threadPool = null;
- private final boolean serialMode;
-
- public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta,
- boolean subclasses, ExpressionParser parser, Object parsed,
- boolean serial) {
- super(dsq, meta, subclasses, parser, parsed);
- owner = dsq;
- threadPool = dsq.getExecutorServiceInstance();
- serialMode = false;//serial;
- }
-
+ private final boolean parallel;
+
+ public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta,
+ boolean subclasses, ExpressionParser parser, Object parsed,
+ boolean parallel) {
+ super(dsq, meta, subclasses, parser, parsed);
+ owner = dsq;
+ threadPool = dsq.getExecutorServiceInstance();
+ this.parallel = parallel;
+ }
+
public void addExecutor(Executor ex) {
executors.add(ex);
}
-
- /**
- * Each child query must be executed with slice context and not the
- * given query context.
- */
- public ResultObjectProvider executeQuery(StoreQuery q,
- final Object[] params, final Range range) {
- List<Future<ResultObjectProvider>> futures = null;
- final List<Executor> usedExecutors = new ArrayList<Executor>();
- final List<ResultObjectProvider> rops =
- new ArrayList<ResultObjectProvider>();
- List<SliceStoreManager> targets = findTargets();
- QueryContext ctx = q.getContext();
- boolean isReplicated = containsReplicated(ctx);
- for (int i = 0; i < owner._queries.size(); i++) {
- // if replicated, then execute only on single slice
- if (i > 0 && isReplicated) {
- continue;
- }
- StoreManager sm = owner.getDistributedStore().getSlice(i);
- if (!targets.contains(sm))
- continue;
- StoreQuery query = owner._queries.get(i);
- Executor executor = executors.get(i);
- if (!targets.contains(sm))
- continue;
- usedExecutors.add(executor);
- if (serialMode) {
- rops.add(executor.executeQuery(query, params, range));
- } else {
- if (futures == null)
- futures = new ArrayList<Future<ResultObjectProvider>>();
- QueryExecutor call = new QueryExecutor();
- call.executor = executor;
- call.query = query;
- call.params = params;
- call.range = range;
- futures.add(threadPool.submit(call));
- }
- }
- if (!serialMode) {
- for (Future<ResultObjectProvider> future:futures) {
- try {
+
+ /**
+ * Each child query must be executed with slice context and not the
+ * given query context.
+ */
+ public ResultObjectProvider executeQuery(StoreQuery q,
+ final Object[] params, final Range range) {
+ List<Future<ResultObjectProvider>> futures =
+ new ArrayList<Future<ResultObjectProvider>>();
+ final List<Executor> usedExecutors = new ArrayList<Executor>();
+ final List<ResultObjectProvider> rops =
+ new ArrayList<ResultObjectProvider>();
+ List<SliceStoreManager> targets = findTargets();
+ QueryContext ctx = q.getContext();
+ boolean isReplicated = containsReplicated(ctx);
+ for (int i = 0; i < owner._queries.size(); i++) {
+ // if replicated, then execute only on single slice
+ if (i > 0 && isReplicated) {
+ continue;
+ }
+ StoreManager sm = owner.getDistributedStore().getSlice(i);
+ if (!targets.contains(sm))
+ continue;
+ StoreQuery query = owner._queries.get(i);
+ Executor executor = executors.get(i);
+ if (!targets.contains(sm))
+ continue;
+ usedExecutors.add(executor);
+ if (!parallel) {
+ rops.add(executor.executeQuery(query, params, range));
+ } else {
+ QueryExecutor call = new QueryExecutor();
+ call.executor = executor;
+ call.query = query;
+ call.params = params;
+ call.range = range;
+ futures.add(threadPool.submit(call));
+ }
+
+ }
+ if (parallel) {
+ for (Future<ResultObjectProvider> future : futures) {
+ try {
rops.add(future.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new StoreException(e.getCause());
}
- }
- }
- ResultObjectProvider[] tmp = rops.toArray
- (new ResultObjectProvider[rops.size()]);
- ResultObjectProvider result = null;
- boolean[] ascending = getAscending(q);
- boolean isAscending = ascending.length > 0;
- boolean isAggregate = ctx.isAggregate();
- boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
- if (isAggregate) {
- result = new UniqueResultObjectProvider(tmp, q,
- getQueryExpressions());
- } else if (isAscending) {
- result = new OrderingMergedResultObjectProvider(tmp, ascending,
- usedExecutors.toArray(new Executor[usedExecutors.size()]),
- q, params);
- } else {
- result = new MergedResultObjectProvider(tmp);
- }
- if (hasRange) {
- result = new RangeResultObjectProvider(result,
- ctx.getStartRange(), ctx.getEndRange());
- }
- return result;
- }
-
- /**
+ }
+ }
+ ResultObjectProvider[] tmp = rops
+ .toArray(new ResultObjectProvider[rops.size()]);
+ ResultObjectProvider result = null;
+ boolean[] ascending = getAscending(q);
+ boolean isAscending = ascending.length > 0;
+ boolean isAggregate = ctx.isAggregate();
+ boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
+ if (isAggregate) {
+ result = new UniqueResultObjectProvider(tmp, q,
+ getQueryExpressions());
+ } else if (isAscending) {
+ result = new OrderingMergedResultObjectProvider(tmp, ascending,
+ usedExecutors.toArray(new Executor[usedExecutors.size()]),
+ q, params);
+ } else {
+ result = new MergedResultObjectProvider(tmp);
+ }
+ if (hasRange) {
+ result = new RangeResultObjectProvider(result, ctx
+ .getStartRange(), ctx.getEndRange());
+ }
+ return result;
+ }
+
+ /**
* Scans metadata to find out if a replicated class is the candidate.
- **/
- boolean containsReplicated(QueryContext query) {
- Class candidate = query.getCandidateType();
- if (candidate != null) {
- ClassMetaData meta = query.getStoreContext().getConfiguration()
- .getMetaDataRepositoryInstance()
- .getMetaData(candidate, null, true);
- if (meta != null && meta.isReplicated())
- return true;
- }
- ClassMetaData[] metas = query.getAccessPathMetaDatas();
- if (metas == null || metas.length < 1)
- return false;
- for (ClassMetaData type : metas)
- if (type.isReplicated())
- return true;
- return false;
- }
-
- public Number executeDelete(StoreQuery q, Object[] params) {
- Iterator<StoreQuery> qs = owner._queries.iterator();
- List<Future<Number>> futures = null;
- int result = 0;
- for (Executor ex:executors) {
- if (serialMode) {
- Number n = ex.executeDelete(qs.next(), params);
- if (n != null)
- result += n.intValue();
- } else {
- if (futures == null)
- futures = new ArrayList<Future<Number>>();
- DeleteExecutor call = new DeleteExecutor();
- call.executor = ex;
- call.query = qs.next();
- call.params = params;
- futures.add(threadPool.submit(call));
- }
- }
- if (!serialMode) {
- for (Future<Number> future:futures) {
- try {
- Number n = future.get();
- if (n != null)
- result += n.intValue();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new StoreException(e.getCause());
- }
- }
- }
- return result;
- }
-
- public Number executeUpdate(StoreQuery q, Object[] params) {
- Iterator<StoreQuery> qs = owner._queries.iterator();
- List<Future<Number>> futures = null;
- int result = 0;
- for (Executor ex:executors) {
- if (serialMode) {
- Number n = ex.executeUpdate(qs.next(), params);
- result += (n == null) ? 0 : n.intValue();
- } else {
- if (futures == null)
- futures = new ArrayList<Future<Number>>();
- UpdateExecutor call = new UpdateExecutor();
- call.executor = ex;
- call.query = qs.next();
- call.params = params;
- futures.add(threadPool.submit(call));
- }
- }
- if (serialMode) {
- for (Future<Number> future:futures) {
- try {
- Number n = future.get();
- result += (n == null) ? 0 : n.intValue();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new StoreException(e.getCause());
- }
- }
- }
- return result;
- }
-
- List<SliceStoreManager> findTargets() {
- FetchConfiguration fetch = owner.getContext().getFetchConfiguration();
- return owner.getDistributedStore().getTargets(fetch);
- }
+ */
+ boolean containsReplicated(QueryContext query) {
+ Class candidate = query.getCandidateType();
+ if (candidate != null) {
+ ClassMetaData meta = query.getStoreContext().getConfiguration()
+ .getMetaDataRepositoryInstance().getMetaData(candidate,
+ null, true);
+ if (meta != null && meta.isReplicated())
+ return true;
+ }
+ ClassMetaData[] metas = query.getAccessPathMetaDatas();
+ if (metas == null || metas.length < 1)
+ return false;
+ for (ClassMetaData type : metas)
+ if (type.isReplicated())
+ return true;
+ return false;
+ }
+
+ public Number executeDelete(StoreQuery q, Object[] params) {
+ Iterator<StoreQuery> qs = owner._queries.iterator();
+ List<Future<Number>> futures = null;
+ int result = 0;
+ for (Executor ex : executors) {
+ if (futures == null)
+ futures = new ArrayList<Future<Number>>();
+ DeleteExecutor call = new DeleteExecutor();
+ call.executor = ex;
+ call.query = qs.next();
+ call.params = params;
+ futures.add(threadPool.submit(call));
+ }
+ for (Future<Number> future : futures) {
+ try {
+ Number n = future.get();
+ if (n != null)
+ result += n.intValue();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new StoreException(e.getCause());
+ }
+ }
+ return result;
+ }
+
+ public Number executeUpdate(StoreQuery q, Object[] params) {
+ Iterator<StoreQuery> qs = owner._queries.iterator();
+ List<Future<Number>> futures = null;
+ int result = 0;
+ for (Executor ex : executors) {
+ if (futures == null)
+ futures = new ArrayList<Future<Number>>();
+ UpdateExecutor call = new UpdateExecutor();
+ call.executor = ex;
+ call.query = qs.next();
+ call.params = params;
+ futures.add(threadPool.submit(call));
+ }
+ for (Future<Number> future : futures) {
+ try {
+ Number n = future.get();
+ result += (n == null) ? 0 : n.intValue();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new StoreException(e.getCause());
+ }
+ }
+ return result;
+ }
+
+ List<SliceStoreManager> findTargets() {
+ FetchConfiguration fetch = owner.getContext()
+ .getFetchConfiguration();
+ return owner.getDistributedStore().getTargets(fetch);
+ }
}
-
- static class QueryExecutor implements Callable<ResultObjectProvider> {
+
+ static class QueryExecutor implements Callable<ResultObjectProvider> {
StoreQuery query;
Executor executor;
Object[] params;
Range range;
+
public ResultObjectProvider call() throws Exception {
- return executor.executeQuery(query, params, range);
+ ((QueryImpl)query.getContext()).startLocking();
+ ((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+ ((QueryImpl)query.getContext()).lock();
+ ((BrokerImpl)query.getContext().getStoreContext()).lock();
+ try {
+ return executor.executeQuery(query, params, range);
+ } finally {
+ ((QueryImpl)query.getContext()).unlock();
+ ((BrokerImpl)query.getContext().getStoreContext()).unlock();
+ ((QueryImpl)query.getContext()).stopLocking();
+ ((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+ }
}
}
-
- static class DeleteExecutor implements Callable<Number> {
+
+ static class DeleteExecutor implements Callable<Number> {
StoreQuery query;
Executor executor;
Object[] params;
+
public Number call() throws Exception {
- return executor.executeDelete(query, params);
+ ((QueryImpl)query.getContext()).startLocking();
+ ((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+ ((QueryImpl)query.getContext()).lock();
+ ((BrokerImpl)query.getContext().getStoreContext()).lock();
+ try {
+ return executor.executeDelete(query, params);
+ } finally {
+ ((QueryImpl)query.getContext()).unlock();
+ ((BrokerImpl)query.getContext().getStoreContext()).unlock();
+ ((QueryImpl)query.getContext()).stopLocking();
+ ((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+ }
}
}
-
- static class UpdateExecutor implements Callable<Number> {
+
+ static class UpdateExecutor implements Callable<Number> {
StoreQuery query;
Executor executor;
Object[] params;
+
public Number call() throws Exception {
- return executor.executeUpdate(query, params);
+ ((QueryImpl)query.getContext()).startLocking();
+ ((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+ ((QueryImpl)query.getContext()).lock();
+ ((BrokerImpl)query.getContext().getStoreContext()).lock();
+ try {
+ return executor.executeUpdate(query, params);
+ } finally {
+ ((QueryImpl)query.getContext()).unlock();
+ ((BrokerImpl)query.getContext().getStoreContext()).unlock();
+ ((QueryImpl)query.getContext()).stopLocking();
+ ((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+ }
}
}
}
-
Modified: openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties (original)
+++ openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties Thu Dec 18 16:08:23 2008
@@ -17,4 +17,10 @@
bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
"{1}" for "{2}". The valid slices are {3}. This error may happen \
when one or more of the originally configured slices are unavailable \
- and Lenient property is set to true.
\ No newline at end of file
+ and Lenient property is set to true.
+forced-set-config: Configuration property "{0}" is not set explicitly. Setting \
+ this value to "{1}".
+multithreaded-false: Configuration property "{0}" is set to "false". \
+ It is recommended to set "{0}" to "true", because Slice executes database \
+ operations per slice in parallel in different threads, setting "{0}" to \
+ "false" may cause unpredictable behavior.
\ No newline at end of file
Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Thu Dec 18 16:08:23 2008
@@ -218,7 +218,7 @@
* Disable this test temporarily as we undergo changes in internal slice
* information structure.
*/
- public void xtestUpdateReplicatedObjects() {
+ public void testUpdateReplicatedObjects() {
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
String[] names = {"USA", "India", "China"};
Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java Thu Dec 18 16:08:23 2008
@@ -18,6 +18,9 @@
*/
package org.apache.openjpa.slice;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -25,6 +28,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.persistence.EntityManager;
@@ -42,7 +48,7 @@
private int POBJECT_COUNT = 25;
private int VALUE_MIN = 100;
private int VALUE_MAX = VALUE_MIN + POBJECT_COUNT - 1;
- private static int THREADS = 3;
+ private static int THREADS = 5;
private ExecutorService group;
private Future[] futures;
@@ -57,7 +63,14 @@
if (count == 0) {
create(POBJECT_COUNT);
}
- group = Executors.newCachedThreadPool();
+ group = new ThreadPoolExecutor(THREADS, THREADS,
+ 60, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+
+ });
futures = new Future[THREADS];
}
@@ -293,8 +306,9 @@
f.get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
- t.getCause().printStackTrace();
- fail("Failed " + t.getCause());
+ StringWriter writer = new StringWriter();
+ t.printStackTrace(new PrintWriter(writer));
+ fail("Failed " + writer.toString());
}
} catch (InterruptedException e) {