You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2023/01/13 07:38:00 UTC

[ignite] branch master updated: IGNITE-18439 SQL Calcite: Take into account TTL of entries during scans - Fixes #10467.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f7bd3318482 IGNITE-18439 SQL Calcite: Take into account TTL of entries during scans - Fixes #10467.
f7bd3318482 is described below

commit f7bd3318482d77717cf7ae9037aed18c87cadd40
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Fri Jan 13 10:35:56 2023 +0300

    IGNITE-18439 SQL Calcite: Take into account TTL of entries during scans - Fixes #10467.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/exec/IndexFirstLastScan.java     |   2 +-
 .../processors/query/calcite/exec/IndexScan.java   |  40 ++++++-
 .../processors/query/calcite/exec/TableScan.java   |   4 +
 .../query/calcite/schema/CacheIndexImpl.java       |  15 ++-
 .../integration/ExpiredEntriesIntegrationTest.java | 128 +++++++++++++++++++++
 .../query/calcite/planner/AbstractPlannerTest.java |   2 +-
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 7 files changed, 183 insertions(+), 10 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
index 0034c39f46a..dd316442cf6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexFirstLastScan.java
@@ -59,7 +59,7 @@ public class IndexFirstLastScan<Row> extends IndexScan<Row> {
 
         return new IndexQueryContext(
             res.cacheFilter(),
-            createNotNullRowFilter(idx),
+            createNotNullRowFilter(idx, true),
             res.mvccSnapshot()
         );
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index 1ad6a91cdcb..a8fd6c4ab89 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.jetbrains.annotations.Nullable;
@@ -185,6 +186,10 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
     private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
         List<InlineIndexKeyType> inlinedKeys = idx.segment(0).rowHandler().inlineIndexKeyTypes();
 
+        // Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed.
+        if (!cctx.config().isEagerTtl())
+            return null;
+
         // Even if we need only some subset of inlined keys, we are required to read the full inlined row,
         // since this row also participates in comparison with other rows when the cursor processes the next index page.
         if (inlinedKeys.size() < idx.segment(0).rowHandler().indexKeyDefinitions().size() ||
@@ -376,7 +381,9 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
         InlineIndexRowFactory rowFactory = isInlineScan() ?
             new InlineIndexRowFactory(rowHnd.inlineIndexKeyTypes().toArray(new InlineIndexKeyType[0]), rowHnd) : null;
 
-        return new IndexQueryContext(filter, null, rowFactory, mvccSnapshot);
+        BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = isInlineScan() ? null : createNotExpiredRowFilter();
+
+        return new IndexQueryContext(filter, rowFilter, rowFactory, mvccSnapshot);
     }
 
     /** */
@@ -444,7 +451,10 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
     /**
      * Creates row filter to skip null values in the first index column and, if requested, expired rows.
      */
-    public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotNullRowFilter(InlineIndex idx) {
+    public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotNullRowFilter(
+        InlineIndex idx,
+        boolean checkExpired
+    ) {
         List<InlineIndexKeyType> inlineKeyTypes = idx.segment(0).rowHandler().inlineIndexKeyTypes();
 
         InlineIndexKeyType keyType = F.isEmpty(inlineKeyTypes) ? null : inlineKeyTypes.get(0);
@@ -457,18 +467,36 @@ public class IndexScan<Row> extends AbstractIndexScan<Row, IndexRow> {
                 long pageAddr,
                 int idx
             ) throws IgniteCheckedException {
-                if (keyType != null && io instanceof InlineIO) {
+                if (!checkExpired && keyType != null && io instanceof InlineIO) {
                     Boolean keyIsNull = keyType.isNull(pageAddr, io.offset(idx), ((InlineIO)io).inlineSize());
 
-                    if (keyIsNull != null)
-                        return !keyIsNull;
+                    if (keyIsNull == Boolean.TRUE)
+                        return false;
                 }
 
-                return io.getLookupRow(tree, pageAddr, idx).key(0).type() != IndexKeyType.NULL;
+                IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);
+
+                if (checkExpired &&
+                    idxRow.cacheDataRow().expireTime() > 0 &&
+                    idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis())
+                    return false;
+
+                return idxRow.key(0).type() != IndexKeyType.NULL;
             }
         };
     }
 
+    /** */
+    public static BPlusTree.TreeRowClosure<IndexRow, IndexRow> createNotExpiredRowFilter() {
+        return (tree, io, pageAddr, idx) -> {
+            IndexRow idxRow = io.getLookupRow(tree, pageAddr, idx);
+
+            // Skip expired.
+            return !(idxRow.cacheDataRow().expireTime() > 0 &&
+                idxRow.cacheDataRow().expireTime() <= U.currentTimeMillis());
+        };
+    }
+
     /** */
     protected static class TreeIndexWrapper implements TreeIndex<IndexRow> {
         /** Underlying index. */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
index 55011b17443..b4637e6cb44 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDesc
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 /** */
@@ -258,6 +259,9 @@ public class TableScan<Row> implements Iterable<Row>, AutoCloseable {
                 if (cur.next()) {
                     CacheDataRow row = cur.get();
 
+                    if (row.expireTime() > 0 && row.expireTime() <= U.currentTimeMillis())
+                        continue;
+
                     if (!desc.match(row))
                         continue;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
index 68c8ea896a0..9fd8a5553b0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheIndexImpl.java
@@ -167,11 +167,14 @@ public class CacheIndexImpl implements IgniteIndex {
 
             BPlusTree.TreeRowClosure<IndexRow, IndexRow> rowFilter = null;
 
+            boolean checkExpired = !tbl.descriptor().cacheContext().config().isEagerTtl();
+
             if (notNull) {
                 boolean nullsFirst = collation.getFieldCollations().get(0).nullDirection ==
                     RelFieldCollation.NullDirection.FIRST;
 
-                BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter = IndexScan.createNotNullRowFilter(iidx);
+                BPlusTree.TreeRowClosure<IndexRow, IndexRow> notNullRowFilter =
+                    IndexScan.createNotNullRowFilter(iidx, checkExpired);
 
                 AtomicBoolean skipCheck = new AtomicBoolean();
 
@@ -186,7 +189,7 @@ public class CacheIndexImpl implements IgniteIndex {
                         // don't need to check it with notNullRowFilter.
                         // In case of NULL-LAST collation, all values after first null value will be null,
                         // don't need to check it too.
-                        if (skipCheck.get())
+                        if (skipCheck.get() && !checkExpired)
                             return nullsFirst;
 
                         boolean res = notNullRowFilter.apply(tree, io, pageAddr, idx);
@@ -198,6 +201,8 @@ public class CacheIndexImpl implements IgniteIndex {
                     }
                 };
             }
+            else if (checkExpired)
+                rowFilter = IndexScan.createNotExpiredRowFilter();
 
             try {
                 for (int i = 0; i < iidx.segmentsCount(); ++i)
@@ -242,6 +247,12 @@ public class CacheIndexImpl implements IgniteIndex {
         if (idx == null)
             return false;
 
+        // Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed.
+        if (tbl.descriptor().cacheInfo() != null) {
+            if (!tbl.descriptor().cacheInfo().config().isEagerTtl())
+                return false;
+        }
+
         if (requiredColumns == null)
             requiredColumns = ImmutableBitSet.range(tbl.descriptor().columnDescriptors().size());
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/ExpiredEntriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/ExpiredEntriesIntegrationTest.java
new file mode 100644
index 00000000000..d165db2663f
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/ExpiredEntriesIntegrationTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsSubPlan;
+import static org.apache.ignite.internal.processors.query.calcite.QueryChecker.containsTableScan;
+
+/**
+ * Tests that SQL queries do not return expired cache entries.
+ */
+public class ExpiredEntriesIntegrationTest extends AbstractBasicIntegrationTest {
+    /** */
+    @Test
+    public void testExpiration() throws Exception {
+        CacheConfiguration<Integer, Developer> cacheCfg = new CacheConfiguration<Integer, Developer>()
+            .setIndexedTypes(Integer.class, Developer.class)
+            .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 1)));
+
+        IgniteCache<Integer, Developer> cache1 = client.getOrCreateCache(new CacheConfiguration<>(cacheCfg)
+            .setName("CACHE1")
+            .setEagerTtl(false)
+        );
+
+        IgniteCache<Integer, Developer> cache2 = client.getOrCreateCache(new CacheConfiguration<>(cacheCfg)
+            .setName("CACHE2")
+            .setEagerTtl(true)
+        );
+
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < 100; i++) {
+            cache1.put(i, new Developer("dev" + i, i));
+            cache2.put(i, new Developer("dev" + i, i));
+        }
+
+        ExpiryPolicy expPlc = new CreatedExpiryPolicy(new Duration(TimeUnit.DAYS, 1));
+
+        for (int i = 50; i < 55; i++) {
+            cache1.withExpiryPolicy(expPlc).put(i, new Developer("dev" + i, i));
+            cache2.withExpiryPolicy(expPlc).put(i, new Developer("dev" + i, i));
+        }
+
+        GridTestUtils.waitForCondition(() -> cache2.size() == 5, 1_000);
+
+        checkExpiration("CACHE1", false);
+        checkExpiration("CACHE2", true);
+    }
+
+    /** */
+    private void checkExpiration(String schema, boolean eagerTtl) {
+        assertQuery("SELECT depId, name FROM " + schema + ".DEVELOPER WHERE name IS NOT NULL")
+            .matches(containsTableScan(schema, "DEVELOPER"))
+            .returns(50, "dev50").returns(51, "dev51").returns(52, "dev52").returns(53, "dev53").returns(54, "dev54")
+            .check();
+
+        assertQuery("SELECT depId, name FROM " + schema + ".DEVELOPER WHERE depId BETWEEN 30 and 70")
+            .matches(containsIndexScan(schema, "DEVELOPER"))
+            .returns(50, "dev50").returns(51, "dev51").returns(52, "dev52").returns(53, "dev53").returns(54, "dev54")
+            .check();
+
+        assertQuery("SELECT depId FROM " + schema + ".DEVELOPER WHERE depId BETWEEN 30 and 70")
+            .matches(containsSubPlan("inlineScan=[" + eagerTtl + "]"))
+            .returns(50).returns(51).returns(52).returns(53).returns(54)
+            .check();
+
+        assertQuery("SELECT min(depId) FROM " + schema + ".DEVELOPER")
+            .matches(containsSubPlan("IgniteIndexBound"))
+            .returns(50)
+            .check();
+
+        assertQuery("SELECT max(depId) FROM " + schema + ".DEVELOPER")
+            .matches(containsSubPlan("IgniteIndexBound"))
+            .returns(54)
+            .check();
+
+        assertQuery("SELECT count(depId) FROM " + schema + ".DEVELOPER")
+            .matches(containsSubPlan("IgniteIndexCount"))
+            .returns(5L)
+            .check();
+
+        assertQuery("SELECT count(*) FROM " + schema + ".DEVELOPER")
+            .matches(containsSubPlan("IgniteIndexCount"))
+            .returns(5L)
+            .check();
+    }
+
+    /** */
+    private static class Developer {
+        /** */
+        @QuerySqlField
+        String name;
+
+        /** */
+        @QuerySqlField(index = true)
+        int depId;
+
+        /** */
+        public Developer(String name, int depId) {
+            this.name = name;
+            this.depId = depId;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 64c5858e435..328345e6523 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -701,7 +701,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public GridCacheContextInfo cacheInfo() {
-            throw new AssertionError();
+            return null;
         }
 
         /** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 569eaa44328..4ad37c03df1 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.CalciteEr
 import org.apache.ignite.internal.processors.query.calcite.integration.CorrelatesIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.DataTypesTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.DynamicParametersIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.ExpiredEntriesIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.FunctionsTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.HashSpoolIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.IndexDdlIntegrationTest;
@@ -115,6 +116,7 @@ import org.junit.runners.Suite;
     MemoryQuotasIntegrationTest.class,
     LocalDateTimeSupportTest.class,
     DynamicParametersIntegrationTest.class,
+    ExpiredEntriesIntegrationTest.class,
 })
 public class IntegrationTestSuite {
 }