You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2015/10/05 19:02:06 UTC
incubator-geode git commit: [GEODE-146] Queries are not thread safe
due to scopeId variable ScopeId was not thread safe when stored and used in
CompiledSelect Multiple threads could set/unset the value. Instead it is
stored in the query execution contex
Repository: incubator-geode
Updated Branches:
refs/heads/develop fc9ee589b -> 1512677f7
[GEODE-146] Queries are not thread safe due to scopeId variable
ScopeId was not thread safe when stored and used in CompiledSelect
Multiple threads could set/unset the value. Instead it is stored in
the query execution context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1512677f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1512677f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1512677f
Branch: refs/heads/develop
Commit: 1512677f721279a05c6ebddad443d2043d032f99
Parents: fc9ee58
Author: Jason Huynh <jh...@pivotal.io>
Authored: Mon Oct 5 09:58:04 2015 -0700
Committer: Jason Huynh <jh...@pivotal.io>
Committed: Mon Oct 5 10:01:57 2015 -0700
----------------------------------------------------------------------
.../cache/query/internal/CompiledSelect.java | 34 +++-----
.../gemfire/cache/query/QueryJUnitTest.java | 91 ++++++++++++++++++++
2 files changed, 103 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1512677f/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
index 0b07211..a18d9c3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
@@ -59,7 +59,6 @@ public class CompiledSelect extends AbstractCompiledValue {
// 0 is projection name, 1 is the CompiledValue for the expression
private boolean distinct;
private boolean count;
- private int scopeID;
//Asif: limits the SelectResults by the number specified.
private CompiledValue limit;
//Shobhit: counts the no of results satisfying where condition for
@@ -72,17 +71,10 @@ public class CompiledSelect extends AbstractCompiledValue {
protected boolean transformationDone = false;
protected ObjectType cachedElementTypeForOrderBy = null;
private boolean hasUnmappedOrderByCols = false;
-
- /**
- * Identifies the scope ID assosciated with the Select. The CompiledSelect object
- * is shared across by multiple query executing threads, but since the scopeID
- * which gets assigned in the computeDependency phase & is obtained from
- * ExecutionContext, it will not differ across threads. This field may get reassigned
- * by various threads, but still the value will be consistent.
- * It is also therefore not needed to make this field volatile
- * Asif
- */
+ //used as a key in a context to identify the scope of this CompiledSelect
+ private Object scopeID = new Object();
+
public CompiledSelect(boolean distinct, boolean count, CompiledValue whereClause,
List iterators, List projAttrs,List<CompiledSortCriterion> orderByAttrs, CompiledValue limit,
List<String> hints, List<CompiledValue> groupByClause) {
@@ -170,10 +162,9 @@ public class CompiledSelect extends AbstractCompiledValue {
AmbiguousNameException,
NameResolutionException {
// bind iterators in new scope in order to determine dependencies
- //int scopeID = context.assosciateScopeID(this);
- this.scopeID = context.assosciateScopeID();
- context.newScope(scopeID);
- context.pushExecCache(scopeID);
+ context.cachePut(scopeID, context.assosciateScopeID());
+ context.newScope((Integer)context.cacheGet(scopeID));
+ context.pushExecCache((Integer)context.cacheGet(scopeID));
try {
Iterator iter = this.iterators.iterator();
while (iter.hasNext()) {
@@ -365,8 +356,8 @@ public class CompiledSelect extends AbstractCompiledValue {
throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException {
ExecutionContext context = new QueryExecutionContext(parameters, cache, query);
computeDependencies(context);
- context.newScope(this.scopeID);
- context.pushExecCache(scopeID);
+ context.newScope((Integer)context.cacheGet(scopeID));
+ context.pushExecCache((Integer)context.cacheGet(scopeID));
SelectResults results = null;
try {
Iterator iter = iterators.iterator();
@@ -390,9 +381,8 @@ public class CompiledSelect extends AbstractCompiledValue {
public SelectResults evaluate(ExecutionContext context) throws FunctionDomainException, TypeMismatchException,
NameResolutionException, QueryInvocationTargetException {
- // context.newScope(context.getScopeID(this));
- context.newScope(this.scopeID);
- context.pushExecCache(scopeID);
+ context.newScope((Integer)context.cacheGet(scopeID));
+ context.pushExecCache((Integer)context.cacheGet(scopeID));
context.setDistinct(this.distinct);
if(this.hasUnmappedOrderByCols && context.getBucketList() != null) {
throw new QueryInvalidException(LocalizedStrings.DefaultQuery_ORDER_BY_ATTRIBS_NOT_PRESENT_IN_PROJ.toLocalizedString());
@@ -1477,8 +1467,8 @@ public class CompiledSelect extends AbstractCompiledValue {
return true;
}
- context.newScope(this.scopeID);
- context.pushExecCache(scopeID);
+ context.newScope((Integer)context.cacheGet(scopeID));
+ context.pushExecCache((Integer)context.cacheGet(scopeID));
try {
CompiledIteratorDef iterDef = (CompiledIteratorDef) iterators.get(0);
RuntimeIterator rIter = iterDef.getRuntimeIterator(context);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1512677f/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
index 5490278..03b6d41 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
@@ -23,7 +23,11 @@ import static org.junit.runners.MethodSorters.NAME_ASCENDING;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
@@ -31,9 +35,14 @@ import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import util.TestException;
+
import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.query.data.Portfolio;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
@@ -353,5 +362,87 @@ public class QueryJUnitTest {
}
assertEquals("Incorrect result size ", 1, sr.size());
}
+
+ @Test
+ public void testThreadSafetyOfCompiledSelectScopeId() throws Exception {
+ try {
+ Cache cache = CacheUtils.getCache();
+ RegionFactory<Integer, Portfolio> rf = cache
+ .createRegionFactory(RegionShortcut.PARTITION);
+ Region r = rf.create("keyzset");
+ for (int i = 0; i < 100; i++) {
+ r.put(i, new Portfolio(i));
+ }
+ ScopeThreadingTestHook scopeIDTestHook = new ScopeThreadingTestHook(3);
+ DefaultQuery.testHook = scopeIDTestHook;
+ QueryService qs = cache.getQueryService();
+ Query q = qs
+ .newQuery("SELECT DISTINCT * FROM /keyzset.keySet key WHERE key.id > 0 AND key.id <= 0 ORDER BY key asc LIMIT $3");
+ Thread q1 = new Thread(new QueryRunnable(q, new Object[] { 10, 20, 10 }));
+ Thread q2 = new Thread(new QueryRunnable(q, new Object[] { 5, 10, 5 }));
+ Thread q3 = new Thread(new QueryRunnable(q, new Object[] { 2, 10, 8 }));
+ q1.start();
+ q2.start();
+ q3.start();
+ q1.join();
+ q2.join();
+ q3.join();
+ assertEquals("Exceptions were thrown due to DefaultQuery not being thread-safe", true, scopeIDTestHook.isOk());
+ }
+ finally {
+ DefaultQuery.testHook = null;
+ }
+ }
+
+ private class QueryRunnable implements Runnable {
+ private Query q;
+ private Object[] params;
+
+ public QueryRunnable(Query q, Object[] params) {
+ this.q = q;
+ this.params = params;
+ }
+
+ public void run() {
+ try {
+ q.execute(params);
+ } catch (Exception e) {
+ throw new TestException("exception occured while executing query", e);
+ }
+ }
+ }
+
+ public class ScopeThreadingTestHook implements DefaultQuery.TestHook {
+ private CyclicBarrier barrier;
+ private List<Exception> exceptionsThrown = new LinkedList<Exception>();
+
+ public ScopeThreadingTestHook(int numThreads) {
+ barrier = new CyclicBarrier(numThreads);
+ }
+
+ @Override
+ public void doTestHook(int spot) {
+ this.doTestHook(spot + "");
+ }
+
+ @Override
+ public void doTestHook(String spot) {
+ if (spot.equals("1")) {
+ try {
+ barrier.await(8, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ exceptionsThrown.add(e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ exceptionsThrown.add(e);
+ }
+ }
+ }
+
+ public boolean isOk() {
+ return exceptionsThrown.size() == 0;
+ }
+ }
+
}