You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openjpa.apache.org by pp...@apache.org on 2008/12/19 01:08:24 UTC

svn commit: r727864 - in /openjpa/trunk: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ openjpa-kernel/src/main/java/org/apache/openjpa/kernel/ openjpa-persistence/src/main/java/org/apache/openjpa/persistence/ openjpa-persistence/src/main/r...

Author: ppoddar
Date: Thu Dec 18 16:08:23 2008
New Revision: 727864

URL: http://svn.apache.org/viewvc?rev=727864&view=rev
Log:
OPENJPA-825: Introduced internal locking for shared contexts (BrokerImpl/QueryImpl).

Modified:
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
    openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
    openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
    openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
    openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java (original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java Thu Dec 18 16:08:23 2008
@@ -84,7 +84,6 @@
 public class JDBCStoreQuery 
     extends ExpressionStoreQuery {
 
-	private boolean _isUnique = false;
     private static final Table INVALID = new Table();
 
     // add all standard filter and aggregate listeners to these maps
@@ -112,11 +111,6 @@
         _store = store;
     }
 
-    @Override
-    public void setContext(QueryContext ctx) {
-    	super.setContext(ctx);
-    	_isUnique = ctx.isUnique();
-    }
     /**
      * Return the store.
      */
@@ -348,7 +342,7 @@
                 evaluate(ctx, null, null, exps[i], states[i]);
             if (optHint != null)
                sel.setExpectedResultCount(optHint.intValue(), true);
-            else if (_isUnique)
+            else if (this.ctx.isUnique())
                 sel.setExpectedResultCount(1, false);
             for (int j = 0; j < verts.length; j++) {
                 selMappings.add(verts[j]);
@@ -430,7 +424,7 @@
         long end) {
         if (exps.projections.length > 0 || start >= end)
             return EagerFetchModes.EAGER_NONE;
-        if (end - start == 1 || _isUnique)
+        if (end - start == 1 || ctx.isUnique())
             return EagerFetchModes.EAGER_JOIN;
         return EagerFetchModes.EAGER_PARALLEL;
     }

Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java (original)
+++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java Thu Dec 18 16:08:23 2008
@@ -37,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
@@ -4161,7 +4162,7 @@
     ///////////////////
 
     public void lock() {
-        if (_lock != null)
+        if (_lock != null) 
             _lock.lock();
     }
 
@@ -4169,6 +4170,24 @@
         if (_lock != null)
             _lock.unlock();
     }
+    
+    /**
+     * Creates a lock irrespective of multithreaded support. Used only by
+     * the internal implementation to guard access when it spawns its own
+     * threads and the user configured the broker for single-threaded access.
+     */
+    public synchronized void startLocking() {
+    	if (_lock == null)
+    		_lock = new ReentrantLock();
+    }
+    
+    /**
+     * Destroys the lock unless multithreaded support is configured.
+     */
+    public synchronized void stopLocking() {
+    	if (_lock != null && !getMultithreaded())
+    		_lock = null;
+    }
 
     ////////////////////
     // State management

Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java (original)
+++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java Thu Dec 18 16:08:23 2008
@@ -84,7 +84,7 @@
     private transient ClassLoader _loader = null;
 
     // query has its own internal lock
-    private final ReentrantLock _lock;
+    private ReentrantLock _lock;
 
     // unparsed state
     private Class _class = null;
@@ -138,8 +138,6 @@
 
         if (_broker != null && _broker.getMultithreaded())
             _lock = new ReentrantLock();
