You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mk...@apache.org on 2022/07/29 07:43:17 UTC
[geode] branch develop updated: GEODE-9632: fix for queries with multy operations and indexes (#7824)
This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ecd6f6738 GEODE-9632: fix for queries with multy operations and indexes (#7824)
0ecd6f6738 is described below
commit 0ecd6f673801cbdcc9cfeba7da425c83502d66f8
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Fri Jul 29 09:43:11 2022 +0200
GEODE-9632: fix for queries with multy operations and indexes (#7824)
---
.../internal/AbstractGroupOrRangeJunction.java | 15 +
.../cache/query/internal/CompiledJunction.java | 15 +
.../query/internal/index/MemoryIndexStore.java | 45 +++
.../query/dunit/QueryWithRangeIndexDUnitTest.java | 316 +++++++++++++++++++++
4 files changed, 391 insertions(+)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java
index 31060b0f62..3113e10925 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java
@@ -264,6 +264,15 @@ public abstract class AbstractGroupOrRangeJunction extends AbstractCompiledValue
List sortedConditionsList =
getCondtionsSortedOnIncreasingEstimatedIndexResultSize(context);
+ Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX);
+
+ boolean modifiedApplyLimits = false;
+ if (applyLimit != null && applyLimit && sortedConditionsList.size() > 1
+ && _operator == LITERAL_and) {
+ context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.FALSE);
+ modifiedApplyLimits = true;
+ }
+
// Sort the operands in increasing order of resultset size
Iterator i = sortedConditionsList.iterator();
// SortedSet intersectionSet = new TreeSet(new SelectResultsComparator());
@@ -285,6 +294,12 @@ public abstract class AbstractGroupOrRangeJunction extends AbstractCompiledValue
// RangeJunction or a CompiledComparison. But if the parent Object is a
// RangeJunction then the Filter is a RangeJunctionEvaluator
SelectResults filterResults = null;
+
+ if (modifiedApplyLimits && sortedConditionsList.size() == 1) {
+ context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.TRUE);
+ modifiedApplyLimits = false;
+ }
+
Filter filter = (Filter) i.next();
boolean isConditioningNeeded = filter.isConditioningNeededForIndex(
indpndntItr.length == 1 ? indpndntItr[0] : null, context,
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java
index 5b23519d76..adc4003bef 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java
@@ -278,6 +278,15 @@ public class CompiledJunction extends AbstractCompiledValue implements Negatable
List sortedConditionsList =
getCondtionsSortedOnIncreasingEstimatedIndexResultSize(context);
+ Boolean applyLimit = (Boolean) context.cacheGet(CompiledValue.CAN_APPLY_LIMIT_AT_INDEX);
+
+ boolean modifiedApplyLimits = false;
+ if (applyLimit != null && applyLimit && sortedConditionsList.size() > 1
+ && _operator == LITERAL_and) {
+ context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.FALSE);
+ modifiedApplyLimits = true;
+ }
+
// Sort the operands in increasing order of resultset size
Iterator sortedConditionsItr = sortedConditionsList.iterator();
while (sortedConditionsItr.hasNext()) {
@@ -293,6 +302,12 @@ public class CompiledJunction extends AbstractCompiledValue implements Negatable
// recursion being ended by evaluating auxIterEvaluate if any. The passing
// of IntermediateResult in filterEvalaute causes AND junction evaluation
// to be corrupted , if the intermediateResultset contains some value.
+
+ if (modifiedApplyLimits && sortedConditionsList.size() == 1) {
+ context.cachePut(CAN_APPLY_LIMIT_AT_INDEX, Boolean.TRUE);
+ modifiedApplyLimits = false;
+ }
+
SelectResults filterResults =
((Filter) sortedConditionsItr.next()).filterEvaluate(context, null);
if (_operator == LITERAL_and) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
index 736de22a2d..8c536368af 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/MemoryIndexStore.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.cache.query.internal.index;
+import static java.util.Objects.hash;
+
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -44,6 +46,7 @@ import org.apache.geode.internal.cache.NonTXEntry;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.persistence.query.CloseableIterator;
+import org.apache.geode.pdx.internal.PdxInstanceImpl;
/**
* The in-memory index storage
@@ -867,6 +870,48 @@ public class MemoryIndexStore implements IndexStore {
+ Integer.toHexString(System.identityHashCode(this)) + ' ' + key
+ ' ' + value;
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof CachedEntryWrapper)) {
+ return false;
+ }
+ CachedEntryWrapper object = (CachedEntryWrapper) obj;
+ if (!getKey().equals(object.getKey())) {
+ if (!(getKey() instanceof PdxInstanceImpl)) {
+ return false;
+ }
+ if (!(object.getKey() instanceof PdxInstanceImpl)) {
+ return false;
+ }
+ PdxInstanceImpl pdxkey1 = (PdxInstanceImpl) getKey();
+ PdxInstanceImpl pdxkey2 = (PdxInstanceImpl) object.getKey();
+ if (!pdxkey1.equals(pdxkey2)) {
+ return false;
+ }
+ }
+ if (!getValue().equals(object.getValue())) {
+ if (!(getValue() instanceof PdxInstanceImpl)) {
+ return false;
+ }
+ if (!(object.getValue() instanceof PdxInstanceImpl)) {
+ return false;
+ }
+ PdxInstanceImpl pdxvalue1 = (PdxInstanceImpl) getValue();
+ PdxInstanceImpl pdxvalue2 = (PdxInstanceImpl) object.getValue();
+ if (!pdxvalue1.equals(pdxvalue2)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+    /**
+     * Combines the {@code key} and {@code value} fields into a single hash code via
+     * {@link java.util.Objects#hash(Object...)}, which accepts {@code null} elements.
+     *
+     * <p>
+     * NOTE(review): {@code equals} reads {@code getKey()}/{@code getValue()} while this method
+     * reads the fields directly; presumably the accessors return these same fields - confirm.
+     *
+     * @return the combined hash of key and value
+     */
+    @Override
+    public int hashCode() {
+      final int combined = hash(key, value);
+      return combined;
+    }
+
}
}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java
new file mode 100644
index 0000000000..90b9311114
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/dunit/QueryWithRangeIndexDUnitTest.java
@@ -0,0 +1,316 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.getVMId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.HashMap;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.cache.query.data.PortfolioPdx;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+
+/**
+ * Distributed tests for {@code LIKE} queries that filter on a map attribute
+ * ({@code value.positions['SUN']}) for which an index over the region's {@code entrySet} exists.
+ *
+ * <p>
+ * Every test relies on the locator and server launched in {@link #setUp()}, creates a partitioned
+ * region through gfsh, creates the index and loads {@value #entryCount} entries inside the server
+ * VM, and then runs the query through gfsh with {@code <trace>} so the reported index usage can
+ * be asserted next to the row count.
+ */
+@SuppressWarnings("ALL")
+public class QueryWithRangeIndexDUnitTest extends JUnit4DistributedTestCase
+    implements Serializable {
+
+  /** Decides which value is stored under the {@code SUN} position for a given entry id. */
+  private interface SunPositionRule {
+    String valueFor(int id);
+  }
+
+  // transient: the test instance is serializable, and this rule is only used from the test JVM.
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  private static final String locatorName = "locator";
+  private static final String serverName = "server";
+
+  private File locatorDir;
+  private File serverDir;
+
+  private int locatorPort;
+  private int locatorJmxPort;
+  private int serverPort;
+
+  private String locators;
+
+  private VM server;
+
+  private static final String regionName = "exampleRegion";
+  private static final String indexName = "IdIndex";
+
+  /** Number of entries (ids 1..entryCount) every test loads into the region. */
+  private static final int entryCount = 10000;
+
+  /** Query suffixes: no limit clause, or a limit of five rows. */
+  private static final String noLimit = "";
+  private static final String limitFive = " limit 5";
+
+  /**
+   * Starts a locator in VM 0 and a server in VM 1 on freshly picked ports and connects gfsh to
+   * the locator's JMX manager.
+   */
+  @Before
+  public void setUp() throws Exception {
+    VM locator = getVM(0);
+    server = getVM(1);
+
+    locatorDir = temporaryFolder.newFolder(locatorName);
+    serverDir = temporaryFolder.newFolder(serverName);
+
+    int[] ports = getRandomAvailableTCPPorts(3);
+    locatorPort = ports[0];
+    locatorJmxPort = ports[1];
+    serverPort = ports[2];
+
+    locators = "localhost[" + locatorPort + "]";
+
+    locator.invoke(() -> startLocator(locatorDir, locatorPort, locatorJmxPort));
+
+    gfsh.connectAndVerify(locatorJmxPort, GfshCommandRule.PortType.jmxManager);
+
+    server.invoke(() -> startServer(serverDir, serverPort, locators));
+  }
+
+  /** Only id 1 holds a value matching {@code 'somethin%'}, so exactly one row is expected. */
+  @Test
+  public void testQueryWithWildcardAndIndexOnAttributeFromHashMap() {
+    createPartitionedRegion();
+
+    server.invoke(() -> {
+      indexAndLoadPortfolios(id -> id == 1 ? "something" : "some");
+    });
+
+    assertRowsAndIndexResults(executeLikeQuery(noLimit), 1);
+  }
+
+  /** Every 500th id holds a distinct matching value ("something" + id): 20 rows expected. */
+  @Test
+  public void testQueryWithWildcardAndIndexOnMultiValues() {
+    createPartitionedRegion();
+
+    server.invoke(() -> {
+      indexAndLoadPortfolios(id -> id % 500 == 0 ? "something" + id : "some");
+    });
+
+    assertRowsAndIndexResults(executeLikeQuery(noLimit), 20);
+  }
+
+  /** Twenty entries match, but the query carries {@code limit 5}: five rows expected. */
+  @Test
+  public void testLimitIsAppliedOnlyOnQueryResults() {
+    createPartitionedRegion();
+
+    server.invoke(() -> {
+      indexAndLoadPortfolios(id -> id % 500 == 0 ? "something" : "some");
+    });
+
+    assertRowsAndIndexResults(executeLikeQuery(limitFive), 5);
+  }
+
+  /** Same as the single-match test, with {@link PortfolioPdx} values. */
+  @Test
+  public void testQueryWithWildcardAndIndexOnAttributeFromHashMapPdx() {
+    createPartitionedRegion();
+
+    server.invoke(() -> {
+      indexAndLoadPdxPortfolios(id -> id == 1 ? "something" : "some");
+    });
+
+    assertRowsAndIndexResults(executeLikeQuery(noLimit), 1);
+  }
+
+  /** Same as the limit test, with {@link PortfolioPdx} values. */
+  @Test
+  public void testLimitIsAppliedOnlyOnQueryResultsPdx() {
+    createPartitionedRegion();
+
+    server.invoke(() -> {
+      indexAndLoadPdxPortfolios(id -> id % 500 == 0 ? "something" : "some");
+    });
+
+    assertRowsAndIndexResults(executeLikeQuery(limitFive), 5);
+  }
+
+  /** Creates the partitioned test region through gfsh and asserts the command succeeded. */
+  private void createPartitionedRegion() {
+    gfsh.executeAndAssertThat("create region --name=" + regionName + " --type=PARTITION")
+        .statusIsSuccess();
+  }
+
+  /**
+   * Runs the traced {@code LIKE 'somethin%'} query through gfsh.
+   *
+   * @param limitClause text appended after the predicate, either {@link #noLimit} or
+   *        {@link #limitFive}
+   * @return the string form of the command's result model
+   */
+  private String executeLikeQuery(String limitClause) {
+    String query = "query --query=\"<trace> select e.key, e.value from "
+        + SEPARATOR + regionName
+        + ".entrySet e where e.value.positions['SUN'] like 'somethin%'" + limitClause + "\"";
+
+    return String.valueOf(gfsh.executeAndAssertThat(query).getResultModel());
+  }
+
+  /**
+   * Asserts that the command output reports {@code expected} rows and that the trace reports the
+   * index as used with the same number of results.
+   */
+  private static void assertRowsAndIndexResults(String cmdResult, int expected) {
+    assertThat(cmdResult).contains("\"Rows\":\"" + expected + "\"");
+    assertThat(cmdResult)
+        .contains("indexesUsed(1):" + indexName + "(Results: " + expected + ")");
+  }
+
+  /**
+   * Creates the index on {@code value.positions['SUN']} over the region's {@code entrySet} and
+   * returns the region. Runs inside the server VM; the index is created before any data is put.
+   */
+  private static <V> Region<Integer, V> createIndexAndGetRegion() throws Exception {
+    Cache cache = CacheFactory.getAnyInstance();
+    QueryService queryService = cache.getQueryService();
+    queryService.createIndex(indexName, "value.positions['SUN']",
+        SEPARATOR + regionName + ".entrySet");
+    return cache.getRegion(regionName);
+  }
+
+  /**
+   * Creates the index and loads ids 1..{@link #entryCount} as {@link Portfolio} values. Every
+   * portfolio gets an {@code IBM} position of "something" and a {@code SUN} position chosen by
+   * {@code rule}.
+   */
+  private static void indexAndLoadPortfolios(SunPositionRule rule) throws Exception {
+    Region<Integer, Portfolio> region = createIndexAndGetRegion();
+
+    for (int id = 1; id <= entryCount; id++) {
+      Portfolio portfolio = new Portfolio(id, id);
+      portfolio.positions = new HashMap<>();
+      portfolio.positions.put("IBM", "something");
+      portfolio.positions.put("SUN", rule.valueFor(id));
+      region.put(id, portfolio);
+    }
+  }
+
+  /**
+   * Creates the index and loads ids 1..{@link #entryCount} as {@link PortfolioPdx} values, with
+   * the same position layout as {@link #indexAndLoadPortfolios(SunPositionRule)}.
+   */
+  private static void indexAndLoadPdxPortfolios(SunPositionRule rule) throws Exception {
+    Region<Integer, PortfolioPdx> region = createIndexAndGetRegion();
+
+    for (int id = 1; id <= entryCount; id++) {
+      PortfolioPdx portfolio = new PortfolioPdx(id, id);
+      portfolio.positions = new HashMap<>();
+      portfolio.positions.put("IBM", "something");
+      portfolio.positions.put("SUN", rule.valueFor(id));
+      region.put(id, portfolio);
+    }
+  }
+
+  /**
+   * Launches a locator with an embedded, started JMX manager and waits until the locator reports
+   * that its shared configuration is running.
+   */
+  private static void startLocator(File workingDirectory, int locatorPort,
+      int jmxPort) {
+    LocatorLauncher locatorLauncher = new LocatorLauncher.Builder()
+        .setMemberName(locatorName)
+        .setPort(locatorPort)
+        .setWorkingDirectory(workingDirectory.getAbsolutePath())
+        .set(JMX_MANAGER, "true")
+        .set(JMX_MANAGER_PORT, String.valueOf(jmxPort))
+        .set(JMX_MANAGER_START, "true")
+        .build();
+
+    locatorLauncher.start();
+
+    await().untilAsserted(() -> {
+      InternalLocator locator = (InternalLocator) locatorLauncher.getLocator();
+      assertThat(locator.isSharedConfigurationRunning())
+          .as("Locator shared configuration is running on locator" + getVMId())
+          .isTrue();
+    });
+  }
+
+  /** Launches a server on {@code serverPort} that joins the given locators. */
+  private static void startServer(File workingDirectory, int serverPort,
+      String locators) {
+    ServerLauncher serverLauncher = new ServerLauncher.Builder()
+        .setDeletePidFileOnStop(Boolean.TRUE)
+        .setMemberName(serverName)
+        .setServerPort(serverPort)
+        .setWorkingDirectory(workingDirectory.getAbsolutePath())
+        .setDebug(true)
+        .set(HTTP_SERVICE_PORT, "0")
+        .set(LOCATORS, locators)
+        .build();
+
+    serverLauncher.start();
+  }
+}