You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2015/10/16 22:42:03 UTC

[22/50] [abbrv] incubator-geode git commit: [GEODE-146] Queries are not thread safe due to scopeId variable. ScopeId was not thread safe when stored and used in CompiledSelect: multiple threads could set/unset the value. Instead it is stored in the query execution context.

[GEODE-146] Queries are not thread safe due to the scopeID variable.
The scopeID was not thread safe when stored and used in CompiledSelect:
multiple threads could set/unset the value. Instead, it is now stored in
the query execution context.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1512677f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1512677f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1512677f

Branch: refs/heads/feature/GEODE-291
Commit: 1512677f721279a05c6ebddad443d2043d032f99
Parents: fc9ee58
Author: Jason Huynh <jh...@pivotal.io>
Authored: Mon Oct 5 09:58:04 2015 -0700
Committer: Jason Huynh <jh...@pivotal.io>
Committed: Mon Oct 5 10:01:57 2015 -0700

----------------------------------------------------------------------
 .../cache/query/internal/CompiledSelect.java    | 34 +++-----
 .../gemfire/cache/query/QueryJUnitTest.java     | 91 ++++++++++++++++++++
 2 files changed, 103 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1512677f/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
index 0b07211..a18d9c3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/query/internal/CompiledSelect.java
@@ -59,7 +59,6 @@ public class CompiledSelect extends AbstractCompiledValue {
      // 0 is projection name, 1 is the CompiledValue for the expression 
   private boolean distinct;
   private boolean count;
-  private int scopeID;
   //Asif: limits the SelectResults by the number specified.
   private CompiledValue limit;
   //Shobhit: counts the no of results satisfying where condition for
@@ -72,17 +71,10 @@ public class CompiledSelect extends AbstractCompiledValue {
   protected boolean transformationDone = false;
   protected ObjectType cachedElementTypeForOrderBy = null;
   private boolean hasUnmappedOrderByCols = false; 
-  
 
-  /** 
-   * Identifies the scope ID assosciated with the Select. The CompiledSelect object
-   * is shared across by multiple query executing threads, but since the scopeID 
-   * which gets assigned in the computeDependency phase & is obtained from 
-   * ExecutionContext, it will not differ across threads. This field may get reassigned
-   * by various threads, but still the value will be consistent.
-   * It is also therefore not needed to make this field volatile
-   * Asif
-   */
+  //used as a key in a context to identify the scope of this CompiledSelect 
+  private Object scopeID = new Object(); 
+
   public CompiledSelect(boolean distinct, boolean count, CompiledValue whereClause,
                         List iterators, List projAttrs,List<CompiledSortCriterion> orderByAttrs, CompiledValue limit,
                         List<String> hints, List<CompiledValue> groupByClause) {
@@ -170,10 +162,9 @@ public class CompiledSelect extends AbstractCompiledValue {
          AmbiguousNameException,
          NameResolutionException {
     // bind iterators in new scope in order to determine dependencies
-    //int scopeID = context.assosciateScopeID(this);
-    this.scopeID = context.assosciateScopeID();
-    context.newScope(scopeID);
-    context.pushExecCache(scopeID);
+    context.cachePut(scopeID, context.assosciateScopeID());
+    context.newScope((Integer)context.cacheGet(scopeID));
+    context.pushExecCache((Integer)context.cacheGet(scopeID));
     try {
       Iterator iter = this.iterators.iterator();
       while (iter.hasNext()) {
@@ -365,8 +356,8 @@ public class CompiledSelect extends AbstractCompiledValue {
   throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException {
     ExecutionContext context = new QueryExecutionContext(parameters, cache, query);
     computeDependencies(context);
-    context.newScope(this.scopeID);
-    context.pushExecCache(scopeID);
+    context.newScope((Integer)context.cacheGet(scopeID));
+    context.pushExecCache((Integer)context.cacheGet(scopeID));
     SelectResults results = null;
     try {
       Iterator iter = iterators.iterator();
@@ -390,9 +381,8 @@ public class CompiledSelect extends AbstractCompiledValue {
   
   public SelectResults evaluate(ExecutionContext context) throws FunctionDomainException, TypeMismatchException,
       NameResolutionException, QueryInvocationTargetException {
-   // context.newScope(context.getScopeID(this));
-    context.newScope(this.scopeID);
-    context.pushExecCache(scopeID);
+    context.newScope((Integer)context.cacheGet(scopeID));
+    context.pushExecCache((Integer)context.cacheGet(scopeID));
     context.setDistinct(this.distinct);
     if(this.hasUnmappedOrderByCols && context.getBucketList() != null) {
       throw new QueryInvalidException(LocalizedStrings.DefaultQuery_ORDER_BY_ATTRIBS_NOT_PRESENT_IN_PROJ.toLocalizedString()); 
@@ -1477,8 +1467,8 @@ public class CompiledSelect extends AbstractCompiledValue {
       return true;
     }
 
-    context.newScope(this.scopeID);
-    context.pushExecCache(scopeID);
+    context.newScope((Integer)context.cacheGet(scopeID));
+    context.pushExecCache((Integer)context.cacheGet(scopeID));
     try {
       CompiledIteratorDef iterDef = (CompiledIteratorDef) iterators.get(0);
       RuntimeIterator rIter = iterDef.getRuntimeIterator(context);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1512677f/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
index 5490278..03b6d41 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/QueryJUnitTest.java
@@ -23,7 +23,11 @@ import static org.junit.runners.MethodSorters.NAME_ASCENDING;
 
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
 import org.junit.Before;
@@ -31,9 +35,14 @@ import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import util.TestException;
+
 import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.cache.query.data.Portfolio;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
@@ -353,5 +362,87 @@ public class QueryJUnitTest {
     }
     assertEquals("Incorrect result size ", 1, sr.size());
   }
+
+  @Test
+  public void testThreadSafetyOfCompiledSelectScopeId() throws Exception {
+    try {
+      Cache cache = CacheUtils.getCache();
+      RegionFactory<Integer, Portfolio> rf = cache
+          .createRegionFactory(RegionShortcut.PARTITION);
+      Region r = rf.create("keyzset");
+      for (int i = 0; i < 100; i++) {
+        r.put(i, new Portfolio(i));
+      }
+      ScopeThreadingTestHook scopeIDTestHook = new ScopeThreadingTestHook(3);
+      DefaultQuery.testHook = scopeIDTestHook;
+      QueryService qs = cache.getQueryService();
+      Query q = qs
+          .newQuery("SELECT DISTINCT * FROM /keyzset.keySet key WHERE key.id > 0 AND key.id <= 0 ORDER BY key asc LIMIT $3");
+      Thread q1 = new Thread(new QueryRunnable(q, new Object[] { 10, 20, 10 }));
+      Thread q2 = new Thread(new QueryRunnable(q, new Object[] { 5, 10, 5 }));
+      Thread q3 = new Thread(new QueryRunnable(q, new Object[] { 2, 10, 8 }));
+      q1.start();
+      q2.start();
+      q3.start();
+      q1.join();
+      q2.join();
+      q3.join();
+      assertEquals("Exceptions were thrown due to DefaultQuery not being thread-safe", true, scopeIDTestHook.isOk());
+    }
+    finally {
+      DefaultQuery.testHook = null;
+    }
+  }
+
+  private class QueryRunnable implements Runnable {
+    private Query q;
+    private Object[] params;
+
+    public QueryRunnable(Query q, Object[] params) {
+      this.q = q;
+      this.params = params;
+    }
+
+    public void run() {
+      try {
+        q.execute(params);
+      } catch (Exception e) {
+        throw new TestException("exception occured while executing query", e);
+      }
+    }
+  }
+
+  public class ScopeThreadingTestHook implements DefaultQuery.TestHook {
+    private CyclicBarrier barrier;
+    private List<Exception> exceptionsThrown = new LinkedList<Exception>();
+
+    public ScopeThreadingTestHook(int numThreads) {
+      barrier = new CyclicBarrier(numThreads);
+    }
+
+    @Override
+    public void doTestHook(int spot) {
+      this.doTestHook(spot + "");
+    }
+
+    @Override
+    public void doTestHook(String spot) {
+      if (spot.equals("1")) {
+        try {
+          barrier.await(8, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          exceptionsThrown.add(e);
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          exceptionsThrown.add(e);
+        }
+      }
+    }
+
+    public boolean isOk() {
+      return exceptionsThrown.size() == 0;
+    }
+  }
+
   
 }