-        else
-            _lock = null;
     }
 
     /**
@@ -458,8 +456,8 @@
             // no explicit setting; default
             StoreQuery.Executor ex = compileForExecutor();
             if (!ex.isAggregate(_storeQuery))
-                return _unique = false;
-            return _unique = !ex.hasGrouping(_storeQuery);
+                return false;
+            return !ex.hasGrouping(_storeQuery);
         } finally {
             unlock();
         }
@@ -1553,9 +1551,22 @@
     }
 
     public void unlock() {
-        if (_lock != null && _lock.isLocked())
+        if (_lock != null)
             _lock.unlock();
     }
+    
+    public synchronized void startLocking() {
+    	if (_lock == null) {
+    		_lock = new ReentrantLock();
+    	}
+    }
+    
+    public synchronized void stopLocking() {
+    	if (_lock != null && !_broker.getMultithreaded())
+    		_lock = null;
+    }
+    
+    
 
     /////////
     // Utils

Modified: openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java (original)
+++ openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java Thu Dec 18 16:08:23 2008
@@ -37,6 +37,8 @@
 
 import javax.persistence.FlushModeType;
 import javax.persistence.LockModeType;
+import javax.persistence.NoResultException;
+import javax.persistence.NonUniqueResultException;
 import javax.persistence.Query;
 import javax.persistence.TemporalType;
 
@@ -444,15 +446,14 @@
 	 */
 	public Object getSingleResult() {
 		_em.assertNotCloseInvoked();
-		// temporarily set query to unique so that a single result is validated
-		// and returned; unset again in case the user executes query again
-		// via getResultList
-		_query.setUnique(true);
-		try {
-			return execute();
-		} finally {
-			_query.setUnique(false);
-		}
+		List result = getResultList();
+		if (result == null || result.isEmpty())
+			throw new NoResultException(_loc.get("no-result", getQueryString())
+				.getMessage());
+		if (result.size() > 1)
+			throw new NonUniqueResultException(_loc.get("non-unique-result",
+				getQueryString(), result.size()).getMessage());
+		return result.get(0);
 	}
 
 	public int executeUpdate() {

Modified: openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties (original)
+++ openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties Thu Dec 18 16:08:23 2008
@@ -163,4 +163,5 @@
 	but this parameter is bound to a field of primitive type "{2}".
 version-check-error: An error occurred while attempting to determine the \
     version of "{0}".
-	
\ No newline at end of file
+no-result: Query "{0}" selected no result, but expected unique result.
+non-unique-result: Query "{0}" selected {1} results, but expected unique result.
\ No newline at end of file

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java Thu Dec 18 16:08:23 2008
@@ -91,4 +91,11 @@
 	    }
 	    return true;
 	}
+	
+	/**
+	 * A virtual datastore need not be opened.
+	 */
+	@Override
+	public void beginStore() {
+	}
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java Thu Dec 18 16:08:23 2008
@@ -23,6 +23,10 @@
 import org.apache.openjpa.conf.OpenJPAProductDerivation;
 import org.apache.openjpa.lib.conf.AbstractProductDerivation;
 import org.apache.openjpa.lib.conf.Configuration;
+import org.apache.openjpa.lib.conf.PluginValue;
+import org.apache.openjpa.lib.conf.Value;
+import org.apache.openjpa.lib.log.Log;
+import org.apache.openjpa.lib.util.Localizer;
 import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
 import org.apache.openjpa.slice.jdbc.DistributedJDBCConfigurationImpl;
 
