You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/05/20 08:43:15 UTC
[ignite] branch sql-calcite updated: IGNITE-14553 Calcite engine.
Fix duplicated result on insert
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new c5286a6 IGNITE-14553 Calcite engine. Fix duplicated result on insert
c5286a6 is described below
commit c5286a695c9e6b9e74d9caa1ffd640debbedb3ef
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Thu May 20 11:28:33 2021 +0300
IGNITE-14553 Calcite engine. Fix duplicated result on insert
---
.../query/calcite/exec/ExecutionServiceImpl.java | 64 +----
.../query/calcite/exec/LogicalRelImplementor.java | 3 +-
.../query/calcite/exec/PlannerHelper.java | 230 ++++++++++++++++
.../query/calcite/exec/ddl/DdlCommandHandler.java | 7 +-
.../query/calcite/exec/rel/TableSpoolNode.java | 50 ++--
.../query/calcite/prepare/IgnitePlanner.java | 8 +-
.../query/calcite/prepare/IgniteSqlValidator.java | 11 +-
.../prepare/ddl/DdlSqlToCommandConverter.java | 2 +-
.../query/calcite/rel/IgniteTableModify.java | 34 +--
.../query/calcite/rel/IgniteTableSpool.java | 17 +-
.../query/calcite/schema/TableDescriptor.java | 4 +-
.../query/calcite/schema/TableDescriptorImpl.java | 5 +
.../processors/query/calcite/trait/TraitUtils.java | 3 +-
.../processors/query/calcite/util/HintUtils.java | 11 +-
.../query/calcite/CalciteQueryProcessorTest.java | 111 --------
.../calcite/exec/rel/TableSpoolExecutionTest.java | 73 +++++-
.../AbstractBasicIntegrationTest.java | 4 +-
.../AggregatesIntegrationTest.java | 2 +-
.../CalciteErrorHandlilngIntegrationTest.java | 2 +-
.../IndexSpoolIntegrationTest.java | 2 +-
.../{ => integration}/MetadataIntegrationTest.java | 2 +-
.../SortAggregateIntegrationTest.java | 2 +-
.../{ => integration}/TableDdlIntegrationTest.java | 3 +-
.../integration/TableDmlIntegrationTest.java | 288 +++++++++++++++++++++
.../query/calcite/planner/AbstractPlannerTest.java | 284 ++++++++++++++++++--
.../calcite/planner/JoinColocationPlannerTest.java | 68 -----
.../query/calcite/planner/TableDmlPlannerTest.java | 174 +++++++++++++
.../ignite/testsuites/IgniteCalciteTestSuite.java | 35 +--
...iteTestSuite.java => IntegrationTestSuite.java} | 27 +-
.../apache/ignite/testsuites/PlannerTestSuite.java | 2 +
.../types/string/test_scan_big_varchar.test_slow | 24 ++
.../string/test_scan_big_varchar.test_slow_ignored | 153 -----------
parent/pom.xml | 2 +
33 files changed, 1169 insertions(+), 538 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index ab3bb4e..7420ed6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -34,15 +34,8 @@ import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlDdl;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlExplainLevel;
@@ -52,7 +45,6 @@ import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -98,7 +90,6 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
@@ -106,18 +97,14 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate
import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.HintUtils;
import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
@@ -131,6 +118,7 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Collections.singletonList;
import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper.optimize;
import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
/**
@@ -594,7 +582,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
sqlNode = validated.sqlNode();
- IgniteRel igniteRel = optimize(sqlNode, planner);
+ IgniteRel igniteRel = optimize(sqlNode, planner, log);
// Split query plan to query fragments.
List<Fragment> fragments = new Splitter().go(igniteRel);
@@ -612,7 +600,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
sqlNode = planner.validate(sqlNode);
// Convert to Relational operators graph
- IgniteRel igniteRel = optimize(sqlNode, planner);
+ IgniteRel igniteRel = optimize(sqlNode, planner, log);
// Split query plan to query fragments.
List<Fragment> fragments = new Splitter().go(igniteRel);
@@ -632,58 +620,16 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
}
/** */
- private IgniteRel optimize(SqlNode sqlNode, IgnitePlanner planner) {
- try {
- // Convert to Relational operators graph
- RelRoot root = planner.rel(sqlNode);
-
- RelNode rel = root.rel;
-
- if (rel instanceof Hintable)
- planner.setDisabledRules(HintUtils.disabledRules((Hintable)rel));
-
- // Transformation chain
- rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
-
- RelTraitSet desired = rel.getCluster().traitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
- .replace(root.collation == null ? RelCollations.EMPTY : root.collation)
- .simplify();
-
- IgniteRel igniteRel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
-
- if (!root.isRefTrivial()) {
- final List<RexNode> projects = new ArrayList<>();
- final RexBuilder rexBuilder = igniteRel.getCluster().getRexBuilder();
-
- for (int field : Pair.left(root.fields))
- projects.add(rexBuilder.makeInputRef(igniteRel, field));
-
- igniteRel = new IgniteProject(igniteRel.getCluster(), desired, igniteRel, projects, root.validatedRowType);
- }
-
- return igniteRel;
- }
- catch (Throwable ex) {
- log.error("Unexpected error at query optimizer.", ex);
- log.error(planner.dump());
-
- throw ex;
- }
- }
-
- /** */
private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
IgnitePlanner planner = ctx.planner();
SqlNode sql = ((SqlExplain)explain).getExplicandum();
// Validate
- explain = planner.validate(sql);
+ sql = planner.validate(sql);
// Convert to Relational operators graph
- IgniteRel igniteRel = optimize(explain, planner);
+ IgniteRel igniteRel = optimize(sql, planner, log);
String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 15a1d6f..dc559f5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Spool;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
@@ -373,7 +374,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteTableSpool rel) {
- TableSpoolNode<Row> node = new TableSpoolNode<>(ctx, rel.getRowType());
+ TableSpoolNode<Row> node = new TableSpoolNode<>(ctx, rel.getRowType(), rel.readType == Spool.Type.LAZY);
Node<Row> input = visit(rel.getInput());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
new file mode 100644
index 0000000..0cf58ec
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.HintUtils;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** Static helper that converts a SQL node into an optimized {@link IgniteRel} tree. */
+public class PlannerHelper {
+ /**
+ * Private constructor: the class exposes static helpers only.
+ */
+ private PlannerHelper() {
+
+ }
+
+ /**
+ * @param sqlNode Sql node to convert to a relational tree and optimize.
+ * @param planner Planner used for the conversion and for both transformation phases.
+ * @param log Logger used to report an optimizer failure together with the planner dump.
+ */
+ public static IgniteRel optimize(SqlNode sqlNode, IgnitePlanner planner, IgniteLogger log) {
+ try {
+ // Convert to Relational operators graph
+ RelRoot root = planner.rel(sqlNode);
+
+ RelNode rel = root.rel;
+
+ if (!F.isEmpty(root.hints))
+ planner.setDisabledRules(HintUtils.disabledRules(root.hints));
+
+ // Transformation chain
+ rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+ RelTraitSet desired = rel.getCluster().traitSet()
+ .replace(IgniteConvention.INSTANCE)
+ .replace(IgniteDistributions.single())
+ .replace(root.collation == null ? RelCollations.EMPTY : root.collation)
+ .simplify();
+
+ IgniteRel igniteRel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+ if (!root.isRefTrivial()) {
+ final List<RexNode> projects = new ArrayList<>();
+ final RexBuilder rexBuilder = igniteRel.getCluster().getRexBuilder();
+
+ for (int field : Pair.left(root.fields))
+ projects.add(rexBuilder.makeInputRef(igniteRel, field));
+
+ igniteRel = new IgniteProject(igniteRel.getCluster(), desired, igniteRel, projects, root.validatedRowType);
+ }
+
+ if (sqlNode.isA(ImmutableSet.of(SqlKind.INSERT, SqlKind.UPDATE, SqlKind.MERGE)))
+ igniteRel = new FixDependentModifyNodeShuttle().visit(igniteRel);
+
+ return igniteRel;
+ }
+ catch (Throwable ex) {
+ log.error("Unexpected error at query optimizer.", ex);
+ log.error(planner.dump());
+
+ throw ex;
+ }
+ }
+
+ /**
+ * This shuttle analyzes a relational tree and inserts an eager spool node
+ * just under the TableModify node in case the latter depends upon a table
+ * that is also scanned to produce the rows for the modify node, to avoid
+ * the double processing of the retrieved rows.
+ * <p>
+ * It considers two cases: <ol>
+ * <li>
+ * Modify node produces rows to insert, then a spool is required.
+ * </li>
+ * <li>
+ * Modify node updates rows only, then a spool is required if 1) we
+ * are scanning an index and 2) any of the indexed columns is updated
+ * by modify node.
+ * </li>
+ * </ol>
+ *
+ */
+ private static class FixDependentModifyNodeShuttle extends IgniteRelShuttle {
+ /**
+ * Flag indicates whether we should insert a spool or not.
+ */
+ private boolean spoolNeeded;
+
+ /** Current modify node. */
+ private IgniteTableModify modifyNode;
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableModify rel) {
+ assert modifyNode == null;
+
+ modifyNode = rel;
+
+ if (rel.isDelete())
+ return rel;
+
+ processNode(rel);
+
+ if (spoolNeeded) {
+ IgniteTableSpool spool = new IgniteTableSpool(
+ rel.getCluster(),
+ rel.getInput().getTraitSet(),
+ Spool.Type.EAGER,
+ rel.getInput()
+ );
+
+ rel.replaceInput(0, spool);
+ }
+
+ return rel;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteTableScan rel) {
+ return processScan(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteRel visit(IgniteIndexScan rel) {
+ return processScan(rel);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteRel processNode(IgniteRel rel) {
+ List<IgniteRel> inputs = Commons.cast(rel.getInputs());
+
+ for (int i = 0; i < inputs.size(); i++) {
+ if (spoolNeeded)
+ break;
+
+ visitChild(rel, i, inputs.get(i));
+ }
+
+ return rel;
+ }
+
+ /**
+ * Process a scan node and raise a {@link #spoolNeeded flag} if needed.
+ *
+ * @param scan TableScan to analyze.
+ * @return The input rel.
+ */
+ private IgniteRel processScan(TableScan scan) {
+ IgniteTable tbl = modifyNode != null ? modifyNode.getTable().unwrap(IgniteTable.class) : null;
+
+ if (tbl == null || scan.getTable().unwrap(IgniteTable.class) != tbl)
+ return (IgniteRel)scan;
+
+ if (modifyNodeInsertsData()) {
+ spoolNeeded = true;
+
+ return (IgniteRel)scan;
+ }
+
+ // for an update-only node the spool is needed if any of the updated
+ // columns is part of the index we are going to scan
+ if (scan instanceof IgniteTableScan)
+ return (IgniteRel)scan;
+
+ ImmutableSet<Integer> indexedCols = ImmutableSet.copyOf(
+ tbl.getIndex(((AbstractIndexScan)scan).indexName()).collation().getKeys());
+
+ spoolNeeded = modifyNode.getUpdateColumnList().stream()
+ .map(tbl.descriptor()::columnDescriptor)
+ .map(ColumnDescriptor::fieldIndex)
+ .anyMatch(indexedCols::contains);
+
+ return (IgniteRel)scan;
+ }
+
+ /**
+ * @return {@code true} in case {@link #modifyNode} produces any insert.
+ */
+ private boolean modifyNodeInsertsData() {
+ return modifyNode.isInsert(); // MERGE should be analyzed too
+ // but currently it is not implemented
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
index 7303e84..d07dbcc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec.ddl;
+import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -171,7 +172,11 @@ public class DdlCommandHandler {
for (ColumnDefinition col : cmd.columns()) {
String name = col.name();
- res.addQueryField(name, tf.getJavaClass(col.type()).getTypeName(), null);
+ Type javaType = tf.getJavaClass(col.type());
+
+ String typeName = javaType instanceof Class ? ((Class<?>)javaType).getName() : javaType.getTypeName();
+
+ res.addQueryField(name, typeName, null);
if (!col.nullable()) {
if (notNullFields == null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
index 7da2e58..8a81492 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
@@ -40,6 +40,12 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
private final List<Row> rows;
/**
+ * If {@code true} this spool should emit rows as soon as they are stored.
+ * If {@code false} the spool has to collect all rows from the underlying input first.
+ */
+ private final boolean lazyRead;
+
+ /**
* Flag indicates that spool pushes row to downstream.
* Need to check a case when a downstream produces requests on push.
*/
@@ -48,9 +54,11 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
/**
* @param ctx Execution context.
*/
- public TableSpoolNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+ public TableSpoolNode(ExecutionContext<Row> ctx, RelDataType rowType, boolean lazyRead) {
super(ctx, rowType);
+ this.lazyRead = lazyRead;
+
rows = new ArrayList<>();
}
@@ -90,29 +98,32 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
/** */
private void doPush() throws Exception {
- if (rowIdx >= rows.size() && waiting == -1 && requested > 0) {
- downstream().end();
-
+ if (isClosed())
return;
- }
- while (requested > 0 && rowIdx < rows.size())
- pushToDownstream();
- }
+ if (!lazyRead && waiting != -1)
+ return;
- /** */
- private void pushToDownstream() throws Exception {
+ int processed = 0;
inLoop = true;
+ try {
+ while (requested > 0 && rowIdx < rows.size() && processed++ < IN_BUFFER_SIZE) {
+ downstream().push(rows.get(rowIdx));
- downstream().push(rows.get(rowIdx));
-
- inLoop = false;
-
- rowIdx++;
- requested--;
+ rowIdx++;
+ requested--;
+ }
+ }
+ finally {
+ inLoop = false;
+ }
- if (rowIdx >= rows.size() && waiting == -1 && requested > 0)
+ if (rowIdx >= rows.size() && waiting == -1 && requested > 0) {
+ requested = 0;
downstream().end();
+ }
+ else if (requested > 0 && processed >= IN_BUFFER_SIZE)
+ context().execute(this::doPush, this::onError);
}
/** {@inheritDoc} */
@@ -130,7 +141,7 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
source().request(waiting = IN_BUFFER_SIZE);
if (requested > 0 && rowIdx < rows.size())
- pushToDownstream();
+ doPush();
}
/** {@inheritDoc} */
@@ -142,7 +153,6 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
waiting = -1;
- if (rowIdx >= rows.size() && requested > 0)
- downstream().end();
+ context().execute(this::doPush, this::onError);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index fdf9b73..ca3b6d2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -188,7 +188,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
return Pair.of(validatedNode, type);
}
- public RelDataType conver(SqlDataTypeSpec typeSpec) {
+ /**
+ * Converts a SQL data type specification to a relational data type.
+ *
+ * @param typeSpec Spec to convert from.
+ * @return Relational type representation of given SQL type.
+ */
+ public RelDataType convert(SqlDataTypeSpec typeSpec) {
return typeSpec.deriveType(validator());
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
index ac46fe4..f39cb07 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
@@ -53,6 +53,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorTable;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTableImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -128,9 +129,13 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
/** {@inheritDoc} */
@Override protected SqlSelect createSourceSelectForUpdate(SqlUpdate call) {
- final SqlNodeList selectList = SqlNodeList.of(
- new SqlIdentifier(QueryUtils.KEY_FIELD_NAME, SqlParserPos.ZERO),
- new SqlIdentifier(QueryUtils.VAL_FIELD_NAME, SqlParserPos.ZERO));
+ final SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO);
+ final SqlValidatorTable table = getCatalogReader().getTable(((SqlIdentifier)call.getTargetTable()).names);
+
+ table.unwrap(IgniteTable.class).descriptor().selectForUpdateRowType((IgniteTypeFactory)typeFactory)
+ .getFieldNames().stream()
+ .map(name -> new SqlIdentifier(name, SqlParserPos.ZERO))
+ .forEach(selectList::add);
int ordinal = 0;
// Force unique aliases to avoid a duplicate for Y with SET X=Y
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
index 8c0e0c3..984ffb9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
@@ -166,7 +166,7 @@ public class DdlSqlToCommandConverter {
"querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.PARSING);
String name = col.name.getSimple();
- RelDataType type = planner.conver(col.dataType);
+ RelDataType type = planner.convert(col.dataType);
Object dflt = null;
if (col.expression != null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
index 0ff966a..66c274d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
@@ -27,9 +27,7 @@ import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rex.RexNode;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-/**
- *
- */
+/** */
public class IgniteTableModify extends TableModify implements IgniteRel {
/**
* Creates a {@code TableModify}.
@@ -38,28 +36,34 @@ public class IgniteTableModify extends TableModify implements IgniteRel {
* <blockquote>
* <pre>UPDATE table SET iden1 = exp1, ident2 = exp2 WHERE condition</pre>
* </blockquote>
- * @param cluster Cluster this relational expression belongs to
- * @param traitSet Traits of this relational expression
- * @param table Target table to modify
- * @param input Sub-query or filter condition
- * @param operation Modify operation (INSERT, UPDATE, DELETE)
- * @param updateColumnList List of column identifiers to be updated
- * (e.g. ident1, ident2); null if not UPDATE
- * @param sourceExpressionList List of value expressions to be set
- * (e.g. exp1, exp2); null if not UPDATE
- * @param flattened Whether set flattens the input row type
+ *
+ * @param cluster Cluster this relational expression belongs to.
+ * @param traitSet Traits of this relational expression.
+ * @param table Target table to modify.
+ * @param input Sub-query or filter condition.
+ * @param operation Modify operation (INSERT, UPDATE, DELETE).
+ * @param updateColumnList List of column identifiers to be updated (e.g. ident1, ident2); null if not UPDATE.
+ * @param sourceExpressionList List of value expressions to be set (e.g. exp1, exp2); null if not UPDATE.
+ * @param flattened Whether set flattens the input row type.
*/
- public IgniteTableModify(RelOptCluster cluster,
+ public IgniteTableModify(
+ RelOptCluster cluster,
RelTraitSet traitSet,
RelOptTable table,
RelNode input,
Operation operation,
List<String> updateColumnList,
List<RexNode> sourceExpressionList,
- boolean flattened) {
+ boolean flattened
+ ) {
super(cluster, traitSet, table, Commons.context(cluster).catalogReader(), input, operation, updateColumnList, sourceExpressionList, flattened);
}
+ /**
+ * Creates a {@code TableModify} from serialized {@link RelInput input}.
+ *
+ * @param input The input to create node from.
+ */
public IgniteTableModify(RelInput input) {
this(
input.getCluster(),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
index fc5d095..9c5a266 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.rel;
import java.util.List;
-
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
@@ -30,8 +29,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
/**
* Relational operator that returns the contents of a table.
*/
@@ -40,9 +37,10 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
public IgniteTableSpool(
RelOptCluster cluster,
RelTraitSet traits,
+ Spool.Type readType,
RelNode input
) {
- super(cluster, traits, input, Type.LAZY, Type.EAGER);
+ super(cluster, traits, input, readType, Type.EAGER);
}
/**
@@ -52,9 +50,10 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
*/
public IgniteTableSpool(RelInput input) {
this(
- changeTraits(input, IgniteConvention.INSTANCE).getCluster(),
- changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
- changeTraits(input, IgniteConvention.INSTANCE).getInput()
+ input.getCluster(),
+ input.getTraitSet().replace(IgniteConvention.INSTANCE),
+ input.getEnum("readType", Spool.Type.class),
+ input.getInput()
);
}
@@ -65,12 +64,12 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
/** */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteTableSpool(cluster, getTraitSet(), inputs.get(0));
+ return new IgniteTableSpool(cluster, getTraitSet(), readType, inputs.get(0));
}
/** {@inheritDoc} */
@Override protected Spool copy(RelTraitSet traitSet, RelNode input, Type readType, Type writeType) {
- return new IgniteTableSpool(getCluster(), traitSet, input);
+ return new IgniteTableSpool(getCluster(), traitSet, readType, input);
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
index bacafd8..3adfbfb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
@@ -83,12 +83,12 @@ public interface TableDescriptor extends RelProtoDataType, InitializerExpression
}
/**
- * Returns row type including effectively virtual fields.
+ * Returns row type of internal fields (either virtual or real) only.
*
* @param factory Type factory.
* @return Row type for SELECT operation.
*/
- default RelDataType selectRowType(IgniteTypeFactory factory) {
+ default RelDataType selectForUpdateRowType(IgniteTypeFactory factory) {
return rowType(factory, null);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 44c8437..9aa2692 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -211,6 +211,11 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
}
/** {@inheritDoc} */
+ @Override public RelDataType selectForUpdateRowType(IgniteTypeFactory factory) {
+ return rowType(factory, ImmutableBitSet.of(keyField, valField));
+ }
+
+ /** {@inheritDoc} */
@Override public GridCacheContext cacheContext() {
return cacheInfo.cacheContext();
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 04094fd..7f20df0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -43,6 +43,7 @@ import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Spool;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
@@ -176,7 +177,7 @@ public class TraitUtils {
RelTraitSet traits = rel.getTraitSet()
.replace(toTrait);
- return new IgniteTableSpool(rel.getCluster(), traits, rel);
+ return new IgniteTableSpool(rel.getCluster(), traits, Spool.Type.LAZY, rel);
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java
index 4076f5f..35d9ada 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.query.calcite.util;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.calcite.rel.hint.Hintable;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.ignite.internal.util.typedef.F;
/** */
public class HintUtils {
@@ -30,11 +33,11 @@ public class HintUtils {
}
/** */
- public static Set<String> disabledRules(Hintable rel) {
- if (rel.getHints().isEmpty())
+ public static Set<String> disabledRules(ImmutableList<RelHint> hints) {
+ if (F.isEmpty(hints))
return Collections.emptySet();
- return rel.getHints().stream()
+ return hints.stream()
.filter(h -> "DISABLE_RULE".equals(h.hintName))
.flatMap(h -> h.listOptions.stream())
.collect(Collectors.toSet());
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index edb10a2..1ff9268 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -1038,117 +1038,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertEqualsCollections(Arrays.asList("Roman", 0, "Ignite"), F.first(query.get(1).getAll()));
}
- /** */
- @Test
- public void testInsertPrimitiveKey() throws Exception {
- grid(1).getOrCreateCache(new CacheConfiguration<Integer, Developer>()
- .setName("developer")
- .setSqlSchema("PUBLIC")
- .setIndexedTypes(Integer.class, Developer.class)
- .setBackups(2)
- );
-
- QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
-
- List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
- "INSERT INTO DEVELOPER(_key, name, projectId) VALUES (?, ?, ?)", 0, "Igor", 1);
-
- assertEquals(1, query.size());
-
- List<List<?>> rows = query.get(0).getAll();
-
- assertEquals(1, rows.size());
-
- List<?> row = rows.get(0);
-
- assertNotNull(row);
-
- assertEqualsCollections(F.asList(1L), row);
-
- query = engine.query(null, "PUBLIC", "select _key, * from DEVELOPER");
-
- assertEquals(1, query.size());
-
- row = F.first(query.get(0).getAll());
-
- assertNotNull(row);
-
- assertEqualsCollections(Arrays.asList(0, "Igor", 1), row);
- }
-
- /** */
- @Test
- public void testInsertUpdateDeleteNonPrimitiveKey() throws Exception {
- client.getOrCreateCache(new CacheConfiguration<Key, Developer>()
- .setName("developer")
- .setSqlSchema("PUBLIC")
- .setIndexedTypes(Key.class, Developer.class)
- .setBackups(2)
- );
-
- awaitPartitionMapExchange(true, true, null);
-
- QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
-
- List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "INSERT INTO DEVELOPER VALUES (?, ?, ?, ?)", 0, 0, "Igor", 1);
-
- assertEquals(1, query.size());
-
- List<?> row = F.first(query.get(0).getAll());
-
- assertNotNull(row);
-
- assertEqualsCollections(F.asList(1L), row);
-
- query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
-
- assertEquals(1, query.size());
-
- row = F.first(query.get(0).getAll());
-
- assertNotNull(row);
-
- assertEqualsCollections(F.asList(0, 0, "Igor", 1), row);
-
- query = engine.query(null, "PUBLIC", "UPDATE DEVELOPER d SET name = 'Roman' WHERE id = ?", 0);
-
- assertEquals(1, query.size());
-
- row = F.first(query.get(0).getAll());
-
- assertNotNull(row);
-
- assertEqualsCollections(F.asList(1L), row);
-
- query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
-
- assertEquals(1, query.size());
-
- row = F.first(query.get(0).getAll());
-
- assertNotNull(row);
-
- assertEqualsCollections(F.asList(0, 0, "Roman", 1), row);
-
- query = engine.query(null, "PUBLIC", "DELETE FROM DEVELOPER WHERE id = ?", 0);
-
- assertEquals(1, query.size());
-
- row = F.first(query.get(0).getAll());
-
- assertNotNull(row);
-
- assertEqualsCollections(F.asList(1L), row);
-
- query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
-
- assertEquals(1, query.size());
-
- row = F.first(query.get(0).getAll());
-
- assertNull(row);
- }
-
/**
* Test verifies that table has a distribution function over valid keys.
*/
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
index 6a08401..23e2012 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
@@ -17,11 +17,14 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.stream.IntStream;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
@@ -32,6 +35,8 @@ import org.jetbrains.annotations.NotNull;
import org.junit.Before;
import org.junit.Test;
+import static java.util.Collections.singletonList;
+
/**
*
*/
@@ -47,11 +52,71 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
super.setup();
}
+ /** */
+ @Test
+ public void testLazyTableSpool() throws Exception {
+ checkTableSpool(
+ (ctx, rowType) -> new TableSpoolNode<>(ctx, rowType, true)
+ );
+ }
+
+ /** */
+ @Test
+ public void testEagerTableSpool() throws Exception {
+ checkTableSpool(
+ (ctx, rowType) -> new TableSpoolNode<>(ctx, rowType, false)
+ );
+ }
+
/**
+ * Ensure eager spool reads underlying input till the end before emitting
+ * the very first row.
*
+ * @throws IgniteCheckedException In case of error.
*/
@Test
- public void testTableSpool() throws Exception {
+ public void testEagerSpoolReadsWholeInput() throws IgniteCheckedException {
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
+
+ int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+ int[] sizes = {inBufSize / 2, inBufSize, inBufSize + 1, inBufSize * 2};
+
+ for (int size : sizes) {
+ log.info("Check: size=" + size);
+
+ AtomicReference<Iterator<Object[]>> itRef = new AtomicReference<>();
+
+ ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, new Iterable<Object[]>() {
+ @NotNull @Override public Iterator<Object[]> iterator() {
+ if (itRef.get() != null)
+ throw new AssertionError();
+
+ itRef.set(IntStream.range(0, size).boxed().map(i -> new Object[]{i}).iterator());
+
+ return itRef.get();
+ }
+ });
+
+ TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType, false);
+
+ spool.register(singletonList(scan));
+
+ RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+ root.register(spool);
+
+ assertTrue(root.hasNext());
+
+ root.next();
+
+ assertFalse(itRef.get().hasNext());
+ }
+ }
+
+ /** */
+ public void checkTableSpool(BiFunction<ExecutionContext<Object[]>, RelDataType, TableSpoolNode<Object[]>> spoolFactory) throws Exception {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
@@ -76,9 +141,9 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
}
});
- TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType);
+ TableSpoolNode<Object[]> spool = spoolFactory.apply(ctx, rowType);
- spool.register(Arrays.asList(right));
+ spool.register(singletonList(right));
RootRewindable<Object[]> root = new RootRewindable<>(ctx, rowType);
root.register(spool);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
similarity index 94%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 29f7786..94fe7a7 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -24,6 +24,8 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
index f55d5d8..d69e35d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import org.junit.Test;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CalciteErrorHandlilngIntegrationTest.java
similarity index 99%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CalciteErrorHandlilngIntegrationTest.java
index bbe1592..c1a9614 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CalciteErrorHandlilngIntegrationTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexSpoolIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexSpoolIntegrationTest.java
index a040acc..61360d2 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexSpoolIntegrationTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.ArrayList;
import java.util.Collection;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/MetadataIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MetadataIntegrationTest.java
similarity index 97%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/MetadataIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MetadataIntegrationTest.java
index 3cbf275..51d8bac 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/MetadataIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MetadataIntegrationTest.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import org.junit.Test;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SortAggregateIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SortAggregateIntegrationTest.java
index 718b2e2..1de1a81 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SortAggregateIntegrationTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.Arrays;
import java.util.Collections;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableDdlIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index 67c1e2e..67b34e9 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -38,6 +38,7 @@ import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDmlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDmlIntegrationTest.java
new file mode 100644
index 0000000..3091dba
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDmlIntegrationTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** */
+public class TableDmlIntegrationTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CLIENT_NODE_NAME = "client";
+
+ /** */
+ private IgniteEx client;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(2);
+
+ client = startClientGrid(CLIENT_NODE_NAME);
+ }
+
+ /** */
+ @Before
+ public void init() {
+ client = grid(CLIENT_NODE_NAME);
+ }
+
+ /** */
+ @After
+ public void cleanUp() {
+ client.destroyCaches(client.cacheNames());
+ }
+
+ /**
+ * Test verifies that data already inserted by the current query
+ * is not processed by this query again.
+ */
+ @Test
+ public void testInsertAsSelect() {
+ executeSql("CREATE TABLE test (epoch_cur int, epoch_copied int)");
+ executeSql("INSERT INTO test VALUES (0, 0)");
+
+ final String insertAsSelectSql = "INSERT INTO test SELECT ?, epoch_cur FROM test";
+
+ for (int i = 1; i < 16; i++) {
+ executeSql(insertAsSelectSql, i);
+
+ List<List<?>> rows = executeSql("SELECT * FROM test WHERE epoch_copied = ?", i);
+
+ assertEquals("Unexpected rows for epoch " + i, 0, rows.size());
+ }
+ }
+
+ /**
+ * Test verifies that concurrent updates do not affect the size
+ * of the result set provided for insertion.
+ */
+ @Test
+ public void testInsertAsSelectWithConcurrentDataModification() throws IgniteCheckedException {
+ executeSql("CREATE TABLE test (id int primary key, val int) with cache_name=\"test\", value_type=\"my_type\"");
+ IgniteCache<Integer, Object> cache = grid(0).cache("test").withKeepBinary();
+
+ BinaryObjectBuilder builder = grid(0).binary().builder("my_type");
+
+ for (int i = 0; i < 128; i++)
+ cache.put(i, builder.setField("val", i).build());
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ while (!stop.get())
+ cache.put(ThreadLocalRandom.current().nextInt(128), builder.setField("val", 0).build());
+ });
+
+ for (int i = 8; i < 18; i++) {
+ int off = 1 << (i - 1);
+
+ executeSql("INSERT INTO test SELECT id + ?::INT, val FROM test", off);
+
+ long cnt = (Long)executeSql("SELECT count(*) FROM test").get(0).get(0);
+
+ assertEquals("Unexpected rows count", 1 << i, cnt);
+ }
+
+ stop.set(true);
+ fut.get(getTestTimeout());
+ }
+
+ /**
+ * Ensure that update node updates each row only once.
+ */
+ @Test
+ public void testUpdate() {
+ executeSql("CREATE TABLE test (val integer)");
+
+ client.context().query().querySqlFields(
+ new SqlFieldsQuery("CREATE INDEX test_val_idx ON test (val)").setSchema("PUBLIC"), false).getAll();
+
+ for (int i = 1; i <= 4096; i++)
+ executeSql("INSERT INTO test VALUES (?)", i);
+
+ final String updateSql = "UPDATE test SET val = val * 10";
+
+ int mul = 1;
+ for (int i = 1; i < 5; i++) {
+ mul *= 10;
+ executeSql(updateSql);
+ }
+
+ final int fMul = mul;
+
+ List<List<?>> rows = executeSql("SELECT val FROM test ORDER BY val");
+
+ List<Integer> vals = rows.stream().map(r -> (Integer)r.get(0)).collect(Collectors.toList());
+
+ for (int rowNum = 1; rowNum <= rows.size(); rowNum++) {
+ assertEquals(
+ "Unexpected results: " + S.compact(vals, i -> i + fMul),
+ rowNum * fMul,
+ rows.get(rowNum - 1).get(0)
+ );
+ }
+ }
+
+ /** */
+ @Test
+ public void testInsertPrimitiveKey() {
+ grid(1).getOrCreateCache(new CacheConfiguration<Integer, CalciteQueryProcessorTest.Developer>()
+ .setName("developer")
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(Integer.class, CalciteQueryProcessorTest.Developer.class)
+ .setBackups(2)
+ );
+
+ QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
+ "INSERT INTO DEVELOPER(_key, name, projectId) VALUES (?, ?, ?)", 0, "Igor", 1);
+
+ assertEquals(1, query.size());
+
+ List<List<?>> rows = query.get(0).getAll();
+
+ assertEquals(1, rows.size());
+
+ List<?> row = rows.get(0);
+
+ assertNotNull(row);
+
+ assertEqualsCollections(F.asList(1L), row);
+
+ query = engine.query(null, "PUBLIC", "select _key, * from DEVELOPER");
+
+ assertEquals(1, query.size());
+
+ row = F.first(query.get(0).getAll());
+
+ assertNotNull(row);
+
+ assertEqualsCollections(Arrays.asList(0, "Igor", 1), row);
+ }
+
+ /** */
+ @Test
+ public void testInsertUpdateDeleteNonPrimitiveKey() throws Exception {
+ client.getOrCreateCache(new CacheConfiguration<CalciteQueryProcessorTest.Key, CalciteQueryProcessorTest.Developer>()
+ .setName("developer")
+ .setSqlSchema("PUBLIC")
+ .setIndexedTypes(CalciteQueryProcessorTest.Key.class, CalciteQueryProcessorTest.Developer.class)
+ .setBackups(2)
+ );
+
+ awaitPartitionMapExchange(true, true, null);
+
+ QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+ List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "INSERT INTO DEVELOPER VALUES (?, ?, ?, ?)", 0, 0, "Igor", 1);
+
+ assertEquals(1, query.size());
+
+ List<?> row = F.first(query.get(0).getAll());
+
+ assertNotNull(row);
+
+ assertEqualsCollections(F.asList(1L), row);
+
+ query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+
+ assertEquals(1, query.size());
+
+ row = F.first(query.get(0).getAll());
+
+ assertNotNull(row);
+
+ assertEqualsCollections(F.asList(0, 0, "Igor", 1), row);
+
+ query = engine.query(null, "PUBLIC", "UPDATE DEVELOPER d SET name = 'Roman' WHERE id = ?", 0);
+
+ assertEquals(1, query.size());
+
+ row = F.first(query.get(0).getAll());
+
+ assertNotNull(row);
+
+ assertEqualsCollections(F.asList(1L), row);
+
+ query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+
+ assertEquals(1, query.size());
+
+ row = F.first(query.get(0).getAll());
+
+ assertNotNull(row);
+
+ assertEqualsCollections(F.asList(0, 0, "Roman", 1), row);
+
+ query = engine.query(null, "PUBLIC", "DELETE FROM DEVELOPER WHERE id = ?", 0);
+
+ assertEquals(1, query.size());
+
+ row = F.first(query.get(0).getAll());
+
+ assertNotNull(row);
+
+ assertEqualsCollections(F.asList(1L), row);
+
+ query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+
+ assertEquals(1, query.size());
+
+ row = F.first(query.get(0).getAll());
+
+ assertNull(row);
+ }
+
+ /** */
+ private List<List<?>> executeSql(String sql, Object... args) {
+ List<FieldsQueryCursor<List<?>>> cur = queryProcessor().query(null, "PUBLIC", sql, args);
+
+ try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
+ return srvCursor.getAll();
+ }
+ }
+
+ /** */
+ private CalciteQueryProcessor queryProcessor() {
+ return Commons.lookupComponent(client.context(), CalciteQueryProcessor.class);
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index f8fa2f5..5fa68ee 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -24,8 +24,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
@@ -46,23 +48,36 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelReferentialConstraint;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ColumnStrategy;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql2rel.InitializerContext;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
@@ -71,26 +86,26 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -112,6 +127,12 @@ import static org.apache.ignite.internal.processors.query.calcite.externalize.Re
@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
/** */
+ protected static final IgniteTypeFactory TYPE_FACTORY = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+ /** */
+ protected static final int DEFAULT_TBL_SIZE = 500_000;
+
+ /** */
protected List<UUID> nodes;
/** */
@@ -235,11 +256,11 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
.topologyVersion(AffinityTopologyVersion.NONE)
.build();
- RelRoot relRoot;
-
try (IgnitePlanner planner = ctx.planner()) {
assertNotNull(planner);
+ planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
+
String qry = ctx.query();
assertNotNull(qry);
@@ -250,29 +271,12 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
// Validate
sqlNode = planner.validate(sqlNode);
- // Convert to Relational operators graph
- relRoot = planner.rel(sqlNode);
-
- RelNode rel = relRoot.rel;
-
- assertNotNull(rel);
-
- // Transformation chain
- RelTraitSet desired = rel.getTraitSet()
- .replace(IgniteConvention.INSTANCE)
- .replace(IgniteDistributions.single())
- .replace(CorrelationTrait.UNCORRELATED)
- .replace(RewindabilityTrait.ONE_WAY)
- .simplify();
-
- planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
-
try {
- IgniteRel res = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+ IgniteRel rel = PlannerHelper.optimize(sqlNode, planner, log);
-// System.out.println(planner.dump());
+// System.out.println(RelOptUtil.toString(rel));
- return res;
+ return rel;
}
catch (Throwable ex) {
System.err.println(planner.dump());
@@ -561,6 +565,62 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
};
}
+ /**
+ * Creates test table with given params and the default size ({@link #DEFAULT_TBL_SIZE}).
+ *
+ * @param name Name of the table.
+ * @param distr Distribution of the table.
+ * @param fields List of the required fields. Every odd item should be a string
+ * representing a column name, every even item should be a class representing column's type.
+ * E.g. {@code createTable("MY_TABLE", distribution, "ID", Integer.class, "VAL", String.class)}.
+ * @return Instance of the {@link TestTable}.
+ */
+ protected static TestTable createTable(String name, IgniteDistribution distr, Object... fields) {
+ return createTable(name, DEFAULT_TBL_SIZE, distr, fields);
+ }
+
+ /**
+ * Creates rewindable test table with given params; throws {@link IllegalArgumentException} if {@code fields} is empty or of odd length.
+ *
+ * @param name Name of the table.
+ * @param size Required size of the table.
+ * @param distr Distribution of the table.
+ * @param fields List of the required fields. Every odd item should be a string
+ * representing a column name, every even item should be a class representing column's type.
+ * E.g. {@code createTable("MY_TABLE", 500, distribution, "ID", Integer.class, "VAL", String.class)}.
+ * @return Instance of the {@link TestTable}.
+ */
+ protected static TestTable createTable(String name, int size, IgniteDistribution distr, Object... fields) {
+ if (F.isEmpty(fields) || fields.length % 2 != 0)
+ throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
+
+ RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(TYPE_FACTORY);
+
+ for (int i = 0; i < fields.length; i += 2) // Walk the (column name, column class) pairs.
+ b.add((String)fields[i], TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]));
+
+ return new TestTable(name, b.build(), RewindabilityTrait.REWINDABLE, size) { // Anonymous subclass pins the distribution.
+ @Override public IgniteDistribution distribution() {
+ return distr;
+ }
+ };
+ }
+
+ /**
+ * Creates schema named {@code PUBLIC} and registers every provided table in it under the table's own name.
+ *
+ * @param tbls Tables to create schema for.
+ * @return Public schema.
+ */
+ protected static IgniteSchema createSchema(TestTable... tbls) {
+ IgniteSchema schema = new IgniteSchema("PUBLIC");
+
+ for (TestTable tbl : tbls)
+ schema.addTable(tbl.name(), tbl);
+
+ return schema;
+ }
+
/** */
abstract static class TestTable implements IgniteTable {
/** */
@@ -579,6 +639,9 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
private final double rowCnt;
/** */
+ private final TableDescriptor desc;
+
+ /** */
TestTable(RelDataType type) {
this(type, RewindabilityTrait.REWINDABLE);
}
@@ -599,6 +662,8 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
this.rewindable = rewindable;
this.rowCnt = rowCnt;
this.name = name;
+
+ desc = new TestTableDescriptor(this::distribution, type);
}
/** {@inheritDoc} */
@@ -711,7 +776,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public TableDescriptor descriptor() {
- throw new AssertionError();
+ return desc;
}
/** {@inheritDoc} */
@@ -748,6 +813,171 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
}
/** */
+ static class TestTableDescriptor implements TableDescriptor { // Test stub: unsupported methods throw AssertionError.
+ /** Supplier that is queried on every {@link #distribution()} call. */
+ private final Supplier<IgniteDistribution> distributionSupp;
+
+ /** Row type returned by {@code rowType(...)} whatever columns are requested. */
+ private final RelDataType rowType;
+
+ /** Creates descriptor from the given distribution supplier and row type. */
+ public TestTableDescriptor(Supplier<IgniteDistribution> distribution, RelDataType rowType) {
+ this.distributionSupp = distribution;
+ this.rowType = rowType;
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public GridCacheContextInfo cacheInfo() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public GridCacheContext cacheContext() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Delegates to the supplier passed to the constructor. */
+ @Override public IgniteDistribution distribution() {
+ return distributionSupp.get();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Both arguments are ignored: the full row type is always returned. */
+ @Override public RelDataType rowType(IgniteTypeFactory factory, ImmutableBitSet usedColumns) {
+ return rowType;
+ }
+
+ /** {@inheritDoc} Every column is reported as updatable. */
+ @Override public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
+ return true;
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public boolean match(CacheDataRow row) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public <Row> Row toRow(ExecutionContext<Row> ectx, CacheDataRow row, RowHandler.RowFactory<Row> factory,
+ @Nullable ImmutableBitSet requiredColunms) throws IgniteCheckedException {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public <Row> IgniteBiTuple toTuple(ExecutionContext<Row> ectx, Row row, TableModify.Operation op,
+ @Nullable Object arg) throws IgniteCheckedException {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Builds a {@link TestColumnDescriptor} from the field's index in the row type and the name as passed in. */
+ @Override public ColumnDescriptor columnDescriptor(String fieldName) {
+ RelDataTypeField field = rowType.getField(fieldName, false, false); // NOTE(review): presumably null for an unknown name, which would NPE below - confirm.
+ return new TestColumnDescriptor(field.getIndex(), fieldName);
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public GridQueryTypeDescriptor typeDescription() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public boolean isGeneratedAlways(RelOptTable table, int iColumn) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public ColumnStrategy generationStrategy(RelOptTable table, int iColumn) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public RexNode newColumnDefaultValue(RelOptTable table, int iColumn, InitializerContext context) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public BiFunction<InitializerContext, RelNode, RelNode> postExpressionConversionHook() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public RexNode newAttributeInitializer(RelDataType type, SqlFunction constructor, int iAttribute,
+ List<RexNode> constructorArgs, InitializerContext context) {
+ throw new AssertionError();
+ }
+ }
+
+ /** Test {@link ColumnDescriptor} that carries only a field index and a name; type- and value-related methods throw {@link AssertionError}. */
+ static class TestColumnDescriptor implements ColumnDescriptor {
+ /** Index returned by {@link #fieldIndex()}. */
+ private final int idx;
+
+ /** Name returned by {@link #name()}. */
+ private final String name;
+
+ /** Creates descriptor with the given field index and column name. */
+ public TestColumnDescriptor(int idx, String name) {
+ this.idx = idx;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} Always {@code true} in this stub. */
+ @Override public boolean field() {
+ return true;
+ }
+
+ /** {@inheritDoc} Always {@code false} in this stub. */
+ @Override public boolean key() {
+ return false;
+ }
+
+ /** {@inheritDoc} Always {@code false} in this stub. */
+ @Override public boolean hasDefaultValue() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int fieldIndex() {
+ return idx;
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public RelDataType logicalType(IgniteTypeFactory f) {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public Class<?> storageType() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public Object value(ExecutionContext<?> ectx, GridCacheContext<?, ?> cctx,
+ CacheDataRow src) throws IgniteCheckedException {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public Object defaultValue() {
+ throw new AssertionError();
+ }
+
+ /** {@inheritDoc} Not supported by this stub. */
+ @Override public void set(Object dst, Object val) throws IgniteCheckedException {
+ throw new AssertionError();
+ }
+ }
+
+ /** */
static class TestMessageServiceImpl extends MessageServiceImpl {
/** */
private final TestIoManager mgr;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
index d833245..cf01a88 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
@@ -30,12 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.junit.Test;
@@ -51,12 +45,6 @@ import static org.junit.Assert.assertThat;
* Test suite to verify join colocation.
*/
public class JoinColocationPlannerTest extends AbstractPlannerTest {
- /** */
- private static final IgniteTypeFactory TYPE_FACTORY = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
- /** */
- private static final int DEFAULT_TBL_SIZE = 500_000;
-
/**
* Join of the same tables with a simple affinity is expected to be colocated.
*/
@@ -213,60 +201,4 @@ public class JoinColocationPlannerTest extends AbstractPlannerTest {
assertThat(invalidPlanMsg, exchange, nullValue());
}
-
- /**
- * Creates test table with given params.
- *
- * @param name Name of the table.
- * @param distr Distribution of the table.
- * @param fields List of the required fields. Every odd item should be a string
- * representing a column name, every even item should be a class representing column's type.
- * E.g. {@code createTable("MY_TABLE", distribution, "ID", Integer.class, "VAL", String.class)}.
- * @return Instance of the {@link TestTable}.
- */
- private static TestTable createTable(String name, IgniteDistribution distr, Object... fields) {
- return createTable(name, DEFAULT_TBL_SIZE, distr, fields);
- }
-
- /**
- * Creates test table with given params.
- *
- * @param name Name of the table.
- * @param size Required size of the table.
- * @param distr Distribution of the table.
- * @param fields List of the required fields. Every odd item should be a string
- * representing a column name, every even item should be a class representing column's type.
- * E.g. {@code createTable("MY_TABLE", 500, distribution, "ID", Integer.class, "VAL", String.class)}.
- * @return Instance of the {@link TestTable}.
- */
- private static TestTable createTable(String name, int size, IgniteDistribution distr, Object... fields) {
- if (F.isEmpty(fields) || fields.length % 2 != 0)
- throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
-
- RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(TYPE_FACTORY);
-
- for (int i = 0; i < fields.length; i += 2)
- b.add((String)fields[i], TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]));
-
- return new TestTable(name, b.build(), RewindabilityTrait.REWINDABLE, size) {
- @Override public IgniteDistribution distribution() {
- return distr;
- }
- };
- }
-
- /**
- * Creates public schema from provided tables.
- *
- * @param tbls Tables to create schema for.
- * @return Public schema.
- */
- private static IgniteSchema createSchema(TestTable... tbls) {
- IgniteSchema schema = new IgniteSchema("PUBLIC");
-
- for (TestTable tbl : tbls)
- schema.addTable(tbl.name(), tbl);
-
- return schema;
- }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
new file mode 100644
index 0000000..5cd634d
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Planner tests for DML statements: verify when an eager spool is placed under the table modify node.
+ */
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class TableDmlPlannerTest extends AbstractPlannerTest {
+ /** INSERT ... SELECT reading the target table: expects an eager spool as the modify node's input and a table scan in the plan.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void insertCachesTableScan() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class)
+ );
+
+ String sql = "insert into test select 2 * val from test";
+
+ RelNode phys = physicalPlan(sql, schema, "LogicalIndexScanConverterRule");
+
+ assertNotNull(phys);
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ IgniteTableModify modifyNode = findFirstNode(phys, byClass(IgniteTableModify.class));
+
+ assertThat(invalidPlanMsg, modifyNode, notNullValue());
+ assertThat(invalidPlanMsg, modifyNode.getInput(), instanceOf(Spool.class));
+
+ Spool spool = (Spool)modifyNode.getInput();
+
+ assertThat(invalidPlanMsg, spool.readType, equalTo(Spool.Type.EAGER));
+ assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteTableScan.class)), notNullValue());
+ }
+
+ /** INSERT ... SELECT reading the target table that has an index on VAL: expects an eager spool as the modify node's input and an index scan in the plan.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void insertCachesIndexScan() throws Exception {
+ TestTable tbl = createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class);
+
+ tbl.addIndex(new IgniteIndex(RelCollations.of(0), "IDX", null, tbl));
+
+ IgniteSchema schema = createSchema(tbl);
+
+ String sql = "insert into test select 2 * val from test";
+
+ RelNode phys = physicalPlan(sql, schema, "LogicalTableScanConverterRule");
+
+ assertNotNull(phys);
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ IgniteTableModify modifyNode = findFirstNode(phys, byClass(IgniteTableModify.class));
+
+ assertThat(invalidPlanMsg, modifyNode, notNullValue());
+ assertThat(invalidPlanMsg, modifyNode.getInput(), instanceOf(Spool.class));
+
+ Spool spool = (Spool)modifyNode.getInput();
+
+ assertThat(invalidPlanMsg, spool.readType, equalTo(Spool.Type.EAGER));
+ assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteIndexScan.class)), notNullValue());
+ }
+
+ /** UPDATE of a table without indexes: expects a table scan and no spool anywhere in the plan.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void updateNotCachesTableScan() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class)
+ );
+
+ String sql = "update test set val = 2 * val";
+
+ RelNode phys = physicalPlan(sql, schema, "LogicalIndexScanConverterRule");
+
+ assertNotNull(phys);
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ assertThat(invalidPlanMsg, findFirstNode(phys, byClass(Spool.class)), nullValue());
+ assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteTableScan.class)), notNullValue());
+ }
+
+ /** UPDATE of VAL filtered by IDX_VAL, where the only index is on IDX_VAL (not modified): expects an index scan and no spool.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void updateNotCachesNonDependentIndexScan() throws Exception {
+ TestTable tbl = createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class, "IDX_VAL", Integer.class);
+
+ tbl.addIndex(new IgniteIndex(RelCollations.of(1), "IDX", null, tbl));
+
+ IgniteSchema schema = createSchema(tbl);
+
+ String sql = "update test set val = 2 * val where idx_val between 2 and 10";
+
+ RelNode phys = physicalPlan(sql, schema, "LogicalTableScanConverterRule");
+
+ assertNotNull(phys);
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ assertThat(invalidPlanMsg, findFirstNode(phys, byClass(Spool.class)), nullValue());
+ assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteIndexScan.class)), notNullValue());
+ }
+
+ /** UPDATE of VAL filtered by VAL, where the index is on VAL itself: expects an eager spool as the modify node's input and an index scan in the plan.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void updateCachesDependentIndexScan() throws Exception {
+ TestTable tbl = createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class);
+
+ tbl.addIndex(new IgniteIndex(RelCollations.of(0), "IDX", null, tbl));
+
+ IgniteSchema schema = createSchema(tbl);
+
+ String sql = "update test set val = 2 * val where val between 2 and 10";
+
+ RelNode phys = physicalPlan(sql, schema, "LogicalTableScanConverterRule");
+
+ assertNotNull(phys);
+
+ String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+ IgniteTableModify modifyNode = findFirstNode(phys, byClass(IgniteTableModify.class));
+
+ assertThat(invalidPlanMsg, modifyNode, notNullValue());
+ assertThat(invalidPlanMsg, modifyNode.getInput(), instanceOf(Spool.class));
+
+ Spool spool = (Spool)modifyNode.getInput();
+
+ assertThat(invalidPlanMsg, spool.readType, equalTo(Spool.Type.EAGER));
+ assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteIndexScan.class)), notNullValue());
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 93b2d3e..9880404 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -17,25 +17,9 @@
package org.apache.ignite.testsuites;
-import org.apache.ignite.internal.processors.query.calcite.AggregatesIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
-import org.apache.ignite.internal.processors.query.calcite.CancelTest;
-import org.apache.ignite.internal.processors.query.calcite.DataTypesTest;
-import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
-import org.apache.ignite.internal.processors.query.calcite.FunctionsTest;
-import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.SortAggregateIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
-import org.apache.ignite.internal.processors.query.calcite.TableDdlIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
import org.apache.ignite.internal.processors.query.calcite.sql.SqlDdlParserTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -47,27 +31,12 @@ import org.junit.runners.Suite;
@Suite.SuiteClasses({
PlannerTestSuite.class,
ExecutionTestSuite.class,
- OrToUnionRuleTest.class,
- ProjectScanMergeRuleTest.class,
+ IntegrationTestSuite.class,
+
ClosableIteratorsHolderTest.class,
ContinuousExecutionTest.class,
- CalciteQueryProcessorTest.class,
- CalciteErrorHandlilngIntegrationTest.class,
- JdbcQueryTest.class,
- CalciteBasicSecondaryIndexIntegrationTest.class,
- CancelTest.class,
QueryCheckerTest.class,
- DateTimeTest.class,
- DataTypesTest.class,
- LimitOffsetTest.class,
- SqlFieldsQueryUsageTest.class,
- AggregatesIntegrationTest.class,
- MetadataIntegrationTest.class,
- SortAggregateIntegrationTest.class,
-
SqlDdlParserTest.class,
- TableDdlIntegrationTest.class,
- FunctionsTest.class,
})
public class IgniteCalciteTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
similarity index 70%
copy from modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
copy to modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 93b2d3e..90c39e6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -17,26 +17,23 @@
package org.apache.ignite.testsuites;
-import org.apache.ignite.internal.processors.query.calcite.AggregatesIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
import org.apache.ignite.internal.processors.query.calcite.CancelTest;
import org.apache.ignite.internal.processors.query.calcite.DataTypesTest;
import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
import org.apache.ignite.internal.processors.query.calcite.FunctionsTest;
import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.SortAggregateIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
-import org.apache.ignite.internal.processors.query.calcite.TableDdlIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.CalciteErrorHandlilngIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.SortAggregateIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.TableDdlIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.TableDmlIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
-import org.apache.ignite.internal.processors.query.calcite.sql.SqlDdlParserTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -45,29 +42,23 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
- PlannerTestSuite.class,
- ExecutionTestSuite.class,
OrToUnionRuleTest.class,
ProjectScanMergeRuleTest.class,
- ClosableIteratorsHolderTest.class,
- ContinuousExecutionTest.class,
CalciteQueryProcessorTest.class,
CalciteErrorHandlilngIntegrationTest.class,
JdbcQueryTest.class,
CalciteBasicSecondaryIndexIntegrationTest.class,
CancelTest.class,
- QueryCheckerTest.class,
DateTimeTest.class,
- DataTypesTest.class,
LimitOffsetTest.class,
SqlFieldsQueryUsageTest.class,
AggregatesIntegrationTest.class,
MetadataIntegrationTest.class,
SortAggregateIntegrationTest.class,
-
- SqlDdlParserTest.class,
TableDdlIntegrationTest.class,
FunctionsTest.class,
+ TableDmlIntegrationTest.class,
+ DataTypesTest.class,
})
-public class IgniteCalciteTestSuite {
+public class IntegrationTestSuite {
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 6c8ac6e..47136f3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocatio
import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.TableDmlPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.TableFunctionTest;
import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
import org.junit.runner.RunWith;
@@ -49,6 +50,7 @@ import org.junit.runners.Suite;
JoinColocationPlannerTest.class,
ExceptPlannerTest.class,
TableFunctionTest.class,
+ TableDmlPlannerTest.class,
})
public class PlannerTestSuite {
}
diff --git a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow b/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow
index 58cef35..791e66c 100644
--- a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow
+++ b/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow
@@ -126,3 +126,27 @@ SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FR
----
2048 2048 10000 20480000.000000
+statement ok
+INSERT INTO bigtable SELECT * FROM bigtable
+
+query IIIR
+SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
+----
+4096 4096 10000 40960000.000000
+
+statement ok
+INSERT INTO bigtable SELECT * FROM bigtable
+
+query IIIR
+SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
+----
+8192 8192 10000 81920000.000000
+
+statement ok
+INSERT INTO bigtable SELECT * FROM bigtable
+
+query IIIR
+SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
+----
+16384 16384 10000 163840000.000000
+
diff --git a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow_ignored b/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow_ignored
deleted file mode 100644
index 9cfeba6..0000000
--- a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow_ignored
+++ /dev/null
@@ -1,153 +0,0 @@
-# name: test/sql/types/string/test_scan_big_varchar.test_slow
-# description: Test scanning many big varchar strings with limited memory
-# group: [string]
-# Ignore: https://issues.apache.org/jira/browse/IGNITE-14553
-
-statement error
-PRAGMA memory_limit=100000000
-
-statement ok
-CREATE TABLE test (a VARCHAR);
-
-# create a big varchar (10K characters)
-statement ok
-INSERT INTO test VALUES ('aaaaaaaaaa')
-
-# sizes: 10, 100, 1000, 10000
-statement ok
-INSERT INTO test SELECT a||a||a||a||a||a||a||a||a||a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-statement ok
-INSERT INTO test SELECT a||a||a||a||a||a||a||a||a||a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-statement ok
-INSERT INTO test SELECT a||a||a||a||a||a||a||a||a||a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-# now create a second table, we only insert the big varchar string in there
-statement ok
-CREATE TABLE bigtable (a VARCHAR);
-
-statement ok
-INSERT INTO bigtable SELECT a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-# verify that the append worked
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-1 1 10000 10000.000000
-
-# we create a total of 16K entries in the big table
-# the total size of this table is 16K*10K = 160MB
-# we then scan the table at every step, as our buffer pool is limited to 100MB not all strings fit in memory
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-2 2 10000 20000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-4 4 10000 40000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-8 8 10000 80000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-16 16 10000 160000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-32 32 10000 320000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-64 64 10000 640000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-128 128 10000 1280000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-256 256 10000 2560000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-512 512 10000 5120000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-1024 1024 10000 10240000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-2048 2048 10000 20480000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable
-----
-4096 4096 10000 40960000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable
-----
-8192 8192 10000 81920000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable
-----
-16384 16384 10000 163840000.000000
-
diff --git a/parent/pom.xml b/parent/pom.xml
index 518c2d5..90de634 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -983,6 +983,8 @@
<exclude>modules/platforms/python/requirements/**/*.txt</exclude><!--plain text can not be commented-->
<!--Packaging -->
<exclude>packaging/**</exclude>
+ <!-- Calcite test scripts -->
+ <exclude>src/test/sql/**</exclude>
<!-- Ignite Documentation-->
<exclude>docs/_site/**</exclude>
<exclude>docs/assets/images/**</exclude>