Posted to commits@geode.apache.org by mk...@apache.org on 2022/07/29 07:43:17 UTC

[geode] branch develop updated: GEODE-9632: fix for queries with multiple operations and indexes (#7824)

This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0ecd6f6738 GEODE-9632: fix for queries with multiple operations and indexes (#7824)
0ecd6f6738 is described below

commit 0ecd6f673801cbdcc9cfeba7da425c83502d66f8
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Fri Jul 29 09:43:11 2022 +0200

    GEODE-9632: fix for queries with multiple operations and indexes (#7824)
---
 .../internal/AbstractGroupOrRangeJunction.java     |  15 +
 .../cache/query/internal/CompiledJunction.java     |  15 +
 .../query/internal/index/MemoryIndexStore.java     |  45 +++
 .../query/dunit/QueryWithRangeIndexDUnitTest.java  | 316 +++++++++++++++++++++
 4 files changed, 391 insertions(+)
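
For illustration, the query shape this commit targets can also be issued through the plain QueryService API rather than gfsh. The sketch below mirrors the DUnit tests added at the end of this commit (the index expression, region name, and OQL text are taken from those tests; the standalone class and running it against an already-started server cache are assumptions, not part of the change):

    import org.apache.geode.cache.Cache;
    import org.apache.geode.cache.CacheFactory;
    import org.apache.geode.cache.query.Query;
    import org.apache.geode.cache.query.QueryService;
    import org.apache.geode.cache.query.SelectResults;

    public class RangeIndexLimitExample {
      public static void main(String[] args) throws Exception {
        // Assumes a cache with /exampleRegion already exists in this JVM.
        Cache cache = CacheFactory.getAnyInstance();
        QueryService queryService = cache.getQueryService();

        // Range index on a map field, as in the tests below.
        queryService.createIndex("IdIndex", "value.positions['SUN']",
            "/exampleRegion.entrySet");

        // A LIKE-with-wildcard predicate plus a limit: before this fix, applying the
        // limit inside the index while the junction still had more than one operation
        // to evaluate could yield fewer rows than expected, which is what the new
        // tests guard against.
        Query query = queryService.newQuery(
            "select e.key, e.value from /exampleRegion.entrySet e "
                + "where e.value.positions['SUN'] like 'somethin%' limit 5");
        SelectResults<?> results = (SelectResults<?>) query.execute();
        System.out.println("rows: " + results.size());
      }
    }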

diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java
index 31060b0f62..3113e10925 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java
@@ -264,6 +264,15 @@ public abstract class AbstractGroupOrRangeJunction extends AbstractCompiledValue
     List sortedConditionsList =
         getCondtionsSortedOnIncreasingEstimatedIndexResultSize(context);
 
+    Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX);
+
+    boolean modifiedApplyLimits = false;
+    if (applyLimit != null && applyLimit && sortedConditionsList.size() > 1
+        && _operator == LITERAL_and) {
+      context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.FALSE);
+      modifiedApplyLimits = true;
+    }
+
     // Sort the operands in increasing order of resultset size
     Iterator i = sortedConditionsList.iterator();
     // SortedSet intersectionSet = new TreeSet(new SelectResultsComparator());
@@ -285,6 +294,12 @@ public abstract class AbstractGroupOrRangeJunction extends AbstractCompiledValue
       // RangeJunction or a CompiledComparison. But if the parent Object is a
       // RangeJunction then the Filter is a RangeJunctionEvaluator
       SelectResults filterResults = null;
+
+      if (modifiedApplyLimits && sortedConditionsList.size() == 1) {
+        context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.TRUE);
+        modifiedApplyLimits = false;
+      }
+
       Filter filter = (Filter) i.next();
       boolean isConditioningNeeded = filter.isConditioningNeededForIndex(
           indpndntItr.length == 1 ? indpndntItr[0] : null, context,
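
The hunk above (and the matching one in CompiledJunction below) temporarily clears CAN_APPLY_LIMIT_AT_INDEX while an AND junction still has more than one indexed operand, then restores it for the final operand. The following is a minimal standalone sketch of that toggle with simplified stand-ins for the execution-context cache and the operand list; draining the list inside the loop is a sketch device so the size check can change, not a claim about the real bookkeeping:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    final class LimitAtIndexToggleSketch {
      static void evaluateAndJunction(Map<Object, Object> contextCache, Object canApplyLimitKey,
          List<String> sortedConditions) {
        Boolean applyLimit = (Boolean) contextCache.get(canApplyLimitKey);
        boolean modifiedApplyLimits = false;

        // With several AND-ed operands, a limit applied inside the index lookup could
        // truncate an intermediate result before the remaining operands filter it.
        if (Boolean.TRUE.equals(applyLimit) && sortedConditions.size() > 1) {
          contextCache.put(canApplyLimitKey, Boolean.FALSE);
          modifiedApplyLimits = true;
        }

        Iterator<String> iterator = sortedConditions.iterator();
        while (iterator.hasNext()) {
          // Only the last remaining operand may honor the limit at the index again.
          if (modifiedApplyLimits && sortedConditions.size() == 1) {
            contextCache.put(canApplyLimitKey, Boolean.TRUE);
            modifiedApplyLimits = false;
          }
          String operand = iterator.next();
          System.out.println("evaluating " + operand
              + " with limit-at-index=" + contextCache.get(canApplyLimitKey));
          iterator.remove();
        }
      }

      public static void main(String[] args) {
        Map<Object, Object> contextCache = new HashMap<>();
        Object key = "CAN_APPLY_LIMIT_AT_INDEX";
        contextCache.put(key, Boolean.TRUE);
        // Two operands, e.g. the lower and upper range halves of a LIKE 'somethin%' predicate.
        evaluateAndJunction(contextCache, key,
            new ArrayList<>(List.of("range >= 'somethin'", "range < 'somethio'")));
      }
    }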
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java
index 5b23519d76..adc4003bef 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java
@@ -278,6 +278,15 @@ public class CompiledJunction extends AbstractCompiledValue implements Negatable
     List sortedConditionsList =
         getCondtionsSortedOnIncreasingEstimatedIndexResultSize(context);
 
+    Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX);
+
+    boolean modifiedApplyLimits = false;
+    if (applyLimit != null && applyLimit && sortedConditionsList.size() > 1
+        && _operator == LITERAL_and) {
+      context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.FALSE);
+      modifiedApplyLimits = true;
+    }
+
     // Sort the operands in increasing order of resultset size
     Iterator sortedConditionsItr = sortedConditionsList.iterator();
     while (sortedConditionsItr.hasNext()) {
@@ -293,6 +302,12 @@ public class CompiledJunction extends AbstractCompiledValue implements Negatable
       // recursion being ended by evaluating auxIterEvaluate if any. The passing
       // of IntermediateResult in filterEvalaute causes AND junction evaluation
       // to be corrupted , if the intermediateResultset contains some value.
+
+      if (modifiedApplyLimits && sortedConditionsList.size() == 1) {
+        context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.TRUE);
+        modifiedApplyLimits = false;
+      }
+
       SelectResults filterResults =
           ((Filter) sortedConditionsItr.next()).filterEvaluate(context, null);
       if (_operator == LITERAL_and) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
index 736de22a2d..8c536368af 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.cache.query.internal.index;
 
+import static java.util.Objects.hash;
+
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,6 +46,7 @@ import org.apache.geode.internal.cache.NonTXEntry;
 import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.cache.Token;
 import org.apache.geode.internal.cache.persistence.query.CloseableIterator;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
 
 /**
  * The in-memory index storage
@@ -867,6 +870,48 @@ public class MemoryIndexStore implements IndexStore {
           + Integer.toHexString(System.identityHashCode(this)) + ' ' + key
           + ' ' + value;
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof CachedEntryWrapper)) {
+        return false;
+      }
+      CachedEntryWrapper object = (CachedEntryWrapper) obj;
+      if (!getKey().equals(object.getKey())) {
+        if (!(getKey() instanceof PdxInstanceImpl)) {
+          return false;
+        }
+        if (!(object.getKey() instanceof PdxInstanceImpl)) {
+          return false;
+        }
+        PdxInstanceImpl pdxkey1 = (PdxInstanceImpl) getKey();
+        PdxInstanceImpl pdxkey2 = (PdxInstanceImpl) object.getKey();
+        if (!pdxkey1.equals(pdxkey2)) {
+          return false;
+        }
+      }
+      if (!getValue().equals(object.getValue())) {
+        if (!(getValue() instanceof PdxInstanceImpl)) {
+          return false;
+        }
+        if (!(object.getValue() instanceof PdxInstanceImpl)) {
+          return false;
+        }
+        PdxInstanceImpl pdxvalue1 = (PdxInstanceImpl) getValue();
+        PdxInstanceImpl pdxvalue2 = (PdxInstanceImpl) object.getValue();
+        if (!pdxvalue1.equals(pdxvalue2)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return hash(key, value);
+    }
+
   }
 
 }
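
The equals()/hashCode() pair added to CachedEntryWrapper above makes wrappers compare by key and value (with an extra branch for PdxInstanceImpl keys and values) rather than by object identity; without it, a hash-based result collection keeps one copy per wrapper instance even when two wrappers describe the same region entry. A self-contained sketch of that effect follows; Pair is a hypothetical stand-in for CachedEntryWrapper, not a Geode class:

    import java.util.HashSet;
    import java.util.Objects;
    import java.util.Set;

    final class Pair {
      final Object key;
      final Object value;

      Pair(Object key, Object value) {
        this.key = key;
        this.value = value;
      }

      @Override
      public boolean equals(Object obj) {
        if (!(obj instanceof Pair)) {
          return false;
        }
        Pair other = (Pair) obj;
        return Objects.equals(key, other.key) && Objects.equals(value, other.value);
      }

      @Override
      public int hashCode() {
        return Objects.hash(key, value);
      }

      public static void main(String[] args) {
        Set<Pair> results = new HashSet<>();
        // Two index operations can surface the same entry; with value-based equality
        // the set keeps a single copy instead of two.
        results.add(new Pair(1, "something"));
        results.add(new Pair(1, "something"));
        System.out.println(results.size()); // prints 1
      }
    }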
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java
new file mode 100644
index 0000000000..90b9311114
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java
@@ -0,0 +1,316 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.getVMId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.cache.query.data.PortfolioPdx;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+@SuppressWarnings("ALL")
+public class QueryWithRangeIndexDUnitTest extends JUnit4DistributedTestCase
+    implements Serializable {
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  private static final String locatorName = "locator";
+  private static final String serverName = "server";
+
+  private File locatorDir;
+  private File serverDir;
+
+  private int locatorPort;
+  private int locatorJmxPort;
+  private int serverPort;
+
+  private String locators;
+
+  private VM server;
+
+  private static final String regionName = "exampleRegion";
+
+  @Before
+  public void setUp() throws Exception {
+    VM locator = getVM(0);
+    server = getVM(1);
+
+    locatorDir = temporaryFolder.newFolder(locatorName);
+    serverDir = temporaryFolder.newFolder(serverName);
+
+    int[] port = getRandomAvailableTCPPorts(3);
+    locatorPort = port[0];
+    locatorJmxPort = port[1];
+    serverPort = port[2];
+
+    locators = "localhost[" + locatorPort + "]";
+
+    locator.invoke(() -> startLocator(locatorDir, locatorPort, locatorJmxPort));
+
+    gfsh.connectAndVerify(locatorJmxPort, GfshCommandRule.PortType.jmxManager);
+
+    server.invoke(() -> startServer(serverDir, serverPort, locators));
+
+  }
+
+  @Test
+  public void testQueryWithWildcardAndIndexOnAttributeFromHashMap() {
+    gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION")
+        .statusIsSuccess();
+
+    server.invoke(() -> {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService cacheQS = cache.getQueryService();
+      cacheQS.createIndex("IdIndex", "value.positions['SUN']",
+          SEPARATOR + regionName + ".entrySet");
+      Region<Integer, Portfolio> region =
+          cache.getRegion(regionName);
+
+      for (int i = 1; i < 10001; i++) {
+        Portfolio p1 = new Portfolio(i, i);
+        p1.positions = new HashMap<>();
+        p1.positions.put("IBM", "something");
+        if (i == 1) {
+          p1.positions.put("SUN", "something");
+        } else {
+          p1.positions.put("SUN", "some");
+        }
+        region.put(i, p1);
+      }
+    });
+
+    String query = "query --query=\"<trace> select e.key, e.value from " +
+        SEPARATOR + regionName + ".entrySet e where e.value.positions['SUN'] like 'somethin%'\"";
+
+    String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel());
+    assertThat(cmdResult).contains("\"Rows\":\"1\"");
+    assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 1)");
+  }
+
+  @Test
+  public void testQueryWithWildcardAndIndexOnMultiValues() {
+    gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION")
+        .statusIsSuccess();
+
+    server.invoke(() -> {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService cacheQS = cache.getQueryService();
+      cacheQS.createIndex("IdIndex", "value.positions['SUN']",
+          SEPARATOR + regionName + ".entrySet");
+      Region<Integer, Portfolio> region =
+          cache.getRegion(regionName);
+
+      for (int i = 1; i < 10001; i++) {
+        Portfolio p1 = new Portfolio(i, i);
+        p1.positions = new HashMap<>();
+        p1.positions.put("IBM", "something");
+        if (i % 500 == 0) {
+          p1.positions.put("SUN", "something" + i);
+        } else {
+          p1.positions.put("SUN", "some");
+        }
+        region.put(i, p1);
+      }
+    });
+
+    String query = "query --query=\"<trace> select e.key, e.value from " +
+        SEPARATOR + regionName
+        + ".entrySet e where e.value.positions['SUN'] like 'somethin%'\"";
+
+    String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel());
+    assertThat(cmdResult).contains("\"Rows\":\"20\"");
+    assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 20)");
+  }
+
+  @Test
+  public void testLimitIsAppliedOnlyOnQueryResults() {
+    gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION")
+        .statusIsSuccess();
+
+    server.invoke(() -> {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService cacheQS = cache.getQueryService();
+      cacheQS.createIndex("IdIndex", "value.positions['SUN']",
+          SEPARATOR + regionName + ".entrySet");
+      Region<Integer, Portfolio> region =
+          cache.getRegion(regionName);
+
+      for (int i = 1; i < 10001; i++) {
+        Portfolio p1 = new Portfolio(i, i);
+        p1.positions = new HashMap<>();
+        p1.positions.put("IBM", "something");
+        if (i % 500 == 0) {
+          p1.positions.put("SUN", "something");
+        } else {
+          p1.positions.put("SUN", "some");
+        }
+        region.put(i, p1);
+      }
+    });
+
+    String query = "query --query=\"<trace> select e.key, e.value from " +
+        SEPARATOR + regionName
+        + ".entrySet e where e.value.positions['SUN'] like 'somethin%' limit 5\"";
+
+    String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel());
+    assertThat(cmdResult).contains("\"Rows\":\"5\"");
+    assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 5)");
+  }
+
+  @Test
+  public void testQueryWithWildcardAndIndexOnAttributeFromHashMapPdx() {
+    gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION")
+        .statusIsSuccess();
+
+    server.invoke(() -> {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService cacheQS = cache.getQueryService();
+      cacheQS.createIndex("IdIndex", "value.positions['SUN']",
+          SEPARATOR + regionName + ".entrySet");
+      Region<Integer, PortfolioPdx> region =
+          cache.getRegion(regionName);
+
+      for (int i = 1; i < 10001; i++) {
+        PortfolioPdx p1 = new PortfolioPdx(i, i);
+        p1.positions = new HashMap<>();
+        p1.positions.put("IBM", "something");
+        if (i == 1) {
+          p1.positions.put("SUN", "something");
+        } else {
+          p1.positions.put("SUN", "some");
+        }
+        region.put(i, p1);
+      }
+    });
+
+    String query = "query --query=\"<trace> select e.key, e.value from " +
+        SEPARATOR + regionName + ".entrySet e where e.value.positions['SUN'] like 'somethin%'\"";
+
+    String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel());
+    assertThat(cmdResult).contains("\"Rows\":\"1\"");
+    assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 1)");
+  }
+
+
+
+  @Test
+  public void testLimitIsAppliedOnlyOnQueryResultsPdx() {
+    gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION")
+        .statusIsSuccess();
+
+    server.invoke(() -> {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService cacheQS = cache.getQueryService();
+      cacheQS.createIndex("IdIndex", "value.positions['SUN']",
+          SEPARATOR + regionName + ".entrySet");
+      Region<Integer, PortfolioPdx> region =
+          cache.getRegion(regionName);
+
+      for (int i = 1; i < 10001; i++) {
+        PortfolioPdx p1 = new PortfolioPdx(i, i);
+        p1.positions = new HashMap<>();
+        p1.positions.put("IBM", "something");
+        if (i % 500 == 0) {
+          p1.positions.put("SUN", "something");
+        } else {
+          p1.positions.put("SUN", "some");
+        }
+        region.put(i, p1);
+      }
+    });
+
+    String query = "query --query=\"<trace> select e.key, e.value from " +
+        SEPARATOR + regionName
+        + ".entrySet e where e.value.positions['SUN'] like 'somethin%' limit 5\"";
+
+    String cmdResult = String.valueOf(gfsh.executeAndAssertThat(query).getResultModel());
+    assertThat(cmdResult).contains("\"Rows\":\"5\"");
+    assertThat(cmdResult).contains("indexesUsed(1):IdIndex(Results: 5)");
+  }
+
+  private static void startLocator(File workingDirectory, int locatorPort,
+      int jmxPort) {
+    LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
+        .setMemberName(locatorName)
+        .setPort(locatorPort)
+        .setWorkingDirectory(workingDirectory.getAbsolutePath())
+        .set(JMX_MANAGER, "true")
+        .set(JMX_MANAGER_PORT, String.valueOf(jmxPort))
+        .set(JMX_MANAGER_START, "true")
+        .build();
+
+    locatorLauncher.start();
+
+    await().untilAsserted(() -> {
+      InternalLocator locator = (InternalLocator) locatorLauncher.getLocator();
+      assertThat(locator.isSharedConfigurationRunning())
+          .as("Locator shared configuration is running on locator" + getVMId())
+          .isTrue();
+    });
+  }
+
+  private static void startServer(File workingDirectory, int serverPort,
+      String locators) {
+    ServerLauncher serverLauncher = new ServerLauncher.Builder()
+        .setDeletePidFileOnStop(Boolean.TRUE)
+        .setMemberName(serverName)
+        .setServerPort(serverPort)
+        .setWorkingDirectory(workingDirectory.getAbsolutePath())
+        .setDebug(true)
+        .set(HTTP_SERVICE_PORT, "0")
+        .set(LOCATORS, locators)
+        .build();
+
+    serverLauncher.start();
+  }
+}