You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2018/09/05 18:37:53 UTC
[geode] branch develop updated: GEODE-5678: Primary key index
results are wrapped in CqEntry for cq (#2416)
This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a99013b GEODE-5678: Primary key index results are wrapped in CqEntry for cq (#2416)
a99013b is described below
commit a99013b6d8bc667ade1943c72105c9a111a867d4
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Wed Sep 5 11:37:47 2018 -0700
GEODE-5678: Primary key index results are wrapped in CqEntry for cq (#2416)
* Added tests and small refactor
---
.../cache/query/internal/index/AbstractIndex.java | 12 ++-
.../query/internal/index/CompactRangeIndex.java | 8 +-
.../cache/query/internal/index/HashIndex.java | 8 +-
.../query/internal/index/PrimaryKeyIndex.java | 22 +++--
.../query/internal/index/PrimaryKeyIndexTest.java | 98 ++++++++++++++++++++++
.../geode/cache/query/cq/CQDistributedTest.java | 20 +++++
6 files changed, 149 insertions(+), 19 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java
index 30b1401..67ae2c5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java
@@ -557,7 +557,6 @@ public abstract class AbstractIndex implements IndexProtocol {
private void addToResultsWithUnionOrIntersection(Collection results,
SelectResults intermediateResults, boolean isIntersection, Object value) {
-
value = verifyAndGetPdxDomainObject(value);
if (intermediateResults == null) {
@@ -628,6 +627,17 @@ public abstract class AbstractIndex implements IndexProtocol {
}
}
+ void applyCqOrProjection(List projAttrib, ExecutionContext context, Collection result,
+ Object iterValue, SelectResults intermediateResults, boolean isIntersection, Object key)
+ throws FunctionDomainException, TypeMismatchException, NameResolutionException,
+ QueryInvocationTargetException {
+ if (context != null && context.isCqQueryContext()) {
+ result.add(new CqEntry(key, iterValue));
+ } else {
+ applyProjection(projAttrib, context, result, iterValue, intermediateResults, isIntersection);
+ }
+ }
+
void applyProjection(List projAttrib, ExecutionContext context, Collection result,
Object iterValue, SelectResults intermediateResults, boolean isIntersection)
throws FunctionDomainException, TypeMismatchException, NameResolutionException,
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java
index 7b75b02..748e84a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java
@@ -793,12 +793,8 @@ public class CompactRangeIndex extends AbstractIndex {
ok = QueryUtils.applyCondition(iterOps, context);
}
if (ok) {
- if (context != null && context.isCqQueryContext()) {
- result.add(new CqEntry(indexEntry.getDeserializedRegionKey(), value));
- } else {
- applyProjection(projAttrib, context, result, value, intermediateResults,
- isIntersection);
- }
+ applyCqOrProjection(projAttrib, context, result, value, intermediateResults,
+ isIntersection, indexEntry.getDeserializedRegionKey());
if (verifyLimit(result, limit)) {
observer.limitAppliedAtIndexLevel(this, limit, result);
return;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
index 2c3ab64..044804c 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/HashIndex.java
@@ -49,7 +49,6 @@ import org.apache.geode.cache.query.internal.CompiledIteratorDef;
import org.apache.geode.cache.query.internal.CompiledPath;
import org.apache.geode.cache.query.internal.CompiledSortCriterion;
import org.apache.geode.cache.query.internal.CompiledValue;
-import org.apache.geode.cache.query.internal.CqEntry;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.cache.query.internal.IndexInfo;
@@ -677,11 +676,8 @@ public class HashIndex extends AbstractIndex {
ok = QueryUtils.applyCondition(iterOps, context);
}
if (ok) {
- if (context != null && context.isCqQueryContext()) {
- result.add(new CqEntry(re.getKey(), value));
- } else {
- applyProjection(projAttrib, context, result, value, intermediateResults, isIntersection);
- }
+ applyCqOrProjection(projAttrib, context, result, value, intermediateResults,
+ isIntersection, re.getKey());
if (limit != -1 && result.size() == limit) {
observer.limitAppliedAtIndexLevel(this, limit, result);
return;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java
index af80e48..4103113 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndex.java
@@ -31,6 +31,7 @@ import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.TypeMismatchException;
import org.apache.geode.cache.query.internal.CompiledValue;
+import org.apache.geode.cache.query.internal.CqEntry;
import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.cache.query.internal.QueryMonitor;
import org.apache.geode.cache.query.internal.QueryObserver;
@@ -107,6 +108,7 @@ public class PrimaryKeyIndex extends AbstractIndex {
observer.limitAppliedAtIndexLevel(this, limit, results);
return;
}
+
switch (operator) {
case OQLLexerTokenTypes.TOK_EQ: {
if (key != null && key != QueryService.UNDEFINED) {
@@ -114,7 +116,7 @@ public class PrimaryKeyIndex extends AbstractIndex {
if (entry != null) {
Object value = entry.getValue();
if (value != null) {
- results.add(value);
+ addResultToResults(context, results, key, value);
}
}
}
@@ -132,7 +134,7 @@ public class PrimaryKeyIndex extends AbstractIndex {
while (iter.hasNext()) {
// Check if query execution on this thread is canceled.
QueryMonitor.isQueryExecutionCanceled();
- results.add(iter.next());
+ addResultToResults(context, results, key, iter.next());
if (limit != -1 && results.size() == limit) {
observer.limitAppliedAtIndexLevel(this, limit, results);
return;
@@ -200,8 +202,8 @@ public class PrimaryKeyIndex extends AbstractIndex {
ok = QueryUtils.applyCondition(iterOps, context);
}
if (ok) {
- applyProjection(projAttrib, context, results, value, intermediateResults,
- isIntersection);
+ applyCqOrProjection(projAttrib, context, results, value, intermediateResults,
+ isIntersection, key);
}
}
}
@@ -228,8 +230,8 @@ public class PrimaryKeyIndex extends AbstractIndex {
ok = QueryUtils.applyCondition(iterOps, context);
}
if (ok) {
- applyProjection(projAttrib, context, results, val, intermediateResults,
- isIntersection);
+ applyCqOrProjection(projAttrib, context, results, val, intermediateResults,
+ isIntersection, key);
}
if (limit != -1 && results.size() == limit) {
observer.limitAppliedAtIndexLevel(this, limit, results);
@@ -255,6 +257,14 @@ public class PrimaryKeyIndex extends AbstractIndex {
return true;
}
+ private void addResultToResults(ExecutionContext context, Collection results, Object key,
+ Object result) {
+ if (context != null && context.isCqQueryContext()) {
+ results.add(new CqEntry(key, result));
+ } else {
+ results.add(result);
+ }
+ }
protected InternalIndexStatistics createStats(String indexName) {
return new PrimaryKeyIndexStatistics();
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndexTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndexTest.java
new file mode 100644
index 0000000..a84c997
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/internal/index/PrimaryKeyIndexTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.internal.index;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.query.internal.CqEntry;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.parse.OQLLexerTokenTypes;
+import org.apache.geode.internal.cache.LocalRegion;
+
+public class PrimaryKeyIndexTest {
+
+ private PrimaryKeyIndex index;
+ LocalRegion region = mock(LocalRegion.class);
+ ExecutionContext context = mock(ExecutionContext.class);
+
+ @Before
+ public void setup() {
+ when(region.getAttributes()).thenReturn(mock(RegionAttributes.class));
+ index = new PrimaryKeyIndex(null, null, region, null,
+ null, null, null, null, null, null);
+ }
+
+ @Test
+ public void applyCqOrProjectionWhenContextNotForCqShouldAddValueToResults() throws Exception {
+ when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+ List results = new LinkedList();
+ String value = "value";
+ index.applyCqOrProjection(null, context, results, value, null, false, "key");
+ assertEquals(value, results.get(0));
+ }
+
+ @Test
+ public void applyCqOrProjectionWhenContextIsCqShouldAddCqEntryToResults() throws Exception {
+ when(context.isCqQueryContext()).thenReturn(true);
+ when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+ List results = new LinkedList();
+ String value = "value";
+ index.applyCqOrProjection(null, context, results, value, null, false, "key");
+ assertTrue(results.get(0) instanceof CqEntry);
+ }
+
+ @Test
+ public void lockQueryWithoutProjectionWithMatchingResultForCqShouldReturnCorrectCqEvent()
+ throws Exception {
+ String value = "value";
+ when(context.isCqQueryContext()).thenReturn(true);
+ when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+ Region.Entry entry = mock(Region.Entry.class);
+ when(entry.getValue()).thenReturn(value);
+ when(region.accessEntry(eq("key"), anyBoolean())).thenReturn(entry);
+ List results = new LinkedList();
+ index.lockedQuery("key", OQLLexerTokenTypes.TOK_EQ, results, null, context);
+ assertTrue(results.get(0) instanceof CqEntry);
+ }
+
+ @Test
+ public void lockQueryWithoutProjectionWithMatchingResultShouldReturnCorrectValue()
+ throws Exception {
+ String value = "value";
+ when(context.getQuery()).thenReturn(mock(DefaultQuery.class));
+ Region.Entry entry = mock(Region.Entry.class);
+ when(entry.getValue()).thenReturn(value);
+ when(region.accessEntry(eq("key"), anyBoolean())).thenReturn(entry);
+ List results = new LinkedList();
+ index.lockedQuery("key", OQLLexerTokenTypes.TOK_EQ, results, null, context);
+ assertEquals(value, results.get(0));
+ }
+
+
+}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index fd33d27..37f01d9 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -36,6 +36,7 @@ import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
@@ -155,6 +156,25 @@ public class CQDistributedTest implements Serializable {
.untilAsserted(() -> assertEquals(3, testListener.onEventCalls));
}
+ @Test
+ public void cqExecuteWithInitialResultsWithValuesMatchingPrimaryKeyShouldNotThrowClassCastException()
+ throws Exception {
+ server.invoke(() -> {
+ Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+ ClusterStartupRule.getCache().getQueryService().createKeyIndex("PrimaryKeyIndex", "ID",
+ "/region");
+ regionOnServer.put(0, new Portfolio(0));
+ regionOnServer.put(1, new Portfolio(1));
+ regionOnServer.put(2, new Portfolio(2));
+ regionOnServer.put(3, new Portfolio(3));
+ regionOnServer.put(4, new Portfolio(4));
+ });
+
+ SelectResults results =
+ qs.newCq("Select * from /region where ID = 1", cqa).executeWithInitialResults();
+ assertEquals(1, results.size());
+ }
+
private class TestCqListener implements CqListener, Serializable {
public int onEventCalls = 0;