You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/02/28 09:45:19 UTC
[ignite-3] branch main updated: IGNITE-16486 Adoption of a bunch of tickets from Ignite-2 - Fixes #637.
This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2c3fcb2 IGNITE-16486 Adoption of a bunch of tickets from Ignite-2 - Fixes #637.
2c3fcb2 is described below
commit 2c3fcb2b39ea6c80993b4c9ec84655ebd36686da
Author: zstan <st...@gmail.com>
AuthorDate: Mon Feb 28 12:41:31 2022 +0300
IGNITE-16486 Adoption of a bunch of tickets from Ignite-2 - Fixes #637.
IGNITE-13179 Fix traits at the IgniteLimit.
IGNITE-15603 NPE on subquery returning multiple results.
IGNITE-15982 Correlates passes through table spools.
Signed-off-by: zstan <st...@gmail.com>
---
modules/runner/pom.xml | 6 --
.../internal/sql/engine/ItCorrelatesTest.java | 53 ++++++++++
.../internal/sql/engine/util/QueryChecker.java | 3 +-
.../internal/sql/engine/exec/rel/LimitNode.java | 8 +-
.../sql/engine/rule/LogicalScanConverterRule.java | 31 +++---
.../sql/engine/trait/RewindabilityTraitDef.java | 2 +-
.../internal/sql/engine/trait/TraitUtils.java | 17 +--
.../sql/engine/exec/rel/LimitExecutionTest.java | 115 +++++++++++++++++++++
8 files changed, 207 insertions(+), 28 deletions(-)
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index d037ca6..a5aa970 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -113,12 +113,6 @@
</dependency>
<dependency>
- <groupId>org.immutables</groupId>
- <artifactId>value-annotations</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>com.github.npathai</groupId>
<artifactId>hamcrest-optional</artifactId>
<scope>test</scope>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
new file mode 100644
index 0000000..bfaf6e3
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
+
+import org.junit.jupiter.api.Test;
+
+/** Tests for correlated queries. */
+public class ItCorrelatesTest extends AbstractBasicIntegrationTest {
+ private static final String DISABLED_JOIN_RULES = " /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ ";
+
+ /** Checks correlates are assigned before access. */
+ @Test
+ public void testCorrelatesAssignedBeforeAccess() {
+ sql("create table test_tbl(k INTEGER primary key, v INTEGER)");
+ sql("INSERT INTO test_tbl VALUES (1, 1)");
+
+ assertQuery("SELECT " + DISABLED_JOIN_RULES + " t0.v, (SELECT t0.v + t1.v FROM test_tbl t1) AS j FROM test_tbl t0")
+ .matches(containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+ .returns(1, 2)
+ .check();
+ }
+
+ /** Checks that correlates can't be moved under the table spool. */
+ @Test
+ public void testCorrelatesWithTableSpool() {
+ sql("CREATE TABLE test(k INTEGER primary key, i1 INT, i2 INT)");
+ sql("INSERT INTO test VALUES (1, 1, 1), (2, 2, 2)");
+
+ assertQuery("SELECT " + DISABLED_JOIN_RULES + " (SELECT t1.i1 + t1.i2 + t0.i2 FROM test t1 WHERE i1 = 1) FROM test t0")
+ .matches(containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+ .matches(containsSubPlan("IgniteTableSpool"))
+ .returns(3)
+ .returns(4)
+ .check();
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
index b37f21c..1a398e7 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/util/QueryChecker.java
@@ -183,7 +183,8 @@ public abstract class QueryChecker {
/**
* Adds plan matchers.
*/
- public QueryChecker matches(Matcher<String>... planMatcher) {
+ @SafeVarargs
+ public final QueryChecker matches(Matcher<String>... planMatcher) {
Collections.addAll(planMatchers, planMatcher);
return this;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
index 2e27852..4ad4de3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
@@ -76,9 +76,15 @@ public class LimitNode<RowT> extends AbstractNode<RowT> implements SingleNode<Ro
rowsCnt = offset + rowsCnt;
}
+ waiting = rowsCnt;
+
+ if (fetch > 0) {
+ rowsCnt = Math.min(rowsCnt, (fetch + offset) - rowsProcessed);
+ }
+
checkState();
- source().request(waiting = rowsCnt);
+ source().request(rowsCnt);
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
index e8f9b96..4796000 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/LogicalScanConverterRule.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.rule;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
import org.apache.ignite.internal.sql.engine.util.RexUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
/**
* LogicalScanConverterRule.
@@ -80,17 +82,18 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
collation = collation.apply(mapping);
}
- RelTraitSet traits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
- .replace(RewindabilityTrait.REWINDABLE)
- .replace(distribution)
- .replace(collation);
-
Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
- if (!corrIds.isEmpty()) {
- traits = traits.replace(CorrelationTrait.correlations(corrIds));
+ if (!CollectionUtils.nullOrEmpty(rel.projects())) {
+ corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
}
+ RelTraitSet traits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
+ .replace(RewindabilityTrait.REWINDABLE)
+ .replace(distribution)
+ .replace(collation)
+ .replace(corrIds.isEmpty() ? CorrelationTrait.UNCORRELATED : CorrelationTrait.correlations(corrIds));
+
return new IgniteIndexScan(
cluster,
traits,
@@ -127,15 +130,17 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
distribution = distribution.apply(mapping);
}
- RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
- .replace(RewindabilityTrait.REWINDABLE)
- .replace(distribution);
-
Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
- if (!corrIds.isEmpty()) {
- traits = traits.replace(CorrelationTrait.correlations(corrIds));
+
+ if (!CollectionUtils.nullOrEmpty(rel.projects())) {
+ corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
}
+ RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
+ .replace(RewindabilityTrait.REWINDABLE)
+ .replace(distribution)
+ .replace(corrIds.isEmpty() ? CorrelationTrait.UNCORRELATED : CorrelationTrait.correlations(corrIds));
+
return new IgniteTableScan(rel.getCluster(), traits,
rel.getTable(), rel.projects(), rel.condition(), rel.requiredColumns());
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java
index fdbca8d..84190c1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/RewindabilityTraitDef.java
@@ -43,7 +43,7 @@ public class RewindabilityTraitDef extends RelTraitDef<RewindabilityTrait> {
/** {@inheritDoc} */
@Override
public RelNode convert(RelOptPlanner planner, RelNode rel, RewindabilityTrait toTrait, boolean allowInfiniteCostConverters) {
- return TraitUtils.convertRewindability(planner, toTrait, rel);
+ return TraitUtils.convertRewindability(toTrait, rel);
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 6a3f816..a3d8026 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -128,7 +128,7 @@ public class TraitUtils {
} else if (converter == DistributionTraitDef.INSTANCE) {
return convertDistribution(planner, (IgniteDistribution) toTrait, rel);
} else if (converter == RewindabilityTraitDef.INSTANCE) {
- return convertRewindability(planner, (RewindabilityTrait) toTrait, rel);
+ return convertRewindability((RewindabilityTrait) toTrait, rel);
} else {
return convertOther(planner, converter, toTrait, rel);
}
@@ -203,8 +203,7 @@ public class TraitUtils {
* Convert rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
@Nullable
- public static RelNode convertRewindability(RelOptPlanner planner,
- RewindabilityTrait toTrait, RelNode rel) {
+ public static RelNode convertRewindability(RewindabilityTrait toTrait, RelNode rel) {
RewindabilityTrait fromTrait = rewindability(rel);
if (fromTrait.satisfies(toTrait)) {
@@ -212,9 +211,15 @@ public class TraitUtils {
}
RelTraitSet traits = rel.getTraitSet()
- .replace(toTrait);
-
- return new IgniteTableSpool(rel.getCluster(), traits, Spool.Type.LAZY, rel);
+ .replace(toTrait)
+ .replace(CorrelationTrait.UNCORRELATED);
+
+ return new IgniteTableSpool(
+ rel.getCluster(),
+ traits,
+ Spool.Type.LAZY,
+ RelOptRule.convert(rel, rel.getTraitSet().replace(CorrelationTrait.UNCORRELATED)))
+ ;
}
@SuppressWarnings({"unchecked", "rawtypes"})
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
new file mode 100644
index 0000000..d0dc3ab
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitExecutionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test LimitNode execution.
+ */
+public class LimitExecutionTest extends AbstractExecutionTest {
+ /** Tests correct results fetched with Limit node. */
+ @Test
+ public void testLimit() {
+ int bufSize = Commons.IN_BUFFER_SIZE;
+
+ checkLimit(0, 1);
+ checkLimit(1, 0);
+ checkLimit(1, 1);
+ checkLimit(0, bufSize);
+ checkLimit(bufSize, 0);
+ checkLimit(bufSize, bufSize);
+ checkLimit(bufSize - 1, 1);
+ checkLimit(2000, 0);
+ checkLimit(0, 3000);
+ checkLimit(2000, 3000);
+ }
+
+ /**
+ * Check correct result size fetched.
+ *
+ * @param offset Rows offset.
+ * @param fetch Fetch rows count (zero means unlimited).
+ */
+ private void checkLimit(int offset, int fetch) {
+ ExecutionContext<Object[]> ctx = executionContext(true);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class);
+
+ RootNode<Object[]> rootNode = new RootNode<>(ctx, rowType);
+ LimitNode<Object[]> limitNode = new LimitNode<>(ctx, rowType, () -> offset, fetch == 0 ? null : () -> fetch);
+ SourceNode srcNode = new SourceNode(ctx, rowType);
+
+ rootNode.register(limitNode);
+ limitNode.register(srcNode);
+
+ if (fetch > 0) {
+ for (int i = offset; i < offset + fetch; i++) {
+ assertTrue(rootNode.hasNext());
+ assertEquals(i, rootNode.next()[0]);
+ }
+
+ assertFalse(rootNode.hasNext());
+ assertEquals(srcNode.requested.get(), offset + fetch);
+ } else {
+ assertTrue(rootNode.hasNext());
+ assertEquals(offset, rootNode.next()[0]);
+ assertTrue(srcNode.requested.get() > offset);
+ }
+ }
+
+ private static class SourceNode extends AbstractNode<Object[]> {
+
+ AtomicInteger requested = new AtomicInteger();
+
+ public SourceNode(ExecutionContext<Object[]> ctx, RelDataType rowType) {
+ super(ctx, rowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void rewindInternal() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Downstream<Object[]> requestDownstream(int idx) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(int rowsCnt) {
+ int r = requested.getAndAdd(rowsCnt);
+
+ context().execute(() -> {
+ for (int i = 0; i < rowsCnt; i++) {
+ downstream().push(new Object[]{r + i});
+ }
+ }, this::onError);
+ }
+ }
+}