You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/02/28 09:45:19 UTC

[ignite-3] branch main updated: IGNITE-16486 Adoption of a bunch of tickets from Ignite-2 - Fixes #637.

This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c3fcb2  IGNITE-16486 Adoption of a bunch of tickets from Ignite-2 - Fixes #637.
2c3fcb2 is described below

commit 2c3fcb2b39ea6c80993b4c9ec84655ebd36686da
Author: zstan <st...@gmail.com>
AuthorDate: Mon Feb 28 12:41:31 2022 +0300

    IGNITE-16486 Adoption of a bunch of tickets from Ignite-2 - Fixes #637.
    
    IGNITE-13179 Fix traits at the IgniteLimit.
    IGNITE-15603 NPE on subquery returning multiple results.
    IGNITE-15982 Correlates passes through table spools.
    
    Signed-off-by: zstan <st...@gmail.com>
---
 modules/runner/pom.xml                             |   6 --
 .../internal/sql/engine/ItCorrelatesTest.java      |  53 ++++++++++
 .../internal/sql/engine/util/QueryChecker.java     |   3 +-
 .../internal/sql/engine/exec/rel/LimitNode.java    |   8 +-
 .../sql/engine/rule/LogicalScanConverterRule.java  |  31 +++---
 .../sql/engine/trait/RewindabilityTraitDef.java    |   2 +-
 .../internal/sql/engine/trait/TraitUtils.java      |  17 +--
 .../sql/engine/exec/rel/LimitExecutionTest.java    | 115 +++++++++++++++++++++
 8 files changed, 207 insertions(+), 28 deletions(-)

diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index d037ca6..a5aa970 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -113,12 +113,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.immutables</groupId>
-            <artifactId>value-annotations</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>com.github.npathai</groupId>
             <artifactId>hamcrest-optional</artifactId>
             <scope>test</scope>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