@@ -37,10 +41,12 @@
  */
 public class ProductDerivation extends AbstractProductDerivation implements
 		OpenJPAProductDerivation {
+	private static final Localizer _loc = 
+		Localizer.forPackage(ProductDerivation.class);
     /**
      * Prefix for all Slice-specific configuration properties. 
      */
-    public static final String PREFIX_SLICE = "openjpa.slice";
+    public static final String PREFIX_SLICE   = "openjpa.slice";
     
     /**
      * Hint key <code>openjpa.hint.slice.Target </code> to specify a subset of 
@@ -74,14 +80,22 @@
         DistributedJDBCConfigurationImpl conf = 
         	(DistributedJDBCConfigurationImpl)c;
         boolean modified = false;
+        Log log = conf.getConfigurationLog();
         if (conf.getDistributionPolicyInstance() == null) {
-        	conf.distributionPolicyPlugin.setString("random");
+        	forceSet(PREFIX_SLICE, conf.distributionPolicyPlugin,"random", log);
         	modified = true;
         }
         if (conf.getReplicationPolicyInstance() == null) {
-        	conf.replicationPolicyPlugin.setString("all");
+        	forceSet(PREFIX_SLICE, conf.replicationPolicyPlugin, "all", log);
         	modified = true;
         }
         return modified;
     }
+    
+    void forceSet(String prefix, Value v, String forced, Log log) {
+    	v.setString(forced);
+    	if (log.isWarnEnabled())
+        	log.warn(_loc.get("forced-set-config", 
+        		prefix+"."+v.getProperty(), forced));
+    }
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java Thu Dec 18 16:08:23 2008
@@ -39,6 +39,7 @@
 import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
 import org.apache.openjpa.jdbc.sql.Result;
 import org.apache.openjpa.jdbc.sql.ResultSetResult;
+import org.apache.openjpa.kernel.BrokerImpl;
 import org.apache.openjpa.kernel.FetchConfiguration;
 import org.apache.openjpa.kernel.OpenJPAStateManager;
 import org.apache.openjpa.kernel.PCState;
@@ -242,7 +243,7 @@
         List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
         Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
         
-        boolean serialMode = getConfiguration().getMultithreaded();
+        boolean parallel = !getConfiguration().getMultithreaded();
         for (SliceStoreManager slice : _slices) {
             List<OpenJPAStateManager> subset = subsets.get(slice.getName());
             if (subset.isEmpty())
@@ -250,14 +251,14 @@
             if (containsReplicated(subset)) {
             	collectException(slice.flush(subset), exceptions);
             } else {
-            	if (serialMode) {
-                	collectException(slice.flush(subset), exceptions);
-            	} else {
+            	if (parallel) {
             		futures.add(threadPool.submit(new Flusher(slice, subset)));
+            	} else {
+                	collectException(slice.flush(subset), exceptions);
             	}
             }
         }
-        if (!serialMode) {
+        if (parallel) {
 	        for (Future<Collection> future : futures) {
 	            try {
 	            	collectException(future.get(), exceptions);
@@ -459,7 +460,12 @@
         }
 
         public Collection call() throws Exception {
-            return store.flush(toFlush);
+        	((BrokerImpl)store.getContext()).startLocking();
+        	try {
+        		return store.flush(toFlush);
+        	} finally {
+            	((BrokerImpl)store.getContext()).stopLocking();
+        	}
         }
     }
 

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java (original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java Thu Dec 18 16:08:23 2008
@@ -28,10 +28,12 @@
 
 import org.apache.openjpa.jdbc.kernel.JDBCStore;
 import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.kernel.BrokerImpl;
 import org.apache.openjpa.kernel.ExpressionStoreQuery;
 import org.apache.openjpa.kernel.FetchConfiguration;
 import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
 import org.apache.openjpa.kernel.QueryContext;
+import org.apache.openjpa.kernel.QueryImpl;
 import org.apache.openjpa.kernel.StoreManager;
 import org.apache.openjpa.kernel.StoreQuery;
 import org.apache.openjpa.kernel.exps.ExpressionParser;
@@ -44,272 +46,292 @@
 /**
  * A query for distributed databases.
  * 
- * @author Pinaki Poddar 
- *
+ * @author Pinaki Poddar
+ * 
  */
 @SuppressWarnings("serial")
 class DistributedStoreQuery extends JDBCStoreQuery {
 	private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
 	private ExpressionParser _parser;
-	private boolean _serialMode;
-	
+
 	public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
 		super(store, parser);
 		_parser = parser;
-		_serialMode = store.getContext().getConfiguration().getMultithreaded();
-		
 	}
-	
+
 	void add(StoreQuery q) {
 		_queries.add(q);
 	}
-	
+
 	public DistributedStoreManager getDistributedStore() {
-		return (DistributedStoreManager)getStore();
+		return (DistributedStoreManager) getStore();
+	}
+
+	public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
+		boolean parallel = !getContext().getStoreContext().getBroker()
+			.getMultithreaded();
+		ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser, 
+			ctx.getCompilation(), parallel);
+		for (StoreQuery q : _queries) {
+			ex.addExecutor(q.newDataStoreExecutor(meta, subs));
+		}
+		return ex;
+	}
+
+	public void setContext(QueryContext ctx) {
+		super.setContext(ctx);
+		for (StoreQuery q : _queries)
+			q.setContext(ctx);
+	}
+
+	public ExecutorService getExecutorServiceInstance() {
+		DistributedJDBCConfiguration conf = ((DistributedJDBCConfiguration) 
+			getStore().getConfiguration());
+		return conf.getExecutorServiceInstance();
 	}
-	
-    public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
-    	ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser, 
-    			ctx.getCompilation(), _serialMode);
-        for (StoreQuery q : _queries) {
-            ex.addExecutor(q.newDataStoreExecutor(meta, subs));
-        }
-        return ex;
-    }
-    
-    public void setContext(QueryContext ctx) {
-    	super.setContext(ctx);
-    	for (StoreQuery q : _queries) 
-    		q.setContext(ctx); 
-    }
-    
-    public ExecutorService getExecutorServiceInstance() {
-        DistributedJDBCConfiguration conf = 
-            ((DistributedJDBCConfiguration)getStore().getConfiguration());
-        return conf.getExecutorServiceInstance();
-    }
-    
+
 	/**
 	 * Executes queries on multiple databases.
 	 * 
-	 * @author Pinaki Poddar 
-	 *
+	 * @author Pinaki Poddar
+	 * 
 	 */
