You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this copy.
Posted to commits@geode.apache.org by jj...@apache.org on 2019/06/04 15:16:03 UTC

[geode] branch develop updated: GEODE-6806: Ignore LIMIT in intermediate steps (#3629)

This is an automated email from the ASF dual-hosted git repository.

jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5afd611  GEODE-6806: Ignore LIMIT in intermediate steps (#3629)
5afd611 is described below

commit 5afd6115e39438e89ef5ee9ce21622a39e44abfe
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Tue Jun 4 16:15:50 2019 +0100

    GEODE-6806: Ignore LIMIT in intermediate steps (#3629)
    
    When joining multiple regions with indexes, apply the LIMIT clause in
    intermediate execution steps only if the CAN_APPLY_LIMIT_AT_INDEX flag
    has been set in the query context.
    
    - Fixed minor warnings.
    - Replaced `junit.Assert` with `assertj`.
    - Added tests to validate query results of multiple joins.
---
 .../cache/query/JoinQueriesIntegrationTest.java    | 361 ++++++++++++++++++---
 .../geode/cache/query/internal/QueryUtils.java     |   8 +-
 2 files changed, 316 insertions(+), 53 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java
index 9d75234..417a8dd4 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/JoinQueriesIntegrationTest.java
@@ -14,12 +14,16 @@
  */
 package org.apache.geode.cache.query;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.Serializable;
+import java.util.Calendar;
+import java.util.Date;
 
 import junitparams.JUnitParamsRunner;
 import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -27,26 +31,42 @@ import org.junit.runner.RunWith;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
 
-@Category({OQLQueryTest.class})
+@Category(OQLQueryTest.class)
 @RunWith(JUnitParamsRunner.class)
 public class JoinQueriesIntegrationTest {
 
+  @Rule
+  public ServerStarterRule serverRule = new ServerStarterRule().withAutoStart();
+
+  @SuppressWarnings("unused")
   public static class Customer implements Serializable {
-    public int pkid;
-    public int id;
-    public int joinId;
-    public String name;
+    int pkid;
+    int id;
+    int joinId;
+    String name;
 
-    public Customer(int pkid, int id) {
-      this.pkid = pkid;
-      this.id = id;
-      this.joinId = id;
-      this.name = "name" + pkid;
+    public int getPkid() {
+      return pkid;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public int getJoinId() {
+      return joinId;
+    }
+
+    public String getName() {
+      return name;
     }
 
-    public Customer(int pkid, int id, int joinId) {
+    Customer(int pkid, int id, int joinId) {
       this.pkid = pkid;
       this.id = id;
       this.joinId = joinId;
@@ -54,11 +74,82 @@ public class JoinQueriesIntegrationTest {
     }
 
     public String toString() {
-      return "Customer pkid = " + pkid + ", id: " + id + " name:" + name + " joinId: " + joinId;
+      return "Customer pkid = " + getPkid() + ", id: " + getId() + " name:" + getName()
+          + " joinId: " + getJoinId();
+    }
+  }
+
+  @SuppressWarnings("unused")
+  public static class Order implements Serializable { // joined on orderId and version
+    private final String orderId;
+    private final Integer version;
+
+    public String getOrderId() {
+      return orderId;
+    }
+
+    public Integer getVersion() {
+      return version;
+    }
+
+    Order(String orderId, Integer version) {
+      this.orderId = orderId;
+      this.version = version;
+    }
+  }
+
+  @SuppressWarnings("unused")
+  public static class ValidationIssue implements Serializable { // joined on issueId
+    private final String issueId;
+    private final Date createdTime;
+
+    public String getIssueId() {
+      return issueId;
+    }
+
+    public Date getCreatedTime() {
+      return createdTime;
+    }
+
+    ValidationIssue(String issueId, Date createdTime) {
+      this.issueId = issueId;
+      this.createdTime = createdTime;
     }
   }
 
+  @SuppressWarnings("unused")
+  public static class OrderValidationIssueXRef implements Serializable { // issue -> order link
+    private final String referenceOrderId;
+    private final String referenceIssueId;
+    private final String validationIssueXRefID;
+    private final Integer referenceOrderVersion;
+
+    public String getReferenceOrderId() {
+      return referenceOrderId;
+    }
+
+    public String getReferenceIssueId() {
+      return referenceIssueId;
+    }
+
+    public String getValidationIssueXRefID() {
+      return validationIssueXRefID;
+    }
+
+    public Integer getReferenceOrderVersion() {
+      return referenceOrderVersion;
+    }
+    // NOTE: argument order (xRef id, issue id, order id, version) differs from field order.
+    OrderValidationIssueXRef(String validationIssueXRefID, String referenceIssueId,
+        String referenceOrderId, Integer referenceOrderVersion) {
+      this.validationIssueXRefID = validationIssueXRefID;
+      this.referenceIssueId = referenceIssueId;
+      this.referenceOrderId = referenceOrderId;
+      this.referenceOrderVersion = referenceOrderVersion;
+    }
+  }
 
+  @SuppressWarnings("unused")
   private static Object[] getQueryStrings() {
     return new Object[] {
         new Object[] {
@@ -69,44 +160,8 @@ public class JoinQueriesIntegrationTest {
             22}};
   }
 
-  @Test
-  @Parameters(method = "getQueryStrings")
-  public void testJoinTwoRegions(String queryString, int expectedResultSize) throws Exception {
-    Cache cache = CacheUtils.getCache();
-    try {
-      Region region1 =
-          cache.createRegionFactory().setDataPolicy(DataPolicy.REPLICATE).create("region1");
-      Region region2 =
-          cache.createRegionFactory().setDataPolicy(DataPolicy.REPLICATE).create("region2");
-
-      populateRegionWithData(region1, region2);
-
-      QueryService queryService = cache.getQueryService();
-
-      SelectResults results = (SelectResults) queryService.newQuery(queryString).execute();
-      int resultsWithoutIndex = results.size();
-      assertEquals(expectedResultSize, resultsWithoutIndex);
-
-      queryService.createIndex("pkidregion1", "p.pkid", "/region1 p");
-      queryService.createIndex("pkidregion2", "p.pkid", "/region2 p");
-      queryService.createIndex("indexIDRegion2", "p.id", "/region2 p");
-      queryService.createIndex("indexIDRegion1", "p.id", "/region1 p");
-      queryService.createIndex("joinIdregion1", "p.joinId", "/region1 p");
-      queryService.createIndex("joinIdregion2", "p.joinId", "/region2 p");
-      queryService.createIndex("nameIndex", "p.name", "/region2 p");
-
-      results = (SelectResults) queryService.newQuery(queryString).execute();
-      int resultsSizeWithIndex = results.size();
-      assertEquals(expectedResultSize, resultsWithoutIndex);
-      assertEquals(resultsSizeWithIndex, resultsWithoutIndex);
-    } finally {
-      cache.getRegion("region1").destroyRegion();
-      cache.getRegion("region2").destroyRegion();
-    }
-
-  }
-
-  private void populateRegionWithData(Region region1, Region region2) {
+  private void populateCustomerRegionsWithData(Region<Integer, Customer> region1,
+      Region<Integer, Customer> region2) {
     for (int i = 1; i < 11; i++) {
       if (i == 1 || i == 3 || i == 8 || i == 2 || i == 5) {
         region1.put(i, new Customer(1, 1, 1));
@@ -120,4 +175,208 @@ public class JoinQueriesIntegrationTest {
       }
     }
   }
+
+  @Test
+  @Parameters(method = "getQueryStrings")
+  public void testJoinTwoRegions(String queryString, int expectedResultSize) throws Exception {
+    InternalCache cache = serverRule.getCache();
+    QueryService queryService = cache.getQueryService();
+    Region<Integer, Customer> region1 = cache.<Integer, Customer>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("region1");
+    Region<Integer, Customer> region2 = cache.<Integer, Customer>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("region2");
+    populateCustomerRegionsWithData(region1, region2);
+
+    SelectResults results = (SelectResults) queryService.newQuery(queryString).execute();
+    int resultsWithoutIndex = results.size();
+    assertThat(resultsWithoutIndex).isEqualTo(expectedResultSize);
+
+    queryService.createIndex("pkidregion1", "p.pkid", "/region1 p");
+    queryService.createIndex("pkidregion2", "p.pkid", "/region2 p");
+    queryService.createIndex("indexIDRegion2", "p.id", "/region2 p");
+    queryService.createIndex("indexIDRegion1", "p.id", "/region1 p");
+    queryService.createIndex("joinIdregion1", "p.joinId", "/region1 p");
+    queryService.createIndex("joinIdregion2", "p.joinId", "/region2 p");
+    queryService.createIndex("nameIndex", "p.name", "/region2 p");
+    // Re-run with indexes: the indexed plan must return the same number of rows.
+    results = (SelectResults) queryService.newQuery(queryString).execute();
+    int resultsSizeWithIndex = results.size();
+    assertThat(resultsSizeWithIndex).isEqualTo(expectedResultSize);
+    assertThat(resultsSizeWithIndex).isEqualTo(resultsWithoutIndex);
+  }
+
+  private void populateTripleJointRegionsWithSerializables(
+      int expectedMatches, int extraEntitiesPerRegion,
+      Region<String, Object> orderRegion, Region<String, Object> validationIssueRegion,
+      Region<String, Object> validationIssueXRefRegion) {
+    for (int i = 0; i < expectedMatches; i++) { // fully joinable order/issue/xRef triples
+      orderRegion.put("orderId_" + i, new Order("orderId_" + i, i));
+      validationIssueRegion.put("validationIssueID_" + i,
+          new ValidationIssue("issueId_" + i, Calendar.getInstance().getTime()));
+      validationIssueXRefRegion.put("validationIssueXRefID_" + i, new OrderValidationIssueXRef(
+          "validationIssueXRefID_" + i, "issueId_" + i, "orderId_" + i, i));
+    }
+
+    for (int i = 0; i < extraEntitiesPerRegion; i++) { // xRefs reference no existing issue
+      orderRegion.put("orderId#" + i, new Order("orderId#" + i, i));
+      validationIssueRegion.put("referenceIssueId#" + i,
+          new ValidationIssue("referenceIssueId#" + i, Calendar.getInstance().getTime()));
+      validationIssueXRefRegion.put("validationIssueXRefID#" + i, new OrderValidationIssueXRef(
+          "validationIssueXRefID#" + i, "validationIssueID2#" + i, "orderId#2" + i, i));
+    }
+  }
+
+  private void populateTripleJointRegionsWithPdxInstances(
+      int expectedMatches, int extraEntitiesPerRegion,
+      Cache cache, Region<String, Object> orderRegion, Region<String, Object> validationIssueRegion,
+      Region<String, Object> validationIssueXRefRegion) {
+    for (int i = 0; i < expectedMatches; i++) { // fully joinable order/issue/xRef triples
+      PdxInstance orderPdx = cache
+          .createPdxInstanceFactory("org.apache.geode.test.Order")
+          .writeString("orderId", "orderId_" + i)
+          .writeInt("version", i)
+          .create();
+      PdxInstance validationIssuePdx = cache
+          .createPdxInstanceFactory("org.apache.geode.test.ValidationIssue")
+          .writeString("issueId", "issueId_" + i)
+          .writeDate("createdTime", Calendar.getInstance().getTime())
+          .create();
+      PdxInstance validationIssueXRefPdx = cache
+          .createPdxInstanceFactory("org.apache.geode.test.OrderValidationIssueXRef")
+          .writeString("validationIssueXRefID", "validationIssueXRefID_" + i)
+          .writeString("referenceIssueId", "issueId_" + i)
+          .writeString("referenceOrderId", "orderId_" + i)
+          .writeInt("referenceOrderVersion", i)
+          .create();
+
+      orderRegion.put("orderId_" + i, orderPdx);
+      validationIssueRegion.put("issueId_" + i, validationIssuePdx);
+      validationIssueXRefRegion.put("validationIssueXRefID_" + i, validationIssueXRefPdx);
+    }
+
+    for (int i = 0; i < extraEntitiesPerRegion; i++) { // xRefs reference no existing issue
+      PdxInstance orderPdx = cache
+          .createPdxInstanceFactory("org.apache.geode.test.Order")
+          .writeString("orderId", "orderId#" + i)
+          .writeInt("version", i)
+          .create();
+      PdxInstance validationIssuePdx = cache
+          .createPdxInstanceFactory("org.apache.geode.test.ValidationIssue")
+          .writeString("issueId", "referenceIssueId#" + i) // same field as the matching rows
+          .writeDate("createdTime", Calendar.getInstance().getTime())
+          .create();
+      PdxInstance validationIssueXRefPdx = cache
+          .createPdxInstanceFactory("org.apache.geode.test.OrderValidationIssueXRef")
+          .writeString("validationIssueXRefID", "validationIssueXRefID#" + i)
+          .writeString("referenceIssueId", "validationIssueID2#" + i)
+          .writeString("referenceOrderId", "orderId#2" + i)
+          .writeInt("referenceOrderVersion", i)
+          .create();
+
+      orderRegion.put("orderId#" + i, orderPdx);
+      validationIssueRegion.put("issueId#" + i, validationIssuePdx);
+      validationIssueXRefRegion.put("validationIssueXRefID#" + i, validationIssueXRefPdx);
+    }
+  }
+
+  @Test
+  @Parameters({"true", "false"})
+  @TestCaseName("{method} - Using PDX: {params}")
+  public void joiningThreeRegionsWhenIntermediateResultSizeIsHigherThanLimitClauseShouldNotTrimResults(
+      boolean usePdx) throws Exception {
+    int matches = 10; // triples that satisfy every join condition
+    int extraEntitiesPerRegion = 25; // non-joinable noise, more rows than any LIMIT below
+    InternalCache cache = serverRule.getCache();
+    QueryService queryService = cache.getQueryService();
+    String queryString = "SELECT issue.issueId, issue.createdTime, o.orderId, o.version "
+        + "FROM /ValidationIssue issue, /OrderValidationIssueXRef xRef, /Order o "
+        + "WHERE "
+        + "issue.issueId = xRef.referenceIssueId "
+        + "AND "
+        + "xRef.referenceOrderId = o.orderId "
+        + "AND "
+        + "xRef.referenceOrderVersion = o.version ";
+
+    // Create the three replicated regions joined by the query (no indexes).
+    Region<String, Object> orderRegion = cache.<String, Object>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("Order");
+    Region<String, Object> validationIssueRegion = cache.<String, Object>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("ValidationIssue");
+    Region<String, Object> validationIssueXRefRegion = cache.<String, Object>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("OrderValidationIssueXRef");
+
+    if (!usePdx) {
+      populateTripleJointRegionsWithSerializables(matches, extraEntitiesPerRegion, orderRegion,
+          validationIssueRegion, validationIssueXRefRegion);
+    } else {
+      populateTripleJointRegionsWithPdxInstances(matches, extraEntitiesPerRegion, cache,
+          orderRegion,
+          validationIssueRegion, validationIssueXRefRegion);
+    }
+    // With LIMIT, only the final join result may be trimmed (GEODE-6806).
+    SelectResults baseResults = (SelectResults) queryService.newQuery(queryString).execute();
+    SelectResults resultsWithLimitTwo =
+        (SelectResults) queryService.newQuery(queryString + "LIMIT 2").execute();
+    SelectResults resultsWithLimitFive =
+        (SelectResults) queryService.newQuery(queryString + "LIMIT 5").execute();
+    assertThat(baseResults.size()).isEqualTo(matches);
+    assertThat(resultsWithLimitTwo.size()).isEqualTo(2);
+    assertThat(resultsWithLimitFive.size()).isEqualTo(5);
+  }
+
+
+  @Test
+  @Parameters({"true", "false"})
+  @TestCaseName("{method} - Using PDX: {params}")
+  public void joiningThreeRegionsWithIndexesWhenIntermediateResultSizeIsHigherThanLimitClauseShouldNotTrimResults(
+      boolean usePdx) throws Exception {
+    int matches = 10; // triples that satisfy every join condition
+    int extraEntitiesPerRegion = 25; // non-joinable noise, more rows than any LIMIT below
+    InternalCache cache = serverRule.getCache();
+    QueryService queryService = cache.getQueryService();
+    String queryString = "SELECT issue.issueId, issue.createdTime, o.orderId, o.version "
+        + "FROM /ValidationIssue issue, /OrderValidationIssueXRef xRef, /Order o "
+        + "WHERE "
+        + "issue.issueId = xRef.referenceIssueId "
+        + "AND "
+        + "xRef.referenceOrderId = o.orderId "
+        + "AND "
+        + "xRef.referenceOrderVersion = o.version ";
+
+    // Create the three replicated regions joined by the query.
+    Region<String, Object> orderRegion = cache.<String, Object>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("Order");
+    Region<String, Object> validationIssueRegion = cache.<String, Object>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("ValidationIssue");
+    Region<String, Object> validationIssueXRefRegion = cache.<String, Object>createRegionFactory()
+        .setDataPolicy(DataPolicy.REPLICATE).create("OrderValidationIssueXRef");
+
+    // Index every join attribute except the version fields (created before data is loaded).
+    queryService.createIndex("order_orderID", "orderId", "/Order", null);
+    queryService.createIndex("validationIssue_issueID", "issueId", "/ValidationIssue",
+        null);
+    queryService.createIndex("orderValidationIssueXRef_referenceOrderId",
+        "referenceOrderId", "/OrderValidationIssueXRef", null);
+    queryService.createIndex("orderValidationIssueXRef_referenceIssueId",
+        "referenceIssueId", "/OrderValidationIssueXRef", null);
+
+    if (!usePdx) {
+      populateTripleJointRegionsWithSerializables(matches, extraEntitiesPerRegion, orderRegion,
+          validationIssueRegion, validationIssueXRefRegion);
+    } else {
+      populateTripleJointRegionsWithPdxInstances(matches, extraEntitiesPerRegion, cache,
+          orderRegion,
+          validationIssueRegion, validationIssueXRefRegion);
+    }
+    // With LIMIT, only the final join result may be trimmed (GEODE-6806).
+    SelectResults baseResultsWithIndexes =
+        (SelectResults) queryService.newQuery(queryString).execute();
+    SelectResults resultsWithLimitTwoWithIndexes =
+        (SelectResults) queryService.newQuery(queryString + "LIMIT 2").execute();
+    SelectResults resultsWithLimitFiveWithIndexes =
+        (SelectResults) queryService.newQuery(queryString + "LIMIT 5").execute();
+    assertThat(baseResultsWithIndexes.size()).isEqualTo(matches);
+    assertThat(resultsWithLimitTwoWithIndexes.size()).isEqualTo(2);
+    assertThat(resultsWithLimitFiveWithIndexes.size()).isEqualTo(5);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
index 7d04168..33a0281 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
@@ -486,11 +486,15 @@ public class QueryUtils {
       QueryInvocationTargetException {
 
     int resultSize = values[level].length;
-    int limit = getLimitValue(context);
-    // stops recursion if limit has already been met
+    // We do not know if the first X results might or might not fulfill all operands.
+    Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX);
+    int limit = (applyLimit != null && applyLimit) ? getLimitValue(context) : -1;
+
+    // Stops recursion if limit has already been met AND limit can be applied to index.
     if (limit != -1 && result.size() >= limit) {
       return;
     }
+
     for (int j = 0; j < resultSize; ++j) {
       // Check if query execution on this thread is canceled.
       QueryMonitor.throwExceptionIfQueryOnCurrentThreadIsCanceled();