new file mode 100644
index 0000000..bfaf6e3
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
+
+import org.junit.jupiter.api.Test;
+
+/** Integration tests for correlated queries whose plans are expected to contain IgniteCorrelatedNestedLoopJoin. */
+public class ItCorrelatesTest extends AbstractBasicIntegrationTest {
+    private static final String DISABLED_JOIN_RULES = " /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ ";
+
+    /** Checks correlates are assigned before access: the subquery reads t0.v of the outer row, so the row (1, 2) is expected. */
+    @Test
+    public void testCorrelatesAssignedBeforeAccess() {
+        sql("create table test_tbl(k INTEGER primary key, v INTEGER)");
+        sql("INSERT INTO test_tbl VALUES (1, 1)");
+
+        assertQuery("SELECT " + DISABLED_JOIN_RULES + " t0.v, (SELECT t0.v + t1.v FROM test_tbl t1) AS j FROM test_tbl t0")
+                .matches(containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+                .returns(1, 2)
+                .check();
+    }
+
+    /** Checks that correlates can't be moved under the table spool: the plan must contain both the correlated join and the spool. */
+    @Test
+    public void testCorrelatesWithTableSpool() {
+        sql("CREATE TABLE test(k INTEGER primary key, i1 INT, i2 INT)");
+        sql("INSERT INTO test VALUES (1, 1, 1), (2, 2, 2)");
+
+        assertQuery("SELECT " + DISABLED_JOIN_RULES + " (SELECT t1.i1 + t1.i2 + t0.i2 FROM test t1 WHERE i1 = 1) FROM test t0")
+                .matches(containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+                .matches(containsSubPlan("IgniteTableSpool"))
+                .returns(3)
+                .returns(4)
+                .check();
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
index b37f21c..1a398e7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
@@ -183,7 +183,8 @@ public abstract class QueryChecker {
     /**
      * Adds plan matchers.
      */
-    public QueryChecker matches(Matcher<String>... planMatcher) {
+    @SafeVarargs
+    public final QueryChecker matches(Matcher<String>... planMatcher) {
         Collections.addAll(planMatchers, planMatcher);
 
         return this;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
index 2e27852..4ad4de3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
@@ -76,9 +76,15 @@ public class LimitNode<RowT> extends AbstractNode<RowT> implements SingleNode<Ro
             rowsCnt = offset + rowsCnt;
         }
 
+        waiting = rowsCnt;
+
+        if (fetch > 0) {
+            rowsCnt = Math.min(rowsCnt, (fetch + offset) - rowsProcessed);
+        }
+
         checkState();
 
-        source().request(waiting = rowsCnt);
+        source().request(rowsCnt);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
index e8f9b96..4796000 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.rule;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
 import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
 import org.apache.ignite.internal.sql.engine.util.RexUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
 
 /**
  * LogicalScanConverterRule.
@@ -80,17 +82,18 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
                         collation = collation.apply(mapping);
                     }
 
-                    RelTraitSet traits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
-                            .replace(RewindabilityTrait.REWINDABLE)
-                            .replace(distribution)
-                            .replace(collation);
-
                     Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
 
-                    if (!corrIds.isEmpty()) {
-                        traits = traits.replace(CorrelationTrait.correlations(corrIds));
+                    if (!CollectionUtils.nullOrEmpty(rel.projects())) {
+                        corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
                     }
 
+                    RelTraitSet traits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
+                            .replace(RewindabilityTrait.REWINDABLE)
+                            .replace(distribution)
+                            .replace(collation)
+                            .replace(corrIds.isEmpty() ? CorrelationTrait.UNCORRELATED : CorrelationTrait.correlations(corrIds));
+
                     return new IgniteIndexScan(
                         cluster,
                         traits,
@@ -127,15 +130,17 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
                         distribution = distribution.apply(mapping);
                     }
 
-                    RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
-                            .replace(RewindabilityTrait.REWINDABLE)
-                            .replace(distribution);
-
                     Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
-                    if (!corrIds.isEmpty()) {
-                        traits = traits.replace(CorrelationTrait.correlations(corrIds));
+
+                    if (!CollectionUtils.nullOrEmpty(rel.projects())) {
+                        corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
                     }
 
+                    RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
+                            .replace(RewindabilityTrait.REWINDABLE)
+                            .replace(distribution)
+                            .replace(corrIds.isEmpty() ? CorrelationTrait.UNCORRELATED : CorrelationTrait.correlations(corrIds));
+
                     return new IgniteTableScan(rel.getCluster(), traits,
                         rel.getTable(), rel.projects(), rel.condition(), rel.requiredColumns());
                 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java
index fdbca8d..84190c1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java
@@ -43,7 +43,7 @@ public class RewindabilityTraitDef extends RelTraitDef<RewindabilityTrait> {
     /** {@inheritDoc} */
     @Override
     public RelNode convert(RelOptPlanner planner, RelNode rel, RewindabilityTrait toTrait, boolean allowInfiniteCostConverters) {
-        return TraitUtils.convertRewindability(planner, toTrait, rel);
+        return TraitUtils.convertRewindability(toTrait, rel);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 6a3f816..a3d8026 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -128,7 +128,7 @@ public class TraitUtils {
         } else if (converter == DistributionTraitDef.INSTANCE) {
             return convertDistribution(planner, (IgniteDistribution) toTrait, rel);
         } else if (converter == RewindabilityTraitDef.INSTANCE) {
-            return convertRewindability(planner, (RewindabilityTrait) toTrait, rel);
+            return convertRewindability((RewindabilityTrait) toTrait, rel);
         } else {
             return convertOther(planner, converter, toTrait, rel);
         }
@@ -203,8 +203,7 @@ public class TraitUtils {
      * Convert rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
     @Nullable
-    public static RelNode convertRewindability(RelOptPlanner planner,
-            RewindabilityTrait toTrait, RelNode rel) {
+    public static RelNode convertRewindability(RewindabilityTrait toTrait, RelNode rel) {
         RewindabilityTrait fromTrait = rewindability(rel);
 
         if (fromTrait.satisfies(toTrait)) {
@@ -212,9 +211,15 @@ public class TraitUtils {
         }
 
         RelTraitSet traits = rel.getTraitSet()
-                .replace(toTrait);
-
-        return new IgniteTableSpool(rel.getCluster(), traits, Spool.Type.LAZY, rel);
+                .replace(toTrait)
+                .replace(CorrelationTrait.UNCORRELATED);
+
+        return new IgniteTableSpool(
+                rel.getCluster(),
+                traits,
+                Spool.Type.LAZY,
+                RelOptRule.convert(rel, rel.getTraitSet().replace(CorrelationTrait.UNCORRELATED)))
+                ;
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
new file mode 100644
index 0000000..d0dc3ab
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests LimitNode execution: rows are skipped per offset, capped per fetch, and the source is not asked for extra rows.
+ */
+public class LimitExecutionTest extends AbstractExecutionTest {
+    /** Tests correct results fetched with Limit node for several offset/fetch pairs, including ones based on IN_BUFFER_SIZE. */
+    @Test
+    public void testLimit() {
+        int bufSize = Commons.IN_BUFFER_SIZE;
+
+        checkLimit(0, 1);
+        checkLimit(1, 0);
+        checkLimit(1, 1);
+        checkLimit(0, bufSize);
+        checkLimit(bufSize, 0);
+        checkLimit(bufSize, bufSize);
+        checkLimit(bufSize - 1, 1);
+        checkLimit(2000, 0);
+        checkLimit(0, 3000);
+        checkLimit(2000, 3000);
+    }
+
+    /**
+     * Builds a Root -> Limit -> Source chain and checks the rows seen at the root and the row count requested from the source.
+     *
+     * @param offset Rows offset.
+     * @param fetch Fetch rows count (zero means unlimited).
+     */
+    private void checkLimit(int offset, int fetch) {
+        ExecutionContext<Object[]> ctx = executionContext(true);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class);
+
+        RootNode<Object[]> rootNode = new RootNode<>(ctx, rowType);
+        LimitNode<Object[]> limitNode = new LimitNode<>(ctx, rowType, () -> offset, fetch == 0 ? null : () -> fetch);
+        SourceNode srcNode = new SourceNode(ctx, rowType);
+
+        rootNode.register(limitNode);
+        limitNode.register(srcNode);
+
+        if (fetch > 0) {
+            for (int i = offset; i < offset + fetch; i++) {
+                assertTrue(rootNode.hasNext());
+                assertEquals(i, rootNode.next()[0]);
+            }
+
+            assertFalse(rootNode.hasNext());
+            assertEquals(offset + fetch, srcNode.requested.get()); // Expected value first: exactly offset + fetch rows requested.
+        } else {
+            assertTrue(rootNode.hasNext());
+            assertEquals(offset, rootNode.next()[0]);
+            assertTrue(srcNode.requested.get() > offset);
+        }
+    }
+
+    private static class SourceNode extends AbstractNode<Object[]> {
+        /** Running total of row counts passed to {@link #request(int)}; also the value of the next generated row. */
+        final AtomicInteger requested = new AtomicInteger();
+
+        public SourceNode(ExecutionContext<Object[]> ctx, RelDataType rowType) {
+            super(ctx, rowType);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void rewindInternal() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Downstream<Object[]> requestDownstream(int idx) {
+            return null;
+        }
+
+        /** Records the requested count, then pushes {@code rowsCnt} rows with consecutive values downstream via context().execute. */
+        @Override public void request(int rowsCnt) {
+            int r = requested.getAndAdd(rowsCnt);
+
+            context().execute(() -> {
+                for (int i = 0; i < rowsCnt; i++) {
+                    downstream().push(new Object[]{r + i});
+                }
+            }, this::onError);
+        }
+    }
+}