-	public static class ParallelExecutor extends 
-		ExpressionStoreQuery.DataStoreExecutor {
+	public static class ParallelExecutor extends
+			ExpressionStoreQuery.DataStoreExecutor {
 		private List<Executor> executors = new ArrayList<Executor>();
 		private DistributedStoreQuery owner = null;
 		private ExecutorService threadPool = null;
-		private final boolean serialMode;
-		
-        public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta, 
-        	boolean subclasses, ExpressionParser parser, Object parsed, 
-        	boolean serial) {
-        	super(dsq, meta, subclasses, parser, parsed);
-        	owner = dsq;
-        	threadPool = dsq.getExecutorServiceInstance();
-        	serialMode = false;//serial;
-        }
-        
+		private final boolean parallel;
+
+		public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta,
+				boolean subclasses, ExpressionParser parser, Object parsed, 
+				boolean parallel) {
+			super(dsq, meta, subclasses, parser, parsed);
+			owner = dsq;
+			threadPool = dsq.getExecutorServiceInstance();
+			this.parallel = parallel;
+		}
+
 		public void addExecutor(Executor ex) {
 			executors.add(ex);
 		}
-		
-        /**
-         * Each child query must be executed with slice context and not the 
-         * given query context.
-         */
-        public ResultObjectProvider executeQuery(StoreQuery q,
-                final Object[] params, final Range range) {
-        	List<Future<ResultObjectProvider>> futures = null;
-        	final List<Executor> usedExecutors = new ArrayList<Executor>();
-        	final List<ResultObjectProvider> rops = 
-        		new ArrayList<ResultObjectProvider>();
-        	List<SliceStoreManager> targets = findTargets();
-        	QueryContext ctx = q.getContext();
-        	boolean isReplicated = containsReplicated(ctx);
-        	for (int i = 0; i < owner._queries.size(); i++) {
-        		// if replicated, then execute only on single slice
-        		if (i > 0 && isReplicated) {
-        			continue;
-        		}
-        		StoreManager sm  = owner.getDistributedStore().getSlice(i);
-        		if (!targets.contains(sm))
-        			continue;
-         		StoreQuery query = owner._queries.get(i);
-        		Executor executor = executors.get(i);
-        		if (!targets.contains(sm))
-        			continue;
-        		usedExecutors.add(executor);
-        		if (serialMode) {
-        			rops.add(executor.executeQuery(query, params, range));
-        		} else {
-        			if (futures == null)
-        				futures = new ArrayList<Future<ResultObjectProvider>>();
-	        		QueryExecutor call = new QueryExecutor();
-	        		call.executor = executor;
-	        		call.query    = query;
-	        		call.params   = params;
-	        		call.range    = range;
-	        		futures.add(threadPool.submit(call)); 
-        		}
-        	}
-        	if (!serialMode) {
-	    		for (Future<ResultObjectProvider> future:futures) {
-	        		try {
+
+		/**
+		 * Each child query must be executed with slice context and not the
+		 * given query context.
+		 */
+		public ResultObjectProvider executeQuery(StoreQuery q,
+				final Object[] params, final Range range) {
+			List<Future<ResultObjectProvider>> futures = 
+				new ArrayList<Future<ResultObjectProvider>>();
+			final List<Executor> usedExecutors = new ArrayList<Executor>();
+			final List<ResultObjectProvider> rops = 
+				new ArrayList<ResultObjectProvider>();
+			List<SliceStoreManager> targets = findTargets();
+			QueryContext ctx = q.getContext();
+			boolean isReplicated = containsReplicated(ctx);
+			for (int i = 0; i < owner._queries.size(); i++) {
+				// if replicated, then execute only on single slice
+				if (i > 0 && isReplicated) {
+					continue;
+				}
+				StoreManager sm = owner.getDistributedStore().getSlice(i);
+				if (!targets.contains(sm))
+					continue;
+				StoreQuery query = owner._queries.get(i);
+				Executor executor = executors.get(i);
+				if (!targets.contains(sm))
+					continue;
+				usedExecutors.add(executor);
+				if (!parallel) {
+					rops.add(executor.executeQuery(query, params, range));
+				} else {
+					QueryExecutor call = new QueryExecutor();
+					call.executor = executor;
+					call.query = query;
+					call.params = params;
+					call.range = range;
+					futures.add(threadPool.submit(call));
+				}
+
+			}
+			if (parallel) {
+				for (Future<ResultObjectProvider> future : futures) {
+					try {
 						rops.add(future.get());
 					} catch (InterruptedException e) {
 						throw new RuntimeException(e);
 					} catch (ExecutionException e) {
 						throw new StoreException(e.getCause());
 					}
-	        	}
-        	}
-        	ResultObjectProvider[] tmp = rops.toArray
-        		(new ResultObjectProvider[rops.size()]);
-        	ResultObjectProvider result = null;
-        	boolean[] ascending = getAscending(q);
-        	boolean isAscending = ascending.length > 0;
-        	boolean isAggregate = ctx.isAggregate();
-        	boolean hasRange    = ctx.getEndRange() != Long.MAX_VALUE;
-        	if (isAggregate) {
-        	    result = new UniqueResultObjectProvider(tmp, q, 
-        	            getQueryExpressions());
-        	} else if (isAscending) {
-        	    result = new OrderingMergedResultObjectProvider(tmp, ascending, 
-                  usedExecutors.toArray(new Executor[usedExecutors.size()]),
-                  q, params);
-        	} else {
-        	    result = new MergedResultObjectProvider(tmp);
-        	}
-        	if (hasRange) {
-        	    result = new RangeResultObjectProvider(result, 
-        	            ctx.getStartRange(), ctx.getEndRange());
-        	}
-        	return result;
-        }
-        
-        /**
+				}
+			}
+			ResultObjectProvider[] tmp = rops
+					.toArray(new ResultObjectProvider[rops.size()]);
+			ResultObjectProvider result = null;
+			boolean[] ascending = getAscending(q);
+			boolean isAscending = ascending.length > 0;
+			boolean isAggregate = ctx.isAggregate();
+			boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
+			if (isAggregate) {
+				result = new UniqueResultObjectProvider(tmp, q,
+						getQueryExpressions());
+			} else if (isAscending) {
+				result = new OrderingMergedResultObjectProvider(tmp, ascending,
+					usedExecutors.toArray(new Executor[usedExecutors.size()]),
+					q, params);
+			} else {
+				result = new MergedResultObjectProvider(tmp);
+			}
+			if (hasRange) {
+				result = new RangeResultObjectProvider(result, ctx
+						.getStartRange(), ctx.getEndRange());
+			}
+			return result;
+		}
+
+		/**
 		 * Scans metadata to find out if a replicated class is the candidate.
-        **/
-        boolean containsReplicated(QueryContext query) {
-        	Class candidate = query.getCandidateType();
-        	if (candidate != null) {
-        		ClassMetaData meta = query.getStoreContext().getConfiguration()
-        			.getMetaDataRepositoryInstance()
-        			.getMetaData(candidate, null, true);
-        		if (meta != null && meta.isReplicated())
-        			return true;
-        	}
-        	ClassMetaData[] metas = query.getAccessPathMetaDatas();
-        	if (metas == null || metas.length < 1)
-        		return false;
-        	for (ClassMetaData type : metas)
-        		if (type.isReplicated())
-        			return true;
-        	return false;
-        }
-        
-        public Number executeDelete(StoreQuery q, Object[] params) {
-        	Iterator<StoreQuery> qs = owner._queries.iterator();
-        	List<Future<Number>> futures = null;
-        	int result = 0;
-        	for (Executor ex:executors) {
-        		if (serialMode) {
-        			Number n = ex.executeDelete(qs.next(), params);    
-        			if (n != null)
-        				result += n.intValue();
-        		} else {
-        			if (futures == null)
-        				futures = new ArrayList<Future<Number>>();
-	        		DeleteExecutor call = new DeleteExecutor();
-	        		call.executor = ex;
-	        		call.query    = qs.next();
-	        		call.params   = params;
-	        		futures.add(threadPool.submit(call)); 
-        		}
-        	}
-        	if (!serialMode) {
-	        	for (Future<Number> future:futures) {
-	        		try {
-	            		Number n = future.get();
-	            		if (n != null) 
-	            			result += n.intValue();
-					} catch (InterruptedException e) {
-						throw new RuntimeException(e);
-					} catch (ExecutionException e) {
-						throw new StoreException(e.getCause());
-					}
-	        	}
-        	}
-        	return result;
-        }
-        
-        public Number executeUpdate(StoreQuery q, Object[] params) {
-        	Iterator<StoreQuery> qs = owner._queries.iterator();
-        	List<Future<Number>> futures = null;
-        	int result = 0;
-        	for (Executor ex:executors) {
-        		if (serialMode) {
-        			Number n = ex.executeUpdate(qs.next(), params);
-        			result += (n == null) ? 0 : n.intValue();
-        		} else {
-        			if (futures == null)
-        				futures = new ArrayList<Future<Number>>();
-        		UpdateExecutor call = new UpdateExecutor();
-        		call.executor = ex;
-        		call.query    = qs.next();
-        		call.params   = params;
-        		futures.add(threadPool.submit(call)); 
-        		}
-        	}
-        	if (serialMode) {
-	        	for (Future<Number> future:futures) {
-	        		try {
-	            		Number n = future.get();
-	        			result += (n == null) ? 0 : n.intValue();
-					} catch (InterruptedException e) {
-						throw new RuntimeException(e);
-					} catch (ExecutionException e) {
-						throw new StoreException(e.getCause());
-					}
-	        	}
-        	}
-        	return result;
-        }
-        
-        List<SliceStoreManager> findTargets() {
-        	FetchConfiguration fetch = owner.getContext().getFetchConfiguration();
-        	return owner.getDistributedStore().getTargets(fetch);
-        }
+		 */
+		boolean containsReplicated(QueryContext query) {
+			Class candidate = query.getCandidateType();
+			if (candidate != null) {
+				ClassMetaData meta = query.getStoreContext().getConfiguration()
+						.getMetaDataRepositoryInstance().getMetaData(candidate,
+								null, true);
+				if (meta != null && meta.isReplicated())
+					return true;
+			}
+			ClassMetaData[] metas = query.getAccessPathMetaDatas();
+			if (metas == null || metas.length < 1)
+				return false;
+			for (ClassMetaData type : metas)
+				if (type.isReplicated())
+					return true;
+			return false;
+		}
+
+		public Number executeDelete(StoreQuery q, Object[] params) {
+			Iterator<StoreQuery> qs = owner._queries.iterator();
+			List<Future<Number>> futures = null;
+			int result = 0;
+			for (Executor ex : executors) {
+				if (futures == null)
+					futures = new ArrayList<Future<Number>>();
+				DeleteExecutor call = new DeleteExecutor();
+				call.executor = ex;
+				call.query = qs.next();
+				call.params = params;
+				futures.add(threadPool.submit(call));
+			}
+			for (Future<Number> future : futures) {
+				try {
+					Number n = future.get();
+					if (n != null)
+						result += n.intValue();
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e.getCause());
+				}
+			}
+			return result;
+		}
+
+		public Number executeUpdate(StoreQuery q, Object[] params) {
+			Iterator<StoreQuery> qs = owner._queries.iterator();
+			List<Future<Number>> futures = null;
+			int result = 0;
+			for (Executor ex : executors) {
+				if (futures == null)
+					futures = new ArrayList<Future<Number>>();
+				UpdateExecutor call = new UpdateExecutor();
+				call.executor = ex;
+				call.query = qs.next();
+				call.params = params;
+				futures.add(threadPool.submit(call));
+			}
+			for (Future<Number> future : futures) {
+				try {
+					Number n = future.get();
+					result += (n == null) ? 0 : n.intValue();
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e.getCause());
+				}
+			}
+			return result;
+		}
+
+		List<SliceStoreManager> findTargets() {
+			FetchConfiguration fetch = owner.getContext()
+					.getFetchConfiguration();
+			return owner.getDistributedStore().getTargets(fetch);
+		}
 	}
-	
-	static  class QueryExecutor implements Callable<ResultObjectProvider> {
+
+	static class QueryExecutor implements Callable<ResultObjectProvider> {
 		StoreQuery query;
 		Executor executor;
 		Object[] params;
 		Range range;
+
 		public ResultObjectProvider call() throws Exception {
-			return executor.executeQuery(query, params, range);
+			((QueryImpl)query.getContext()).startLocking();
+			((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+			((QueryImpl)query.getContext()).lock();
+			((BrokerImpl)query.getContext().getStoreContext()).lock();
+			try { 
+				return executor.executeQuery(query, params, range);
+			} finally {
+				((QueryImpl)query.getContext()).unlock();
+				((BrokerImpl)query.getContext().getStoreContext()).unlock();
+				((QueryImpl)query.getContext()).stopLocking();
+				((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+			}
 		}
 	}
-	
-	static  class DeleteExecutor implements Callable<Number> {
+
+	static class DeleteExecutor implements Callable<Number> {
 		StoreQuery query;
 		Executor executor;
 		Object[] params;
+
 		public Number call() throws Exception {
-			return executor.executeDelete(query, params);
+			((QueryImpl)query.getContext()).startLocking();
+			((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+			((QueryImpl)query.getContext()).lock();
+			((BrokerImpl)query.getContext().getStoreContext()).lock();
+			try { 
+				return executor.executeDelete(query, params);
+			} finally {
+				((QueryImpl)query.getContext()).unlock();
+				((BrokerImpl)query.getContext().getStoreContext()).unlock();
+				((QueryImpl)query.getContext()).stopLocking();
+				((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+			}
 		}
 	}
-	
-	static  class UpdateExecutor implements Callable<Number> {
+
+	static class UpdateExecutor implements Callable<Number> {
 		StoreQuery query;
 		Executor executor;
 		Object[] params;
+
 		public Number call() throws Exception {
-			return executor.executeUpdate(query, params);
+			((QueryImpl)query.getContext()).startLocking();
+			((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+			((QueryImpl)query.getContext()).lock();
+			((BrokerImpl)query.getContext().getStoreContext()).lock();
+			try { 
+				return executor.executeUpdate(query, params);
+			} finally {
+				((QueryImpl)query.getContext()).unlock();
+				((BrokerImpl)query.getContext().getStoreContext()).unlock();
+				((QueryImpl)query.getContext()).stopLocking();
+				((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+			}
 		}
 	}
 }
-

Modified: openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties (original)
+++ openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties Thu Dec 18 16:08:23 2008
@@ -17,4 +17,10 @@
 bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
 	"{1}" for "{2}". The valid slices are {3}. This error may happen \
 	when one or more of the originally configured slices are unavailable \
-	and Lenient property is set to true.
\ No newline at end of file
+	and Lenient property is set to true.
+forced-set-config: Configuration property "{0}" is not set explicitly. Setting \
+	this value to "{1}".
+multithreaded-false: Configuration property "{0}" is set to "false". \
+	It is recommended to set "{0}" to "true" because Slice executes database \
+	operations per slice in parallel in different threads; setting "{0}" to \
+	"false" may cause unpredictable behavior.
\ No newline at end of file

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Thu Dec 18 16:08:23 2008
@@ -218,7 +218,7 @@
      * Disable this test temporarily as we undergo changes in internal slice 
      * information structure.
      */
-    public void xtestUpdateReplicatedObjects() {
+    public void testUpdateReplicatedObjects() {
         EntityManager em = emf.createEntityManager();
         em.getTransaction().begin();
         String[] names = {"USA", "India", "China"};

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java Thu Dec 18 16:08:23 2008
@@ -18,6 +18,9 @@
  */
 package org.apache.openjpa.slice;
 
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -25,6 +28,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.persistence.EntityManager;
@@ -42,7 +48,7 @@
 	private int POBJECT_COUNT = 25;
 	private int VALUE_MIN = 100;
 	private int VALUE_MAX = VALUE_MIN + POBJECT_COUNT - 1;
-	private static int THREADS = 3;
+	private static int THREADS = 5;
 	private ExecutorService group; 
 	private Future[] futures;
 
@@ -57,7 +63,14 @@
 		if (count == 0) {
 			create(POBJECT_COUNT);
 		}
-		group = Executors.newCachedThreadPool();
+		group = new ThreadPoolExecutor(THREADS, THREADS,
+                60, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), new ThreadFactory() {
+					public Thread newThread(Runnable r) {
+						return new Thread(r);
+					}
+				
+				});
 		futures = new Future[THREADS];
 	}
 	
@@ -293,8 +306,9 @@
 					f.get();
 				} catch (ExecutionException e) {
 					Throwable t = e.getCause();
-					t.getCause().printStackTrace();
-					fail("Failed " + t.getCause());
+					StringWriter writer = new StringWriter();
+					t.printStackTrace(new PrintWriter(writer));
+					fail("Failed " + writer.toString());
 				}
 		} catch (InterruptedException e) {