You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2023/01/13 07:38:00 UTC
[ignite] branch master updated: IGNITE-18439 SQL Calcite: Take into account TTL of entries during scans - Fixes #10467.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f7bd3318482 IGNITE-18439 SQL Calcite: Take into account TTL of entries during scans - Fixes #10467.
f7bd3318482 is described below
commit f7bd3318482d77717cf7ae9037aed18c87cadd40
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Jan 13 10:35:56 2023 +0300
IGNITE-18439 SQL Calcite: Take into account TTL of entries during scans - Fixes #10467.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../query/calcite/exec/IndexFirstLastScan.java | 2 +-
.../processors/query/calcite/exec/IndexScan.java | 40 ++++++-
.../processors/query/calcite/exec/TableScan.java | 4 +
.../query/calcite/schema/CacheIndexImpl.java | 15 ++-
.../integration/ExpiredEntriesIntegrationTest.java | 128 +++++++++++++++++++++
.../query/calcite/planner/AbstractPlannerTest.java | 2 +-
.../ignite/testsuites/IntegrationTestSuite.java | 2 +
7 files changed, 183 insertions(+), 10 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
index 0034c39f46a..dd316442cf6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
@@ -59,7 +59,7 @@ public class IndexFirstLastScan<Row> extends IndexScan<Row> {
return new IndexQueryContext(
res.cacheFilter(),
- createNotNullRowFilter(idx),
+ createNotNullRowFilter(idx, true),
res.mvccSnapshot()
);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 1ad6a91cdcb..a8fd6c4ab89 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
import org.jetbrains.annotations.Nullable;
@@ -185,6 +186,10 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
List<InlineIndexKeyType> inlinedKeys = idx.segment(0).rowHandler().inlineIndexKeyTypes();
+ // Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed.
+ if (!cctx.config().isEagerTtl())
+ return null;
+
// Even if we need some subset of inlined keys we are required to the read full inlined row, since this row
// is also participated in comparison with other rows when cursor processing the next index page.
if (inlinedKeys.size() < idx.segment(0).rowHandler().indexKeyDefinitions().size() ||
@@ -376,7 +381,9 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
InlineIndexRowFactory rowFactory = isInlineScan() ?
new InlineIndexRowFactory(rowHnd.inlineIndexKeyTypes().toArray(new InlineIndexKeyType[0]), rowHnd) : null;
- return new IndexQueryContext(filter, null, rowFactory, mvccSnapshot);
+ BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = isInlineScan() ? null : createNotExpiredRowFilter();
+
+ return new IndexQueryContext(filter, rowFilter, rowFactory, mvccSnapshot);
}
/** */
@@ -444,7 +451,10 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
/**
* Creates row filter to skip null values in the first index column.
*/
- public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotNullRowFilter(InlineIndex idx) {
+ public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotNullRowFilter(
+ InlineIndex idx,
+ boolean checkExpired
+ ) {
List<InlineIndexKeyType> inlineKeyTypes = idx.segment(0).rowHandler().inlineIndexKeyTypes();
InlineIndexKeyType keyType = F.isEmpty(inlineKeyTypes) ? null : inlineKeyTypes.get(0);
@@ -457,18 +467,36 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
long pageAddr,
int idx
) throws IgniteCheckedException {
- if (keyType != null && io instanceof InlineIO) {
+ if (!checkExpired && keyType != null && io instanceof InlineIO) {
Boolean keyIsNull = keyType.isNull(pageAddr, io.offset(idx), ((InlineIO)io).inlineSize());
- if (keyIsNull != null)
- return !keyIsNull;
+ if (keyIsNull == Boolean.TRUE)
+ return false;
}
- return io.getLookupRow(tree, pageAddr, idx).key(0).type() != IndexKeyType.NULL;
+ IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);
+
+ if (checkExpired &&
+ idxRow.cacheDataRow().expireTime() > 0 &&
+ idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis())
+ return false;
+
+ return idxRow.key(0).type() != IndexKeyType.NULL;
}
};
}
+ /** */
+ public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotExpiredRowFilter() {
+ return (tree, io, pageAddr, idx) -> {
+ IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);
+
+ // Skip expired.
+ return !(idxRow.cacheDataRow().expireTime() > 0 &&
+ idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis());
+ };
+ }
+
/** */
protected static class TreeIndexWrapper implements TreeIndex<IndexRow> {
/** Underlying index. */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index 55011b17443..b4637e6cb44 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDesc
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/** */
@@ -258,6 +259,9 @@ public class TableScan<Row> implements Iterable<Row>, AutoCloseable {
if (cur.next()) {
CacheDataRow row = cur.get();
+ if (row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis())
+ continue;
+
if (!desc.match(row))
continue;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
index 68c8ea896a0..9fd8a5553b0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
@@ -167,11 +167,14 @@ public class CacheIndexImpl implements IgniteIndex {
BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null;
+ boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl();
+
if (notNull) {
boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection ==
RelFieldCollation.NullDirection.FIRST;
- BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = IndexScan.createNotNullRowFilter(iidx);
+ BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter =
+ IndexScan.createNotNullRowFilter(iidx, checkExpired);
AtomicBoolean skipCheck = new AtomicBoolean();
@@ -186,7 +189,7 @@ public class CacheIndexImpl implements IgniteIndex {
// don't need to check it with notNullRowFilter.
// In case of NULL-LAST collation, all values after first null value will be null,
// don't need to check it too.
- if (skipCheck.get())
+ if (skipCheck.get() && !checkExpired)
return nullsFirst;
boolean res = notNullRowFilter.apply(tree, io, pageAddr, idx);
@@ -198,6 +201,8 @@ public class CacheIndexImpl implements IgniteIndex {
}
};
}
+ else if (checkExpired)
+ rowFilter = IndexScan.createNotExpiredRowFilter();
try {
for (int i = 0; i < iidx.segmentsCount(); ++i)
@@ -242,6 +247,12 @@ public class CacheIndexImpl implements IgniteIndex {
if (idx == null)
return false;
+ // Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed.
+ if (tbl.descriptor().cacheInfo() != null) {
+ if (!tbl.descriptor().cacheInfo().config().isEagerTtl())
+ return false;
+ }
+
if (requiredColumns == null)
requiredColumns = ImmutableBitSet.range(tbl.descriptor().columnDescriptors().size());
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/ExpiredEntriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/ExpiredEntriesIntegrationTest.java
new file mode 100644
index 00000000000..d165db2663f
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/ExpiredEntriesIntegrationTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsSubPlan;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsTableScan;
+
+/**
+ * Test query expired entries.
+ */
+public class ExpiredEntriesIntegrationTest extends AbstractBasicIntegrationTest {
+ /** */
+ @Test
+ public void testExpiration() throws Exception {
+ CacheConfiguration<Integer, Developer> cacheCfg = new CacheConfiguration<Integer, Developer>()
+ .setIndexedTypes(Integer.class, Developer.class)
+ .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 1)));
+
+ IgniteCache<Integer, Developer> cache1 = client.getOrCreateCache(new CacheConfiguration<>(cacheCfg)
+ .setName("CACHE1")
+ .setEagerTtl(false)
+ );
+
+ IgniteCache<Integer, Developer> cache2 = client.getOrCreateCache(new CacheConfiguration<>(cacheCfg)
+ .setName("CACHE2")
+ .setEagerTtl(true)
+ );
+
+ awaitPartitionMapExchange();
+
+ for (int i = 0; i < 100; i++) {
+ cache1.put(i, new Developer("dev" + i, i));
+ cache2.put(i, new Developer("dev" + i, i));
+ }
+
+ ExpiryPolicy expPlc = new CreatedExpiryPolicy(new Duration(TimeUnit.DAYS, 1));
+
+ for (int i = 50; i < 55; i++) {
+ cache1.withExpiryPolicy(expPlc).put(i, new Developer("dev" + i, i));
+ cache2.withExpiryPolicy(expPlc).put(i, new Developer("dev" + i, i));
+ }
+
+ GridTestUtils.waitForCondition(() -> cache2.size() == 5, 1_000);
+
+ checkExpiration("CACHE1", false);
+ checkExpiration("CACHE2", true);
+ }
+
+ /** */
+ private void checkExpiration(String schema, boolean eagerTtl) {
+ assertQuery("SELECT depId, name FROM " + schema + ".DEVELOPER WHERE name IS NOT NULL")
+ .matches(containsTableScan(schema, "DEVELOPER"))
+ .returns(50, "dev50").returns(51, "dev51").returns(52, "dev52").returns(53, "dev53").returns(54, "dev54")
+ .check();
+
+ assertQuery("SELECT depId, name FROM " + schema + ".DEVELOPER WHERE depId BETWEEN 30 and 70")
+ .matches(containsIndexScan(schema, "DEVELOPER"))
+ .returns(50, "dev50").returns(51, "dev51").returns(52, "dev52").returns(53, "dev53").returns(54, "dev54")
+ .check();
+
+ assertQuery("SELECT depId FROM " + schema + ".DEVELOPER WHERE depId BETWEEN 30 and 70")
+ .matches(containsSubPlan("inlineScan=[" + eagerTtl + "]"))
+ .returns(50).returns(51).returns(52).returns(53).returns(54)
+ .check();
+
+ assertQuery("SELECT min(depId) FROM " + schema + ".DEVELOPER")
+ .matches(containsSubPlan("IgniteIndexBound"))
+ .returns(50)
+ .check();
+
+ assertQuery("SELECT max(depId) FROM " + schema + ".DEVELOPER")
+ .matches(containsSubPlan("IgniteIndexBound"))
+ .returns(54)
+ .check();
+
+ assertQuery("SELECT count(depId) FROM " + schema + ".DEVELOPER")
+ .matches(containsSubPlan("IgniteIndexCount"))
+ .returns(5L)
+ .check();
+
+ assertQuery("SELECT count(*) FROM " + schema + ".DEVELOPER")
+ .matches(containsSubPlan("IgniteIndexCount"))
+ .returns(5L)
+ .check();
+ }
+
+ /** */
+ private static class Developer {
+ /** */
+ @QuerySqlField
+ String name;
+
+ /** */
+ @QuerySqlField(index = true)
+ int depId;
+
+ /** */
+ public Developer(String name, int depId) {
+ this.name = name;
+ this.depId = depId;
+ }
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 64c5858e435..328345e6523 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -701,7 +701,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public GridCacheContextInfo cacheInfo() {
- throw new AssertionError();
+ return null;
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 569eaa44328..4ad37c03df1 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.CalciteEr
import org.apache.ignite.internal.processors.query.calcite.integration.CorrelatesIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.DataTypesTest;
import org.apache.ignite.internal.processors.query.calcite.integration.DynamicParametersIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.ExpiredEntriesIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.FunctionsTest;
import org.apache.ignite.internal.processors.query.calcite.integration.HashSpoolIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.IndexDdlIntegrationTest;
@@ -115,6 +116,7 @@ import org.junit.runners.Suite;
MemoryQuotasIntegrationTest.class,
LocalDateTimeSupportTest.class,
DynamicParametersIntegrationTest.class,
+ ExpiredEntriesIntegrationTest.class,
})
public class IntegrationTestSuite {
}