You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jj...@apache.org on 2019/06/04 15:16:03 UTC
[geode] branch develop updated: GEODE-6806: Ignore LIMIT in
intermediate steps (#3629)
This is an automated email from the ASF dual-hosted git repository.
jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5afd611 GEODE-6806: Ignore LIMIT in intermediate steps (#3629)
5afd611 is described below
commit 5afd6115e39438e89ef5ee9ce21622a39e44abfe
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Tue Jun 4 16:15:50 2019 +0100
GEODE-6806: Ignore LIMIT in intermediate steps (#3629)
When joining multiple regions with indexes, only apply the LIMIT
clause in intermediate execution steps if the proper flag has been set
in the query context.
- Fixed minor warnings.
- Replaced `junit.Assert` by `assertj`.
- Added tests to validate query results of multiple joins.
---
.../cache/query/JoinQueriesIntegrationTest.java | 361 ++++++++++++++++++---
.../geode/cache/query/internal/QueryUtils.java | 8 +-
2 files changed, 316 insertions(+), 53 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java
index 9d75234..417a8dd4 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java
@@ -14,12 +14,16 @@
*/
package org.apache.geode.cache.query;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
+import java.util.Calendar;
+import java.util.Date;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -27,26 +31,42 @@ import org.junit.runner.RunWith;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
-@Category({OQLQueryTest.class})
+@Category(OQLQueryTest.class)
@RunWith(JUnitParamsRunner.class)
public class JoinQueriesIntegrationTest {
+ @Rule
+ public ServerStarterRule serverRule = new ServerStarterRule().withAutoStart();
+
+ @SuppressWarnings("unused")
public static class Customer implements Serializable {
- public int pkid;
- public int id;
- public int joinId;
- public String name;
+ int pkid;
+ int id;
+ int joinId;
+ String name;
- public Customer(int pkid, int id) {
- this.pkid = pkid;
- this.id = id;
- this.joinId = id;
- this.name = "name" + pkid;
+ public int getPkid() {
+ return pkid;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public int getJoinId() {
+ return joinId;
+ }
+
+ public String getName() {
+ return name;
}
- public Customer(int pkid, int id, int joinId) {
+ Customer(int pkid, int id, int joinId) {
this.pkid = pkid;
this.id = id;
this.joinId = joinId;
@@ -54,11 +74,82 @@ public class JoinQueriesIntegrationTest {
}
public String toString() {
- return "Customer pkid = " + pkid + ", id: " + id + " name:" + name + " joinId: " + joinId;
+ return "Customer pkid = " + getPkid() + ", id: " + getId() + " name:" + getName()
+ + " joinId: " + getJoinId();
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public static class Order implements Serializable {
+ private final String orderId;
+ private final Integer version;
+
+ public String getOrderId() {
+ return orderId;
+ }
+
+ public Integer getVersion() {
+ return version;
+ }
+
+ Order(String orderId, Integer version) {
+ this.orderId = orderId;
+ this.version = version;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ public static class ValidationIssue implements Serializable {
+ private final String issueId;
+ private final Date createdTime;
+
+ public String getIssueId() {
+ return issueId;
+ }
+
+ public Date getCreatedTime() {
+ return createdTime;
+ }
+
+ ValidationIssue(String issueId, Date createdTime) {
+ this.issueId = issueId;
+ this.createdTime = createdTime;
}
}
+ @SuppressWarnings("unused")
+ public static class OrderValidationIssueXRef implements Serializable {
+ private final String referenceOrderId;
+ private final String referenceIssueId;
+ private final String validationIssueXRefID;
+ private final Integer referenceOrderVersion;
+
+ public String getReferenceOrderId() {
+ return referenceOrderId;
+ }
+
+ public String getReferenceIssueId() {
+ return referenceIssueId;
+ }
+
+ public String getValidationIssueXRefID() {
+ return validationIssueXRefID;
+ }
+
+ public Integer getReferenceOrderVersion() {
+ return referenceOrderVersion;
+ }
+
+ OrderValidationIssueXRef(String validationIssueXRefID, String referenceIssueId,
+ String referenceOrderId, Integer referenceOrderVersion) {
+ this.validationIssueXRefID = validationIssueXRefID;
+ this.referenceIssueId = referenceIssueId;
+ this.referenceOrderId = referenceOrderId;
+ this.referenceOrderVersion = referenceOrderVersion;
+ }
+ }
+ @SuppressWarnings("unused")
private static Object[] getQueryStrings() {
return new Object[] {
new Object[] {
@@ -69,44 +160,8 @@ public class JoinQueriesIntegrationTest {
22}};
}
- @Test
- @Parameters(method = "getQueryStrings")
- public void testJoinTwoRegions(String queryString, int expectedResultSize) throws Exception {
- Cache cache = CacheUtils.getCache();
- try {
- Region region1 =
- cache.createRegionFactory().setDataPolicy(DataPolicy.REPLICATE).create("region1");
- Region region2 =
- cache.createRegionFactory().setDataPolicy(DataPolicy.REPLICATE).create("region2");
-
- populateRegionWithData(region1, region2);
-
- QueryService queryService = cache.getQueryService();
-
- SelectResults results = (SelectResults) queryService.newQuery(queryString).execute();
- int resultsWithoutIndex = results.size();
- assertEquals(expectedResultSize, resultsWithoutIndex);
-
- queryService.createIndex("pkidregion1", "p.pkid", "/region1 p");
- queryService.createIndex("pkidregion2", "p.pkid", "/region2 p");
- queryService.createIndex("indexIDRegion2", "p.id", "/region2 p");
- queryService.createIndex("indexIDRegion1", "p.id", "/region1 p");
- queryService.createIndex("joinIdregion1", "p.joinId", "/region1 p");
- queryService.createIndex("joinIdregion2", "p.joinId", "/region2 p");
- queryService.createIndex("nameIndex", "p.name", "/region2 p");
-
- results = (SelectResults) queryService.newQuery(queryString).execute();
- int resultsSizeWithIndex = results.size();
- assertEquals(expectedResultSize, resultsWithoutIndex);
- assertEquals(resultsSizeWithIndex, resultsWithoutIndex);
- } finally {
- cache.getRegion("region1").destroyRegion();
- cache.getRegion("region2").destroyRegion();
- }
-
- }
-
- private void populateRegionWithData(Region region1, Region region2) {
+ private void populateCustomerRegionsWithData(Region<Integer, Customer> region1,
+ Region<Integer, Customer> region2) {
for (int i = 1; i < 11; i++) {
if (i == 1 || i == 3 || i == 8 || i == 2 || i == 5) {
region1.put(i, new Customer(1, 1, 1));
@@ -120,4 +175,208 @@ public class JoinQueriesIntegrationTest {
}
}
}
+
+ @Test
+ @Parameters(method = "getQueryStrings")
+ public void testJoinTwoRegions(String queryString, int expectedResultSize) throws Exception {
+ InternalCache cache = serverRule.getCache();
+ QueryService queryService = cache.getQueryService();
+ Region<Integer, Customer> region1 = cache.<Integer, Customer>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("region1");
+ Region<Integer, Customer> region2 = cache.<Integer, Customer>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("region2");
+ populateCustomerRegionsWithData(region1, region2);
+
+ SelectResults results = (SelectResults) queryService.newQuery(queryString).execute();
+ int resultsWithoutIndex = results.size();
+ assertThat(resultsWithoutIndex).isEqualTo(expectedResultSize);
+
+ queryService.createIndex("pkidregion1", "p.pkid", "/region1 p");
+ queryService.createIndex("pkidregion2", "p.pkid", "/region2 p");
+ queryService.createIndex("indexIDRegion2", "p.id", "/region2 p");
+ queryService.createIndex("indexIDRegion1", "p.id", "/region1 p");
+ queryService.createIndex("joinIdregion1", "p.joinId", "/region1 p");
+ queryService.createIndex("joinIdregion2", "p.joinId", "/region2 p");
+ queryService.createIndex("nameIndex", "p.name", "/region2 p");
+
+ results = (SelectResults) queryService.newQuery(queryString).execute();
+ int resultsSizeWithIndex = results.size();
+ assertThat(resultsWithoutIndex).isEqualTo(expectedResultSize);
+ assertThat(resultsWithoutIndex).isEqualTo(resultsSizeWithIndex);
+ }
+
+ private void populateTripleJointRegionsWithSerializables(
+ int expectedMatches, int extraEntitiesPerRegion,
+ Region<String, Object> orderRegion, Region<String, Object> validationIssueRegion,
+ Region<String, Object> validationIssueXRefRegion) {
+ for (int i = 0; i < expectedMatches; i++) {
+ orderRegion.put("orderId_" + i, new Order("orderId_" + i, i));
+ validationIssueRegion.put("validationIssueID_" + i,
+ new ValidationIssue("issueId_" + i, Calendar.getInstance().getTime()));
+ validationIssueXRefRegion.put("validationIssueXRefID_" + i, new OrderValidationIssueXRef(
+ "validationIssueXRefID_" + i, "issueId_" + i, "orderId_" + i, i));
+ }
+
+ for (int i = 0; i < extraEntitiesPerRegion; i++) {
+ orderRegion.put("orderId#" + i, new Order("orderId#" + i, i));
+ validationIssueRegion.put("referenceIssueId#" + i,
+ new ValidationIssue("referenceIssueId#" + i, Calendar.getInstance().getTime()));
+ validationIssueXRefRegion.put("validationIssueXRefID#" + i, new OrderValidationIssueXRef(
+ "validationIssueXRefID#" + i, "validationIssueID2#" + i, "orderId#2" + i, i));
+ }
+ }
+
+ private void populateTripleJointRegionsWithPdxInstances(
+ int expectedMatches, int extraEntitiesPerRegion,
+ Cache cache, Region<String, Object> orderRegion, Region<String, Object> validationIssueRegion,
+ Region<String, Object> validationIssueXRefRegion) {
+ for (int i = 0; i < expectedMatches; i++) {
+ PdxInstance orderPdx = cache
+ .createPdxInstanceFactory("org.apache.geode.test.Order")
+ .writeString("orderId", "orderId_" + i)
+ .writeInt("version", i)
+ .create();
+ PdxInstance validationIssuePdx = cache
+ .createPdxInstanceFactory("org.apache.geode.test.ValidationIssue")
+ .writeString("issueId", "issueId_" + i)
+ .writeDate("createdTime", Calendar.getInstance().getTime())
+ .create();
+ PdxInstance validationIssueXRefPdx = cache
+ .createPdxInstanceFactory("org.apache.geode.test.OrderValidationIssueXRef")
+ .writeString("validationIssueXRefID", "validationIssueXRefID_" + i)
+ .writeString("referenceIssueId", "issueId_" + i)
+ .writeString("referenceOrderId", "orderId_" + i)
+ .writeInt("referenceOrderVersion", i)
+ .create();
+
+ orderRegion.put("orderId_" + i, orderPdx);
+ validationIssueRegion.put("issueId_" + i, validationIssuePdx);
+ validationIssueXRefRegion.put("validationIssueXRefID_" + i, validationIssueXRefPdx);
+ }
+
+ for (int i = 0; i < extraEntitiesPerRegion; i++) {
+ PdxInstance orderPdx = cache
+ .createPdxInstanceFactory("org.apache.geode.test.Order")
+ .writeString("orderId", "orderId#" + i)
+ .writeInt("version", i)
+ .create();
+ PdxInstance validationIssuePdx = cache
+ .createPdxInstanceFactory("org.apache.geode.test.ValidationIssue")
+ .writeString("referenceIssueId", "referenceIssueId#" + i)
+ .writeDate("createdTime", Calendar.getInstance().getTime())
+ .create();
+ PdxInstance validationIssueXRefPdx = cache
+ .createPdxInstanceFactory("org.apache.geode.test.OrderValidationIssueXRef")
+ .writeString("validationIssueXRefID", "validationIssueXRefID#" + i)
+ .writeString("referenceIssueId", "validationIssueID2#" + i)
+ .writeString("referenceOrderId", "orderId#2" + i)
+ .writeInt("referenceOrderVersion", i)
+ .create();
+
+ orderRegion.put("orderId#" + i, orderPdx);
+ validationIssueRegion.put("issueId#" + i, validationIssuePdx);
+ validationIssueXRefRegion.put("validationIssueXRefID#" + i, validationIssueXRefPdx);
+ }
+ }
+
+ @Test
+ @Parameters({"true", "false"})
+ @TestCaseName("{method} - Using PDX: {params}")
+ public void joiningThreeRegionsWhenIntermediateResultSizeIsHigherThanLimitClauseShouldNotTrimResults(
+ boolean usePdx) throws Exception {
+ int matches = 10;
+ int extraEntitiesPerRegion = 25;
+ InternalCache cache = serverRule.getCache();
+ QueryService queryService = cache.getQueryService();
+ String queryString = "SELECT issue.issueId, issue.createdTime, o.orderId, o.version "
+ + "FROM /ValidationIssue issue, /OrderValidationIssueXRef xRef, /Order o "
+ + "WHERE "
+ + "issue.issueId = xRef.referenceIssueId "
+ + "AND "
+ + "xRef.referenceOrderId = o.orderId "
+ + "AND "
+ + "xRef.referenceOrderVersion = o.version ";
+
+ // Create Regions
+ Region<String, Object> orderRegion = cache.<String, Object>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("Order");
+ Region<String, Object> validationIssueRegion = cache.<String, Object>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("ValidationIssue");
+ Region<String, Object> validationIssueXRefRegion = cache.<String, Object>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("OrderValidationIssueXRef");
+
+ if (!usePdx) {
+ populateTripleJointRegionsWithSerializables(matches, extraEntitiesPerRegion, orderRegion,
+ validationIssueRegion, validationIssueXRefRegion);
+ } else {
+ populateTripleJointRegionsWithPdxInstances(matches, extraEntitiesPerRegion, cache,
+ orderRegion,
+ validationIssueRegion, validationIssueXRefRegion);
+ }
+
+ SelectResults baseResults = (SelectResults) queryService.newQuery(queryString).execute();
+ SelectResults resultsWithLimitOne =
+ (SelectResults) queryService.newQuery(queryString + "LIMIT 2").execute();
+ SelectResults resultsWithLimitTwo =
+ (SelectResults) queryService.newQuery(queryString + "LIMIT 5").execute();
+ assertThat(baseResults.size()).isEqualTo(matches);
+ assertThat(resultsWithLimitOne.size()).isEqualTo(2);
+ assertThat(resultsWithLimitTwo.size()).isEqualTo(5);
+ }
+
+
+ @Test
+ @Parameters({"true", "false"})
+ @TestCaseName("{method} - Using PDX: {params}")
+ public void joiningThreeRegionsWithIndexesWhenIntermediateResultSizeIsHigherThanLimitClauseShouldNotTrimResults(
+ boolean usePdx) throws Exception {
+ int matches = 10;
+ int extraEntitiesPerRegion = 25;
+ InternalCache cache = serverRule.getCache();
+ QueryService queryService = cache.getQueryService();
+ String queryString = "SELECT issue.issueId, issue.createdTime, o.orderId, o.version "
+ + "FROM /ValidationIssue issue, /OrderValidationIssueXRef xRef, /Order o "
+ + "WHERE "
+ + "issue.issueId = xRef.referenceIssueId "
+ + "AND "
+ + "xRef.referenceOrderId = o.orderId "
+ + "AND "
+ + "xRef.referenceOrderVersion = o.version ";
+
+ // Create Regions
+ Region<String, Object> orderRegion = cache.<String, Object>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("Order");
+ Region<String, Object> validationIssueRegion = cache.<String, Object>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("ValidationIssue");
+ Region<String, Object> validationIssueXRefRegion = cache.<String, Object>createRegionFactory()
+ .setDataPolicy(DataPolicy.REPLICATE).create("OrderValidationIssueXRef");
+
+ // Create Indexes
+ cache.getQueryService().createIndex("order_orderID", "orderId", "/Order", null);
+ cache.getQueryService().createIndex("validationIssue_issueID", "issueId", "/ValidationIssue",
+ null);
+ cache.getQueryService().createIndex("orderValidationIssueXRef_referenceOrderId",
+ "referenceOrderId", "/OrderValidationIssueXRef", null);
+ cache.getQueryService().createIndex("orderValidationIssueXRef_referenceIssueId",
+ "referenceIssueId", "/OrderValidationIssueXRef", null);
+
+ if (!usePdx) {
+ populateTripleJointRegionsWithSerializables(matches, extraEntitiesPerRegion, orderRegion,
+ validationIssueRegion, validationIssueXRefRegion);
+ } else {
+ populateTripleJointRegionsWithPdxInstances(matches, extraEntitiesPerRegion, cache,
+ orderRegion,
+ validationIssueRegion, validationIssueXRefRegion);
+ }
+
+ SelectResults baseResultsWithIndexes =
+ (SelectResults) queryService.newQuery(queryString).execute();
+ SelectResults resultsWithLimitOneWithIndexes =
+ (SelectResults) queryService.newQuery(queryString + "LIMIT 2").execute();
+ SelectResults resultsWithLimitTwoWithIndexes =
+ (SelectResults) queryService.newQuery(queryString + "LIMIT 5").execute();
+ assertThat(baseResultsWithIndexes.size()).isEqualTo(matches);
+ assertThat(resultsWithLimitOneWithIndexes.size()).isEqualTo(2);
+ assertThat(resultsWithLimitTwoWithIndexes.size()).isEqualTo(5);
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
index 7d04168..33a0281 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
@@ -486,11 +486,15 @@ public class QueryUtils {
QueryInvocationTargetException {
int resultSize = values[level].length;
- int limit = getLimitValue(context);
- // stops recursion if limit has already been met
+ // We do not know if the first X results might or might not fulfill all operands.
+ Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX);
+ int limit = (applyLimit != null && applyLimit) ? getLimitValue(context) : -1;
+
+ // Stops recursion if limit has already been met AND limit can be applied to index.
if (limit != -1 && result.size() >= limit) {
return;
}
+
for (int j = 0; j < resultSize; ++j) {
// Check if query execution on this thread is canceled.
QueryMonitor.throwExceptionIfQueryOnCurrentThreadIsCanceled();