You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2018/09/05 18:37:53 UTC

[geode] branch develop updated: GEODE-5678: Primary key index results are wrapped in CqEntry for cq (#2416)

This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a99013b  GEODE-5678: Primary key index results are wrapped in CqEntry for cq (#2416)
a99013b is described below

commit a99013b6d8bc667ade1943c72105c9a111a867d4
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Wed Sep 5 11:37:47 2018 -0700

    GEODE-5678: Primary key index results are wrapped in CqEntry for cq (#2416)
    
    * Added tests and small refactor
---
 .../cache/query/internal/index/AbstractIndex.java  | 12 ++-
 .../query/internal/index/CompactRangeIndex.java    |  8 +-
 .../cache/query/internal/index/HashIndex.java      |  8 +-
 .../query/internal/index/PrimaryKeyIndex.java      | 22 +++--
 .../query/internal/index/PrimaryKeyIndexTest.java  | 98 ++++++++++++++++++++++
 .../geode/cache/query/cq/CQDistributedTest.java    | 20 +++++
 6 files changed, 149 insertions(+), 19 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java
index 30b1401..67ae2c5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java
@@ -557,7 +557,6 @@ public abstract class AbstractIndex implements IndexProtocol {
 
   private void addToResultsWithUnionOrIntersection(Collection results,
       SelectResults intermediateResults, boolean isIntersection, Object value) {
-
     value = verifyAndGetPdxDomainObject(value);
 
     if (intermediateResults == null) {
@@ -628,6 +627,17 @@ public abstract class AbstractIndex implements IndexProtocol {
     }
   }
 
+  void applyCqOrProjection(List projAttrib, ExecutionContext context, Collection result,
+      Object iterValue, SelectResults intermediateResults, boolean isIntersection, Object key)
+      throws FunctionDomainException, TypeMismatchException, NameResolutionException,
+      QueryInvocationTargetException {
+    if (context != null && context.isCqQueryContext()) {
+      result.add(new CqEntry(key, iterValue));
+    } else {
+      applyProjection(projAttrib, context, result, iterValue, intermediateResults, isIntersection);
+    }
+  }
+
   void applyProjection(List projAttrib, ExecutionContext context, Collection result,
       Object iterValue, SelectResults intermediateResults, boolean isIntersection)
       throws FunctionDomainException, TypeMismatchException, NameResolutionException,
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java
index 7b75b02..748e84a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java
@@ -793,12 +793,8 @@ public class CompactRangeIndex extends AbstractIndex {
                 ok = QueryUtils.applyCondition(iterOps, context);
               }
               if (ok) {
-                if (context != null && context.isCqQueryContext()) {
-                  result.add(new CqEntry(indexEntry.getDeserializedRegionKey(), value));
-                } else {
-                  applyProjection(projAttrib, context, result, value, intermediateResults,
-                      isIntersection);
-                }
+                applyCqOrProjection(projAttrib, context, result, value, intermediateResults,
+                    isIntersection, indexEntry.getDeserializedRegionKey());
                 if (verifyLimit(result, limit)) {
                   observer.limitAppliedAtIndexLevel(this, limit, result);
                   return;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
index 2c3ab64..044804c 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
@@ -49,7 +49,6 @@ import org.apache.geode.cache.query.internal.CompiledIteratorDef;
 import org.apache.geode.cache.query.internal.CompiledPath;
 import org.apache.geode.cache.query.internal.CompiledSortCriterion;
 import org.apache.geode.cache.query.internal.CompiledValue;
-import org.apache.geode.cache.query.internal.CqEntry;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.IndexInfo;
@@ -677,11 +676,8 @@ public class HashIndex extends AbstractIndex {
         ok = QueryUtils.applyCondition(iterOps, context);
       }
       if (ok) {
-        if (context != null && context.isCqQueryContext()) {
-          result.add(new CqEntry(re.getKey(), value));
-        } else {
-          applyProjection(projAttrib, context, result, value, intermediateResults, isIntersection);
-        }
+        applyCqOrProjection(projAttrib, context, result, value, intermediateResults,
+            isIntersection, re.getKey());
         if (limit != -1 && result.size() == limit) {
           observer.limitAppliedAtIndexLevel(this, limit, result);
           return;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java
index af80e48..4103113 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java
@@ -31,6 +31,7 @@ import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.TypeMismatchException;
 import org.apache.geode.cache.query.internal.CompiledValue;
+import org.apache.geode.cache.query.internal.CqEntry;
 import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.QueryMonitor;
 import org.apache.geode.cache.query.internal.QueryObserver;
@@ -107,6 +108,7 @@ public class PrimaryKeyIndex extends AbstractIndex {
       observer.limitAppliedAtIndexLevel(this, limit, results);
       return;
     }
+
     switch (operator) {
       case OQLLexerTokenTypes.TOK_EQ: {
         if (key != null && key != QueryService.UNDEFINED) {
@@ -114,7 +116,7 @@ public class PrimaryKeyIndex extends AbstractIndex {
           if (entry != null) {
             Object value = entry.getValue();
             if (value != null) {
-              results.add(value);
+              addResultToResults(context, results, key, value);
             }
           }
         }
@@ -132,7 +134,7 @@ public class PrimaryKeyIndex extends AbstractIndex {
         while (iter.hasNext()) {
           // Check if query execution on this thread is canceled.
           QueryMonitor.isQueryExecutionCanceled();
-          results.add(iter.next());
+          addResultToResults(context, results, key, iter.next());
           if (limit != -1 && results.size() == limit) {
             observer.limitAppliedAtIndexLevel(this, limit, results);
             return;
@@ -200,8 +202,8 @@ public class PrimaryKeyIndex extends AbstractIndex {
                 ok = QueryUtils.applyCondition(iterOps, context);
               }
               if (ok) {
-                applyProjection(projAttrib, context, results, value, intermediateResults,
-                    isIntersection);
+                applyCqOrProjection(projAttrib, context, results, value, intermediateResults,
+                    isIntersection, key);
               }
             }
           }
@@ -228,8 +230,8 @@ public class PrimaryKeyIndex extends AbstractIndex {
               ok = QueryUtils.applyCondition(iterOps, context);
             }
             if (ok) {
-              applyProjection(projAttrib, context, results, val, intermediateResults,
-                  isIntersection);
+              applyCqOrProjection(projAttrib, context, results, val, intermediateResults,
+                  isIntersection, key);
             }
             if (limit != -1 && results.size() == limit) {
               observer.limitAppliedAtIndexLevel(this, limit, results);
@@ -255,6 +257,14 @@ public class PrimaryKeyIndex extends AbstractIndex {
     return true;
   }
 
+  private void addResultToResults(ExecutionContext context, Collection results, Object key,
+      Object result) {
+    if (context != null && context.isCqQueryContext()) {
+      results.add(new CqEntry(key, result));
+    } else {
+      results.add(result);
+    }
+  }
 
   protected InternalIndexStatistics createStats(String indexName) {
     return new PrimaryKeyIndexStatistics();
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndexTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndexTest.java
new file mode 100644
index 0000000..a84c997
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndexTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.index;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.CqEntry;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes;
+import org.apache.geode.internal.cache.LocalRegion;
+
+public class PrimaryKeyIndexTest {
+
+  private PrimaryKeyIndex index;
+  LocalRegion region = mock(LocalRegion.class);
+  ExecutionContext context = mock(ExecutionContext.class);
+
+  @Before
+  public void setup() {
+    when(region.getAttributes()).thenReturn(mock(RegionAttributes.class));
+    index = new PrimaryKeyIndex(null, null, region, null,
+        null, null, null, null, null, null);
+  }
+
+  @Test
+  public void applyCqOrProjectionWhenContextNotForCqShouldAddValueToResults() throws Exception {
+    when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+    List results = new LinkedList();
+    String value = "value";
+    index.applyCqOrProjection(null, context, results, value, null, false, "key");
+    assertEquals(value, results.get(0));
+  }
+
+  @Test
+  public void applyCqOrProjectionWhenContextIsCqShouldAddCqEntryToResults() throws Exception {
+    when(context.isCqQueryContext()).thenReturn(true);
+    when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+    List results = new LinkedList();
+    String value = "value";
+    index.applyCqOrProjection(null, context, results, value, null, false, "key");
+    assertTrue(results.get(0) instanceof CqEntry);
+  }
+
+  @Test
+  public void lockQueryWithoutProjectionWithMatchingResultForCqShouldReturnCorrectCqEvent()
+      throws Exception {
+    String value = "value";
+    when(context.isCqQueryContext()).thenReturn(true);
+    when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+    Region.Entry entry = mock(Region.Entry.class);
+    when(entry.getValue()).thenReturn(value);
+    when(region.accessEntry(eq("key"), anyBoolean())).thenReturn(entry);
+    List results = new LinkedList();
+    index.lockedQuery("key", OQLLexerTokenTypes.TOK_EQ, results, null, context);
+    assertTrue(results.get(0) instanceof CqEntry);
+  }
+
+  @Test
+  public void lockQueryWithoutProjectionWithMatchingResultShouldReturnCorrectValue()
+      throws Exception {
+    String value = "value";
+    when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+    Region.Entry entry = mock(Region.Entry.class);
+    when(entry.getValue()).thenReturn(value);
+    when(region.accessEntry(eq("key"), anyBoolean())).thenReturn(entry);
+    List results = new LinkedList();
+    index.lockedQuery("key", OQLLexerTokenTypes.TOK_EQ, results, null, context);
+    assertEquals(value, results.get(0));
+  }
+
+
+}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index fd33d27..37f01d9 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -36,6 +36,7 @@ import org.apache.geode.cache.query.CqAttributesFactory;
 import org.apache.geode.cache.query.CqEvent;
 import org.apache.geode.cache.query.CqListener;
 import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
@@ -155,6 +156,25 @@ public class CQDistributedTest implements Serializable {
         .untilAsserted(() -> assertEquals(3, testListener.onEventCalls));
   }
 
+  @Test
+  public void cqExecuteWithInitialResultsWithValuesMatchingPrimaryKeyShouldNotThrowClassCastException()
+      throws Exception {
+    server.invoke(() -> {
+      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      ClusterStartupRule.getCache().getQueryService().createKeyIndex("PrimaryKeyIndex", "ID",
+          "/region");
+      regionOnServer.put(0, new Portfolio(0));
+      regionOnServer.put(1, new Portfolio(1));
+      regionOnServer.put(2, new Portfolio(2));
+      regionOnServer.put(3, new Portfolio(3));
+      regionOnServer.put(4, new Portfolio(4));
+    });
+
+    SelectResults results =
+        qs.newCq("Select * from /region where ID = 1", cqa).executeWithInitialResults();
+    assertEquals(1, results.size());
+  }
+
 
   private class TestCqListener implements CqListener, Serializable {
     public int onEventCalls = 0;