You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/05/20 08:43:15 UTC

[ignite] branch sql-calcite updated: IGNITE-14553 Calcite engine. Fix duplicated result on insert

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new c5286a6  IGNITE-14553 Calcite engine. Fix duplicated result on insert
c5286a6 is described below

commit c5286a695c9e6b9e74d9caa1ffd640debbedb3ef
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Thu May 20 11:28:33 2021 +0300

    IGNITE-14553 Calcite engine. Fix duplicated result on insert
---
 .../query/calcite/exec/ExecutionServiceImpl.java   |  64 +----
 .../query/calcite/exec/LogicalRelImplementor.java  |   3 +-
 .../query/calcite/exec/PlannerHelper.java          | 230 ++++++++++++++++
 .../query/calcite/exec/ddl/DdlCommandHandler.java  |   7 +-
 .../query/calcite/exec/rel/TableSpoolNode.java     |  50 ++--
 .../query/calcite/prepare/IgnitePlanner.java       |   8 +-
 .../query/calcite/prepare/IgniteSqlValidator.java  |  11 +-
 .../prepare/ddl/DdlSqlToCommandConverter.java      |   2 +-
 .../query/calcite/rel/IgniteTableModify.java       |  34 +--
 .../query/calcite/rel/IgniteTableSpool.java        |  17 +-
 .../query/calcite/schema/TableDescriptor.java      |   4 +-
 .../query/calcite/schema/TableDescriptorImpl.java  |   5 +
 .../processors/query/calcite/trait/TraitUtils.java |   3 +-
 .../processors/query/calcite/util/HintUtils.java   |  11 +-
 .../query/calcite/CalciteQueryProcessorTest.java   | 111 --------
 .../calcite/exec/rel/TableSpoolExecutionTest.java  |  73 +++++-
 .../AbstractBasicIntegrationTest.java              |   4 +-
 .../AggregatesIntegrationTest.java                 |   2 +-
 .../CalciteErrorHandlilngIntegrationTest.java      |   2 +-
 .../IndexSpoolIntegrationTest.java                 |   2 +-
 .../{ => integration}/MetadataIntegrationTest.java |   2 +-
 .../SortAggregateIntegrationTest.java              |   2 +-
 .../{ => integration}/TableDdlIntegrationTest.java |   3 +-
 .../integration/TableDmlIntegrationTest.java       | 288 +++++++++++++++++++++
 .../query/calcite/planner/AbstractPlannerTest.java | 284 ++++++++++++++++++--
 .../calcite/planner/JoinColocationPlannerTest.java |  68 -----
 .../query/calcite/planner/TableDmlPlannerTest.java | 174 +++++++++++++
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |  35 +--
 ...iteTestSuite.java => IntegrationTestSuite.java} |  27 +-
 .../apache/ignite/testsuites/PlannerTestSuite.java |   2 +
 .../types/string/test_scan_big_varchar.test_slow   |  24 ++
 .../string/test_scan_big_varchar.test_slow_ignored | 153 -----------
 parent/pom.xml                                     |   2 +
 33 files changed, 1169 insertions(+), 538 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index ab3bb4e..7420ed6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -34,15 +34,8 @@ import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlDdl;
 import org.apache.calcite.sql.SqlExplain;
 import org.apache.calcite.sql.SqlExplainLevel;
@@ -52,7 +45,6 @@ import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -98,7 +90,6 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepQueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
@@ -106,18 +97,14 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.QueryTemplate
 import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
 import org.apache.ignite.internal.processors.query.calcite.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.HintUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
@@ -131,6 +118,7 @@ import org.jetbrains.annotations.Nullable;
 import static java.util.Collections.singletonList;
 import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED;
 import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper.optimize;
 import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
 
 /**
@@ -594,7 +582,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
 
         sqlNode = validated.sqlNode();
 
-        IgniteRel igniteRel = optimize(sqlNode, planner);
+        IgniteRel igniteRel = optimize(sqlNode, planner, log);
 
         // Split query plan to query fragments.
         List<Fragment> fragments = new Splitter().go(igniteRel);
@@ -612,7 +600,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
         sqlNode = planner.validate(sqlNode);
 
         // Convert to Relational operators graph
-        IgniteRel igniteRel = optimize(sqlNode, planner);
+        IgniteRel igniteRel = optimize(sqlNode, planner, log);
 
         // Split query plan to query fragments.
         List<Fragment> fragments = new Splitter().go(igniteRel);
@@ -632,58 +620,16 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
     }
 
     /** */
-    private IgniteRel optimize(SqlNode sqlNode, IgnitePlanner planner) {
-        try {
-            // Convert to Relational operators graph
-            RelRoot root = planner.rel(sqlNode);
-
-            RelNode rel = root.rel;
-
-            if (rel instanceof Hintable)
-                planner.setDisabledRules(HintUtils.disabledRules((Hintable)rel));
-
-            // Transformation chain
-            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
-
-            RelTraitSet desired = rel.getCluster().traitSet()
-                .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
-                .replace(root.collation == null ? RelCollations.EMPTY : root.collation)
-                .simplify();
-
-            IgniteRel igniteRel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
-
-            if (!root.isRefTrivial()) {
-                final List<RexNode> projects = new ArrayList<>();
-                final RexBuilder rexBuilder = igniteRel.getCluster().getRexBuilder();
-
-                for (int field : Pair.left(root.fields))
-                    projects.add(rexBuilder.makeInputRef(igniteRel, field));
-
-                igniteRel = new IgniteProject(igniteRel.getCluster(), desired, igniteRel, projects, root.validatedRowType);
-            }
-
-            return igniteRel;
-        }
-        catch (Throwable ex) {
-            log.error("Unexpected error at query optimizer.", ex);
-            log.error(planner.dump());
-
-            throw ex;
-        }
-    }
-
-    /** */
     private QueryPlan prepareExplain(SqlNode explain, PlanningContext ctx) throws ValidationException {
         IgnitePlanner planner = ctx.planner();
 
         SqlNode sql = ((SqlExplain)explain).getExplicandum();
 
         // Validate
-        explain = planner.validate(sql);
+        sql = planner.validate(sql);
 
         // Convert to Relational operators graph
-        IgniteRel igniteRel = optimize(explain, planner);
+        IgniteRel igniteRel = optimize(sql, planner, log);
 
         String plan = RelOptUtil.toString(igniteRel, SqlExplainLevel.ALL_ATTRIBUTES);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 15a1d6f..dc559f5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -27,6 +27,7 @@ import java.util.function.Supplier;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Spool;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
@@ -373,7 +374,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
 
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteTableSpool rel) {
-        TableSpoolNode<Row> node = new TableSpoolNode<>(ctx, rel.getRowType());
+        TableSpoolNode<Row> node = new TableSpoolNode<>(ctx, rel.getRowType(), rel.readType == Spool.Type.LAZY);
 
         Node<Row> input = visit(rel.getInput());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
new file mode 100644
index 0000000..0cf58ec
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/PlannerHelper.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.HintUtils;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** */
+public class PlannerHelper {
+    /**
+     * Default constructor.
+     */
+    private PlannerHelper() {
+
+    }
+
+    /**
+     * @param sqlNode Sql node.
+     * @param planner Planner.
+     * @param log Logger.
+     */
+    public static IgniteRel optimize(SqlNode sqlNode, IgnitePlanner planner, IgniteLogger log) {
+        try {
+            // Convert to Relational operators graph
+            RelRoot root = planner.rel(sqlNode);
+
+            RelNode rel = root.rel;
+
+            if (!F.isEmpty(root.hints))
+                planner.setDisabledRules(HintUtils.disabledRules(root.hints));
+
+            // Transformation chain
+            rel = planner.transform(PlannerPhase.HEURISTIC_OPTIMIZATION, rel.getTraitSet(), rel);
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(root.collation == null ? RelCollations.EMPTY : root.collation)
+                .simplify();
+
+            IgniteRel igniteRel = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+            if (!root.isRefTrivial()) {
+                final List<RexNode> projects = new ArrayList<>();
+                final RexBuilder rexBuilder = igniteRel.getCluster().getRexBuilder();
+
+                for (int field : Pair.left(root.fields))
+                    projects.add(rexBuilder.makeInputRef(igniteRel, field));
+
+                igniteRel = new IgniteProject(igniteRel.getCluster(), desired, igniteRel, projects, root.validatedRowType);
+            }
+
+            if (sqlNode.isA(ImmutableSet.of(SqlKind.INSERT, SqlKind.UPDATE, SqlKind.MERGE)))
+                igniteRel = new FixDependentModifyNodeShuttle().visit(igniteRel);
+
+            return igniteRel;
+        }
+        catch (Throwable ex) {
+            log.error("Unexpected error at query optimizer.", ex);
+            log.error(planner.dump());
+
+            throw ex;
+        }
+    }
+
+    /**
+     * This shuttle analyzes a relational tree and inserts an eager spool node
+     * just under the TableModify node in case the latter depends upon a table
+     * used to query the data for the modify node, to avoid double processing
+     * of the retrieved rows.
+     * <p>
+     * It considers two cases: <ol>
+     *     <li>
+     *         Modify node produces rows to insert, then a spool is required.
+     *     </li>
+     *     <li>
+     *         Modify node updates rows only, then a spool is required if 1) we
+     *         are scanning an index and 2) any of the indexed columns is updated
+     *         by the modify node.
+     *     </li>
+     * </ol>
+     *
+     */
+    private static class FixDependentModifyNodeShuttle extends IgniteRelShuttle {
+        /**
+         * Flag indicates whether we should insert a spool or not.
+         */
+        private boolean spoolNeeded;
+
+        /** Current modify node. */
+        private IgniteTableModify modifyNode;
+
+        /** {@inheritDoc} */
+        @Override public IgniteRel visit(IgniteTableModify rel) {
+            assert modifyNode == null;
+
+            modifyNode = rel;
+
+            if (rel.isDelete())
+                return rel;
+
+            processNode(rel);
+
+            if (spoolNeeded) {
+                IgniteTableSpool spool = new IgniteTableSpool(
+                    rel.getCluster(),
+                    rel.getInput().getTraitSet(),
+                    Spool.Type.EAGER,
+                    rel.getInput()
+                );
+
+                rel.replaceInput(0, spool);
+            }
+
+            return rel;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteRel visit(IgniteTableScan rel) {
+            return processScan(rel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteRel visit(IgniteIndexScan rel) {
+            return processScan(rel);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected IgniteRel processNode(IgniteRel rel) {
+            List<IgniteRel> inputs = Commons.cast(rel.getInputs());
+
+            for (int i = 0; i < inputs.size(); i++) {
+                if (spoolNeeded)
+                    break;
+
+                visitChild(rel, i, inputs.get(i));
+            }
+
+            return rel;
+        }
+
+        /**
+         * Process a scan node and raise a {@link #spoolNeeded flag} if needed.
+         *
+         * @param scan TableScan to analyze.
+         * @return The input rel.
+         */
+        private IgniteRel processScan(TableScan scan) {
+            IgniteTable tbl = modifyNode != null ? modifyNode.getTable().unwrap(IgniteTable.class) : null;
+
+            if (tbl == null || scan.getTable().unwrap(IgniteTable.class) != tbl)
+                return (IgniteRel)scan;
+
+            if (modifyNodeInsertsData()) {
+                spoolNeeded = true;
+
+                return (IgniteRel)scan;
+            }
+
+            // For an update-only node the spool is needed if any of the updated
+            // columns is part of the index we are going to scan.
+            if (scan instanceof IgniteTableScan)
+                return (IgniteRel)scan;
+
+            ImmutableSet<Integer> indexedCols = ImmutableSet.copyOf(
+                tbl.getIndex(((AbstractIndexScan)scan).indexName()).collation().getKeys());
+
+            spoolNeeded = modifyNode.getUpdateColumnList().stream()
+                .map(tbl.descriptor()::columnDescriptor)
+                .map(ColumnDescriptor::fieldIndex)
+                .anyMatch(indexedCols::contains);
+
+            return (IgniteRel)scan;
+        }
+
+        /**
+         * @return {@code true} in case {@link #modifyNode} produces any insert.
+         */
+        private boolean modifyNodeInsertsData() {
+            return modifyNode.isInsert(); // MERGE should be analyzed too
+            // but currently it is not implemented
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
index 7303e84..d07dbcc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ddl/DdlCommandHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.ddl;
 
+import java.lang.reflect.Type;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -171,7 +172,11 @@ public class DdlCommandHandler {
         for (ColumnDefinition col : cmd.columns()) {
             String name = col.name();
 
-            res.addQueryField(name, tf.getJavaClass(col.type()).getTypeName(), null);
+            Type javaType = tf.getJavaClass(col.type());
+
+            String typeName = javaType instanceof Class ? ((Class<?>)javaType).getName() : javaType.getTypeName();
+
+            res.addQueryField(name, typeName, null);
 
             if (!col.nullable()) {
                 if (notNullFields == null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
index 7da2e58..8a81492 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
@@ -40,6 +40,12 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
     private final List<Row> rows;
 
     /**
+     * If {@code true} this spool should emit rows as soon as they are stored.
+     * If {@code false} the spool has to collect all rows from the underlying input.
+     */
+    private final boolean lazyRead;
+
+    /**
      * Flag indicates that spool pushes row to downstream.
      * Need to check a case when a downstream produces requests on push.
      */
@@ -48,9 +54,11 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
     /**
      * @param ctx Execution context.
      */
-    public TableSpoolNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+    public TableSpoolNode(ExecutionContext<Row> ctx, RelDataType rowType, boolean lazyRead) {
         super(ctx, rowType);
 
+        this.lazyRead = lazyRead;
+
         rows = new ArrayList<>();
     }
 
@@ -90,29 +98,32 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
 
     /** */
     private void doPush() throws Exception {
-        if (rowIdx >= rows.size() && waiting == -1 && requested > 0) {
-            downstream().end();
-
+        if (isClosed())
             return;
-        }
 
-        while (requested > 0 && rowIdx < rows.size())
-            pushToDownstream();
-    }
+        if (!lazyRead && waiting != -1)
+            return;
 
-    /** */
-    private void pushToDownstream() throws Exception {
+        int processed = 0;
         inLoop = true;
+        try {
+            while (requested > 0 && rowIdx < rows.size() && processed++ < IN_BUFFER_SIZE) {
+                downstream().push(rows.get(rowIdx));
 
-        downstream().push(rows.get(rowIdx));
-
-        inLoop = false;
-
-        rowIdx++;
-        requested--;
+                rowIdx++;
+                requested--;
+            }
+        }
+        finally {
+            inLoop = false;
+        }
 
-        if (rowIdx >= rows.size() && waiting == -1 && requested > 0)
+        if (rowIdx >= rows.size() && waiting == -1 && requested > 0) {
+            requested = 0;
             downstream().end();
+        }
+        else if (requested > 0 && processed >= IN_BUFFER_SIZE)
+            context().execute(this::doPush, this::onError);
     }
 
     /** {@inheritDoc} */
@@ -130,7 +141,7 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
             source().request(waiting = IN_BUFFER_SIZE);
 
         if (requested > 0 && rowIdx < rows.size())
-            pushToDownstream();
+            doPush();
     }
 
     /** {@inheritDoc} */
@@ -142,7 +153,6 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
 
         waiting = -1;
 
-        if (rowIdx >= rows.size() && requested > 0)
-            downstream().end();
+        context().execute(this::doPush, this::onError);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index fdf9b73..ca3b6d2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -188,7 +188,13 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         return Pair.of(validatedNode, type);
     }
 
-    public RelDataType conver(SqlDataTypeSpec typeSpec) {
+    /**
+     * Converts a SQL data type specification to a relational data type.
+     *
+     * @param typeSpec Spec to convert from.
+     * @return Relational type representation of given SQL type.
+     */
+    public RelDataType convert(SqlDataTypeSpec typeSpec) {
         return typeSpec.deriveType(validator());
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
index ac46fe4..f39cb07 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteSqlValidator.java
@@ -53,6 +53,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.calcite.sql.validate.SqlValidatorTable;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTableImpl;
 import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -128,9 +129,13 @@ public class IgniteSqlValidator extends SqlValidatorImpl {
 
     /** {@inheritDoc} */
     @Override protected SqlSelect createSourceSelectForUpdate(SqlUpdate call) {
-        final SqlNodeList selectList = SqlNodeList.of(
-            new SqlIdentifier(QueryUtils.KEY_FIELD_NAME, SqlParserPos.ZERO),
-            new SqlIdentifier(QueryUtils.VAL_FIELD_NAME, SqlParserPos.ZERO));
+        final SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO);
+        final SqlValidatorTable table = getCatalogReader().getTable(((SqlIdentifier)call.getTargetTable()).names);
+
+        table.unwrap(IgniteTable.class).descriptor().selectForUpdateRowType((IgniteTypeFactory)typeFactory)
+            .getFieldNames().stream()
+            .map(name -> new SqlIdentifier(name, SqlParserPos.ZERO))
+            .forEach(selectList::add);
 
         int ordinal = 0;
         // Force unique aliases to avoid a duplicate for Y with SET X=Y
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
index 8c0e0c3..984ffb9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ddl/DdlSqlToCommandConverter.java
@@ -166,7 +166,7 @@ public class DdlSqlToCommandConverter {
                     "querySql=\"" + ctx.query() + "\"]", IgniteQueryErrorCode.PARSING);
 
             String name = col.name.getSimple();
-            RelDataType type = planner.conver(col.dataType);
+            RelDataType type = planner.convert(col.dataType);
 
             Object dflt = null;
             if (col.expression != null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
index 0ff966a..66c274d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableModify.java
@@ -27,9 +27,7 @@ import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rex.RexNode;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
-/**
- *
- */
+/** */
 public class IgniteTableModify extends TableModify implements IgniteRel {
     /**
      * Creates a {@code TableModify}.
@@ -38,28 +36,34 @@ public class IgniteTableModify extends TableModify implements IgniteRel {
      * <blockquote>
      * <pre>UPDATE table SET iden1 = exp1, ident2 = exp2  WHERE condition</pre>
      * </blockquote>
-     *  @param cluster              Cluster this relational expression belongs to
-     * @param traitSet             Traits of this relational expression
-     * @param table                Target table to modify
-     * @param input                Sub-query or filter condition
-     * @param operation            Modify operation (INSERT, UPDATE, DELETE)
-     * @param updateColumnList     List of column identifiers to be updated
-     *                             (e.g. ident1, ident2); null if not UPDATE
-     * @param sourceExpressionList List of value expressions to be set
-     *                             (e.g. exp1, exp2); null if not UPDATE
-     * @param flattened            Whether set flattens the input row type
+     *
+     * @param cluster Cluster this relational expression belongs to.
+     * @param traitSet Traits of this relational expression.
+     * @param table Target table to modify.
+     * @param input Sub-query or filter condition.
+     * @param operation Modify operation (INSERT, UPDATE, DELETE).
+     * @param updateColumnList List of column identifiers to be updated (e.g. ident1, ident2); null if not UPDATE.
+     * @param sourceExpressionList List of value expressions to be set (e.g. exp1, exp2); null if not UPDATE.
+     * @param flattened Whether set flattens the input row type.
      */
-    public IgniteTableModify(RelOptCluster cluster,
+    public IgniteTableModify(
+        RelOptCluster cluster,
         RelTraitSet traitSet,
         RelOptTable table,
         RelNode input,
         Operation operation,
         List<String> updateColumnList,
         List<RexNode> sourceExpressionList,
-        boolean flattened) {
+        boolean flattened
+    ) {
         super(cluster, traitSet, table, Commons.context(cluster).catalogReader(), input, operation, updateColumnList, sourceExpressionList, flattened);
     }
 
+    /**
+     * Creates a {@code TableModify} from serialized {@link RelInput input}.
+     *
+     * @param input The input to create node from.
+     */
     public IgniteTableModify(RelInput input) {
         this(
             input.getCluster(),
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
index fc5d095..9c5a266 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableSpool.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
-
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -30,8 +29,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
 
-import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
-
 /**
  * Relational operator that returns the contents of a table.
  */
@@ -40,9 +37,10 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
     public IgniteTableSpool(
         RelOptCluster cluster,
         RelTraitSet traits,
+        Spool.Type readType,
         RelNode input
     ) {
-        super(cluster, traits, input, Type.LAZY, Type.EAGER);
+        super(cluster, traits, input, readType, Type.EAGER);
     }
 
     /**
@@ -52,9 +50,10 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
      */
     public IgniteTableSpool(RelInput input) {
         this(
-            changeTraits(input, IgniteConvention.INSTANCE).getCluster(),
-            changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
-            changeTraits(input, IgniteConvention.INSTANCE).getInput()
+            input.getCluster(),
+            input.getTraitSet().replace(IgniteConvention.INSTANCE),
+            input.getEnum("readType", Spool.Type.class),
+            input.getInput()
         );
     }
 
@@ -65,12 +64,12 @@ public class IgniteTableSpool extends Spool implements IgniteRel {
 
     /** */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteTableSpool(cluster, getTraitSet(), inputs.get(0));
+        return new IgniteTableSpool(cluster, getTraitSet(), readType, inputs.get(0));
     }
 
     /** {@inheritDoc} */
     @Override protected Spool copy(RelTraitSet traitSet, RelNode input, Type readType, Type writeType) {
-        return new IgniteTableSpool(getCluster(), traitSet, input);
+        return new IgniteTableSpool(getCluster(), traitSet, readType, input);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
index bacafd8..3adfbfb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java
@@ -83,12 +83,12 @@ public interface TableDescriptor extends RelProtoDataType, InitializerExpression
     }
 
     /**
-     * Returns row type including effectively virtual fields.
+     * Returns row type of internal fields (either virtual or real) only.
      *
      * @param factory Type factory.
      * @return Row type for SELECT operation.
      */
-    default RelDataType selectRowType(IgniteTypeFactory factory) {
+    default RelDataType selectForUpdateRowType(IgniteTypeFactory factory) {
         return rowType(factory, null);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 44c8437..9aa2692 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -211,6 +211,11 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
     }
 
     /** {@inheritDoc} */
+    @Override public RelDataType selectForUpdateRowType(IgniteTypeFactory factory) {
+        return rowType(factory, ImmutableBitSet.of(keyField, valField));
+    }
+
+    /** {@inheritDoc} */
     @Override public GridCacheContext cacheContext() {
         return cacheInfo.cacheContext();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 04094fd..7f20df0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -43,6 +43,7 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Spool;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
@@ -176,7 +177,7 @@ public class TraitUtils {
         RelTraitSet traits = rel.getTraitSet()
             .replace(toTrait);
 
-        return new IgniteTableSpool(rel.getCluster(), traits, rel);
+        return new IgniteTableSpool(rel.getCluster(), traits, Spool.Type.LAZY, rel);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java
index 4076f5f..35d9ada 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/HintUtils.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 import java.util.Collections;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.calcite.rel.hint.Hintable;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.ignite.internal.util.typedef.F;
 
 /** */
 public class HintUtils {
@@ -30,11 +33,11 @@ public class HintUtils {
     }
 
     /** */
-    public static Set<String> disabledRules(Hintable rel) {
-        if (rel.getHints().isEmpty())
+    public static Set<String> disabledRules(ImmutableList<RelHint> hints) {
+        if (F.isEmpty(hints))
             return Collections.emptySet();
 
-        return rel.getHints().stream()
+        return hints.stream()
             .filter(h -> "DISABLE_RULE".equals(h.hintName))
             .flatMap(h -> h.listOptions.stream())
             .collect(Collectors.toSet());
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index edb10a2..1ff9268 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -1038,117 +1038,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         assertEqualsCollections(Arrays.asList("Roman", 0, "Ignite"), F.first(query.get(1).getAll()));
     }
 
-    /** */
-    @Test
-    public void testInsertPrimitiveKey() throws Exception {
-        grid(1).getOrCreateCache(new CacheConfiguration<Integer, Developer>()
-            .setName("developer")
-            .setSqlSchema("PUBLIC")
-            .setIndexedTypes(Integer.class, Developer.class)
-            .setBackups(2)
-        );
-
-        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
-
-        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
-            "INSERT INTO DEVELOPER(_key, name, projectId) VALUES (?, ?, ?)", 0, "Igor", 1);
-
-        assertEquals(1, query.size());
-
-        List<List<?>> rows = query.get(0).getAll();
-
-        assertEquals(1, rows.size());
-
-        List<?> row = rows.get(0);
-
-        assertNotNull(row);
-
-        assertEqualsCollections(F.asList(1L), row);
-
-        query = engine.query(null, "PUBLIC", "select _key, * from DEVELOPER");
-
-        assertEquals(1, query.size());
-
-        row = F.first(query.get(0).getAll());
-
-        assertNotNull(row);
-
-        assertEqualsCollections(Arrays.asList(0, "Igor", 1), row);
-    }
-
-    /** */
-    @Test
-    public void testInsertUpdateDeleteNonPrimitiveKey() throws Exception {
-        client.getOrCreateCache(new CacheConfiguration<Key, Developer>()
-            .setName("developer")
-            .setSqlSchema("PUBLIC")
-            .setIndexedTypes(Key.class, Developer.class)
-            .setBackups(2)
-        );
-
-        awaitPartitionMapExchange(true, true, null);
-
-        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
-
-        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "INSERT INTO DEVELOPER VALUES (?, ?, ?, ?)", 0, 0, "Igor", 1);
-
-        assertEquals(1, query.size());
-
-        List<?> row = F.first(query.get(0).getAll());
-
-        assertNotNull(row);
-
-        assertEqualsCollections(F.asList(1L), row);
-
-        query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
-
-        assertEquals(1, query.size());
-
-        row = F.first(query.get(0).getAll());
-
-        assertNotNull(row);
-
-        assertEqualsCollections(F.asList(0, 0, "Igor", 1), row);
-
-        query = engine.query(null, "PUBLIC", "UPDATE DEVELOPER d SET name = 'Roman' WHERE id = ?", 0);
-
-        assertEquals(1, query.size());
-
-        row = F.first(query.get(0).getAll());
-
-        assertNotNull(row);
-
-        assertEqualsCollections(F.asList(1L), row);
-
-        query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
-
-        assertEquals(1, query.size());
-
-        row = F.first(query.get(0).getAll());
-
-        assertNotNull(row);
-
-        assertEqualsCollections(F.asList(0, 0, "Roman", 1), row);
-
-        query = engine.query(null, "PUBLIC", "DELETE FROM DEVELOPER WHERE id = ?", 0);
-
-        assertEquals(1, query.size());
-
-        row = F.first(query.get(0).getAll());
-
-        assertNotNull(row);
-
-        assertEqualsCollections(F.asList(1L), row);
-
-        query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
-
-        assertEquals(1, query.size());
-
-        row = F.first(query.get(0).getAll());
-
-        assertNull(row);
-    }
-
     /**
      * Test verifies that table has a distribution function over valid keys.
      */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
index 6a08401..23e2012 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
@@ -17,11 +17,14 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.stream.IntStream;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
@@ -32,6 +35,8 @@ import org.jetbrains.annotations.NotNull;
 import org.junit.Before;
 import org.junit.Test;
 
+import static java.util.Collections.singletonList;
+
 /**
  *
  */
@@ -47,11 +52,71 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
         super.setup();
     }
 
+    /** */
+    @Test
+    public void testLazyTableSpool() throws Exception {
+        checkTableSpool(
+            (ctx, rowType) -> new TableSpoolNode<>(ctx, rowType, true)
+        );
+    }
+
+    /** */
+    @Test
+    public void testEagerTableSpool() throws Exception {
+        checkTableSpool(
+            (ctx, rowType) -> new TableSpoolNode<>(ctx, rowType, false)
+        );
+    }
+
     /**
+     * Ensure eager spool reads underlying input till the end before emitting
+     * the very first row.
      *
+     * @throws IgniteCheckedException In case of error.
      */
     @Test
-    public void testTableSpool() throws Exception {
+    public void testEagerSpoolReadsWholeInput() throws IgniteCheckedException {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
+
+        int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+        int[] sizes = {inBufSize / 2, inBufSize, inBufSize + 1, inBufSize * 2};
+
+        for (int size : sizes) {
+            log.info("Check: size=" + size);
+
+            AtomicReference<Iterator<Object[]>> itRef = new AtomicReference<>();
+
+            ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, new Iterable<Object[]>() {
+                @NotNull @Override public Iterator<Object[]> iterator() {
+                    if (itRef.get() != null)
+                        throw new AssertionError();
+
+                    itRef.set(IntStream.range(0, size).boxed().map(i -> new Object[]{i}).iterator());
+
+                    return itRef.get();
+                }
+            });
+
+            TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType, false);
+
+            spool.register(singletonList(scan));
+
+            RootNode<Object[]> root = new RootNode<>(ctx, rowType);
+            root.register(spool);
+
+            assertTrue(root.hasNext());
+
+            root.next();
+
+            assertFalse(itRef.get().hasNext());
+        }
+    }
+
+    /** */
+    public void checkTableSpool(BiFunction<ExecutionContext<Object[]>, RelDataType, TableSpoolNode<Object[]>> spoolFactory) throws Exception {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
@@ -76,9 +141,9 @@ public class TableSpoolExecutionTest extends AbstractExecutionTest {
                 }
             });
 
-            TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType);
+            TableSpoolNode<Object[]> spool = spoolFactory.apply(ctx, rowType);
 
-            spool.register(Arrays.asList(right));
+            spool.register(singletonList(right));
 
             RootRewindable<Object[]> root = new RootRewindable<>(ctx, rowType);
             root.register(spool);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
similarity index 94%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 29f7786..94fe7a7 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -24,6 +24,8 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
index f55d5d8..d69e35d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/AggregatesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AggregatesIntegrationTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import org.junit.Test;
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CalciteErrorHandlilngIntegrationTest.java
similarity index 99%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CalciteErrorHandlilngIntegrationTest.java
index bbe1592..c1a9614 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteErrorHandlilngIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CalciteErrorHandlilngIntegrationTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexSpoolIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexSpoolIntegrationTest.java
index a040acc..61360d2 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexSpoolIntegrationTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.ArrayList;
 import java.util.Collection;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/MetadataIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MetadataIntegrationTest.java
similarity index 97%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/MetadataIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MetadataIntegrationTest.java
index 3cbf275..51d8bac 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/MetadataIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/MetadataIntegrationTest.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import org.junit.Test;
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SortAggregateIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SortAggregateIntegrationTest.java
index 718b2e2..1de1a81 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/SortAggregateIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SortAggregateIntegrationTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.Arrays;
 import java.util.Collections;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableDdlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
similarity index 98%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableDdlIntegrationTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
index 67c1e2e..67b34e9 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableDdlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.integration;
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -38,6 +38,7 @@ import org.apache.ignite.configuration.SqlConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.testframework.GridTestUtils;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDmlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDmlIntegrationTest.java
new file mode 100644
index 0000000..3091dba
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDmlIntegrationTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
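+/** Integration tests for DML statements (INSERT, UPDATE, DELETE) executed via the Calcite query engine. */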
+public class TableDmlIntegrationTest extends GridCommonAbstractTest {
+    /** Name of the client node. */
+    private static final String CLIENT_NODE_NAME = "client";
+
+    /** Client node used to execute queries and to clean up caches after each test. */
+    private IgniteEx client;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+
+        client = startClientGrid(CLIENT_NODE_NAME);
+    }
+
+    /** */
+    @Before
+    public void init() {
+        client = grid(CLIENT_NODE_NAME);
+    }
+
+    /** */
+    @After
+    public void cleanUp() {
+        client.destroyCaches(client.cacheNames());
+    }
+
+    /**
+     * Test verifies that data already inserted by the current query
+     * is not processed by this query again.
+     */
+    @Test
+    public void testInsertAsSelect() {
+        executeSql("CREATE TABLE test (epoch_cur int, epoch_copied int)");
+        executeSql("INSERT INTO test VALUES (0, 0)");
+
+        final String insertAsSelectSql = "INSERT INTO test SELECT ?, epoch_cur FROM test";
+
+        for (int i = 1; i < 16; i++) {
+            executeSql(insertAsSelectSql, i);
+
+            List<List<?>> rows = executeSql("SELECT * FROM test WHERE epoch_copied = ?", i);
+
+            assertEquals("Unexpected rows for epoch " + i, 0, rows.size());
+        }
+    }
+
+    /**
+     * Test verifies that concurrent updates do not affect the size of
+     * the result set provided for insertion.
+     */
+    @Test
+    public void testInsertAsSelectWithConcurrentDataModification() throws IgniteCheckedException {
+        executeSql("CREATE TABLE test (id int primary key, val int) with cache_name=\"test\", value_type=\"my_type\"");
+        IgniteCache<Integer, Object> cache = grid(0).cache("test").withKeepBinary();
+
+        BinaryObjectBuilder builder = grid(0).binary().builder("my_type");
+
+        for (int i = 0; i < 128; i++)
+            cache.put(i, builder.setField("val", i).build());
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            while (!stop.get())
+                cache.put(ThreadLocalRandom.current().nextInt(128), builder.setField("val", 0).build());
+        });
+
+        for (int i = 8; i < 18; i++) {
+            int off = 1 << (i - 1);
+
+            executeSql("INSERT INTO test SELECT id + ?::INT, val FROM test", off);
+
+            long cnt = (Long)executeSql("SELECT count(*) FROM test").get(0).get(0);
+
+            assertEquals("Unexpected rows count", 1 << i, cnt);
+        }
+
+        stop.set(true);
+        fut.get(getTestTimeout());
+    }
+
+    /**
+     * Ensures that the update node updates each row only once.
+     */
+    @Test
+    public void testUpdate() {
+        executeSql("CREATE TABLE test (val integer)");
+
+        client.context().query().querySqlFields(
+            new SqlFieldsQuery("CREATE INDEX test_val_idx ON test (val)").setSchema("PUBLIC"), false).getAll();
+
+        for (int i = 1; i <= 4096; i++)
+            executeSql("INSERT INTO test VALUES (?)", i);
+
+        final String updateSql = "UPDATE test SET val = val * 10";
+
+        int mul = 1;
+        for (int i = 1; i < 5; i++) {
+            mul *= 10;
+            executeSql(updateSql);
+        }
+
+        final int fMul = mul;
+
+        List<List<?>> rows = executeSql("SELECT val FROM test ORDER BY val");
+
+        List<Integer> vals = rows.stream().map(r -> (Integer)r.get(0)).collect(Collectors.toList());
+
+        for (int rowNum = 1; rowNum <= rows.size(); rowNum++) {
+            assertEquals(
+                "Unexpected results: " + S.compact(vals, i -> i + fMul),
+                rowNum * fMul,
+                rows.get(rowNum - 1).get(0)
+            );
+        }
+    }
+
+    /** */
+    @Test
+    public void testInsertPrimitiveKey() {
+        grid(1).getOrCreateCache(new CacheConfiguration<Integer, CalciteQueryProcessorTest.Developer>()
+            .setName("developer")
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(Integer.class, CalciteQueryProcessorTest.Developer.class)
+            .setBackups(2)
+        );
+
+        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
+            "INSERT INTO DEVELOPER(_key, name, projectId) VALUES (?, ?, ?)", 0, "Igor", 1);
+
+        assertEquals(1, query.size());
+
+        List<List<?>> rows = query.get(0).getAll();
+
+        assertEquals(1, rows.size());
+
+        List<?> row = rows.get(0);
+
+        assertNotNull(row);
+
+        assertEqualsCollections(F.asList(1L), row);
+
+        query = engine.query(null, "PUBLIC", "select _key, * from DEVELOPER");
+
+        assertEquals(1, query.size());
+
+        row = F.first(query.get(0).getAll());
+
+        assertNotNull(row);
+
+        assertEqualsCollections(Arrays.asList(0, "Igor", 1), row);
+    }
+
+    /** */
+    @Test
+    public void testInsertUpdateDeleteNonPrimitiveKey() throws Exception {
+        client.getOrCreateCache(new CacheConfiguration<CalciteQueryProcessorTest.Key, CalciteQueryProcessorTest.Developer>()
+            .setName("developer")
+            .setSqlSchema("PUBLIC")
+            .setIndexedTypes(CalciteQueryProcessorTest.Key.class, CalciteQueryProcessorTest.Developer.class)
+            .setBackups(2)
+        );
+
+        awaitPartitionMapExchange(true, true, null);
+
+        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC", "INSERT INTO DEVELOPER VALUES (?, ?, ?, ?)", 0, 0, "Igor", 1);
+
+        assertEquals(1, query.size());
+
+        List<?> row = F.first(query.get(0).getAll());
+
+        assertNotNull(row);
+
+        assertEqualsCollections(F.asList(1L), row);
+
+        query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+
+        assertEquals(1, query.size());
+
+        row = F.first(query.get(0).getAll());
+
+        assertNotNull(row);
+
+        assertEqualsCollections(F.asList(0, 0, "Igor", 1), row);
+
+        query = engine.query(null, "PUBLIC", "UPDATE DEVELOPER d SET name = 'Roman' WHERE id = ?", 0);
+
+        assertEquals(1, query.size());
+
+        row = F.first(query.get(0).getAll());
+
+        assertNotNull(row);
+
+        assertEqualsCollections(F.asList(1L), row);
+
+        query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+
+        assertEquals(1, query.size());
+
+        row = F.first(query.get(0).getAll());
+
+        assertNotNull(row);
+
+        assertEqualsCollections(F.asList(0, 0, "Roman", 1), row);
+
+        query = engine.query(null, "PUBLIC", "DELETE FROM DEVELOPER WHERE id = ?", 0);
+
+        assertEquals(1, query.size());
+
+        row = F.first(query.get(0).getAll());
+
+        assertNotNull(row);
+
+        assertEqualsCollections(F.asList(1L), row);
+
+        query = engine.query(null, "PUBLIC", "select * from DEVELOPER");
+
+        assertEquals(1, query.size());
+
+        row = F.first(query.get(0).getAll());
+
+        assertNull(row);
+    }
+
+    /** */
+    private List<List<?>> executeSql(String sql, Object... args) {
+        List<FieldsQueryCursor<List<?>>> cur = queryProcessor().query(null, "PUBLIC", sql, args);
+
+        try (QueryCursor<List<?>> srvCursor = cur.get(0)) {
+            return srvCursor.getAll();
+        }
+    }
+
+    /** */
+    private CalciteQueryProcessor queryProcessor() {
+        return Commons.lookupComponent(client.context(), CalciteQueryProcessor.class);
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index f8fa2f5..5fa68ee 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -24,8 +24,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
@@ -46,23 +48,36 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelReferentialConstraint;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeImpl;
 import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ColumnStrategy;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Statistic;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql2rel.InitializerContext;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.PlannerHelper;
 import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
 import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
@@ -71,26 +86,26 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGr
 import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -112,6 +127,12 @@ import static org.apache.ignite.internal.processors.query.calcite.externalize.Re
 @SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
 public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
     /** */
+    protected static final IgniteTypeFactory TYPE_FACTORY = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+    /** Default size of a test table created without an explicit size. */
+    protected static final int DEFAULT_TBL_SIZE = 500_000;
+
+    /** */
     protected List<UUID> nodes;
 
     /** */
@@ -235,11 +256,11 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
             .topologyVersion(AffinityTopologyVersion.NONE)
             .build();
 
-        RelRoot relRoot;
-
         try (IgnitePlanner planner = ctx.planner()) {
             assertNotNull(planner);
 
+            planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
+
             String qry = ctx.query();
 
             assertNotNull(qry);
@@ -250,29 +271,12 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
             // Validate
             sqlNode = planner.validate(sqlNode);
 
-            // Convert to Relational operators graph
-            relRoot = planner.rel(sqlNode);
-
-            RelNode rel = relRoot.rel;
-
-            assertNotNull(rel);
-
-            // Transformation chain
-            RelTraitSet desired = rel.getTraitSet()
-                .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
-                .replace(CorrelationTrait.UNCORRELATED)
-                .replace(RewindabilityTrait.ONE_WAY)
-                .simplify();
-
-            planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
-
             try {
-                IgniteRel res = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+                IgniteRel rel = PlannerHelper.optimize(sqlNode, planner, log);
 
-//                System.out.println(planner.dump());
+//                System.out.println(RelOptUtil.toString(rel));
 
-                return res;
+                return rel;
             }
             catch (Throwable ex) {
                 System.err.println(planner.dump());
@@ -561,6 +565,62 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
         };
     }
 
+    /**
+     * Creates test table with given params.
+     *
+     * @param name Name of the table.
+     * @param distr Distribution of the table.
+     * @param fields Flat list of name/type pairs: items at even indexes (0, 2, ...) should be strings
+     *               representing column names, each followed by a class representing that column's type.
+     *               E.g. {@code createTable("MY_TABLE", distribution, "ID", Integer.class, "VAL", String.class)}.
+     * @return Instance of the {@link TestTable}.
+     */
+    protected static TestTable createTable(String name, IgniteDistribution distr, Object... fields) {
+        return createTable(name, DEFAULT_TBL_SIZE, distr, fields);
+    }
+
+    /**
+     * Creates test table with given params.
+     *
+     * @param name Name of the table.
+     * @param size Required size of the table.
+     * @param distr Distribution of the table.
+     * @param fields Flat list of name/type pairs: items at even indexes (0, 2, ...) should be strings
+     *               representing column names, each followed by a class representing that column's type.
+     *               E.g. {@code createTable("MY_TABLE", 500, distribution, "ID", Integer.class, "VAL", String.class)}.
+     * @return Instance of the {@link TestTable}.
+     */
+    protected static TestTable createTable(String name, int size, IgniteDistribution distr, Object... fields) {
+        if (F.isEmpty(fields) || fields.length % 2 != 0)
+            throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
+
+        RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(TYPE_FACTORY);
+
+        for (int i = 0; i < fields.length; i += 2)
+            b.add((String)fields[i], TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]));
+
+        return new TestTable(name, b.build(), RewindabilityTrait.REWINDABLE, size) {
+            @Override public IgniteDistribution distribution() {
+                return distr;
+            }
+        };
+    }
+
+    /**
+     * Creates public schema from provided tables.
+     *
+     * @param tbls Tables to create schema for.
+     * @return Public schema.
+     */
+    protected static IgniteSchema createSchema(TestTable... tbls) {
+        IgniteSchema schema = new IgniteSchema("PUBLIC");
+
+        for (TestTable tbl : tbls)
+            schema.addTable(tbl.name(), tbl);
+
+        return schema;
+    }
+
     /** */
     abstract static class TestTable implements IgniteTable {
         /** */
@@ -579,6 +639,9 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
         private final double rowCnt;
 
         /** */
+        private final TableDescriptor desc;
+
+        /** */
         TestTable(RelDataType type) {
             this(type, RewindabilityTrait.REWINDABLE);
         }
@@ -599,6 +662,8 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
             this.rewindable = rewindable;
             this.rowCnt = rowCnt;
             this.name = name;
+
+            desc = new TestTableDescriptor(this::distribution, type);
         }
 
         /** {@inheritDoc} */
@@ -711,7 +776,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public TableDescriptor descriptor() {
-            throw new AssertionError();
+            return desc;
         }
 
         /** {@inheritDoc} */
@@ -748,6 +813,171 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
     }
 
     /** */
+    static class TestTableDescriptor implements TableDescriptor {
+        /** */
+        private final Supplier<IgniteDistribution> distributionSupp;
+
+        /** */
+        private final RelDataType rowType;
+
+        /** */
+        public TestTableDescriptor(Supplier<IgniteDistribution> distribution, RelDataType rowType) {
+            this.distributionSupp = distribution;
+            this.rowType = rowType;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContextInfo cacheInfo() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridCacheContext cacheContext() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteDistribution distribution() {
+            return distributionSupp.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColocationGroup colocationGroup(PlanningContext ctx) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType rowType(IgniteTypeFactory factory, ImmutableBitSet usedColumns) {
+            return rowType;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isUpdateAllowed(RelOptTable tbl, int colIdx) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean match(CacheDataRow row) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> Row toRow(ExecutionContext<Row> ectx, CacheDataRow row, RowHandler.RowFactory<Row> factory,
+            @Nullable ImmutableBitSet requiredColunms) throws IgniteCheckedException {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> IgniteBiTuple toTuple(ExecutionContext<Row> ectx, Row row, TableModify.Operation op,
+            @Nullable Object arg) throws IgniteCheckedException {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColumnDescriptor columnDescriptor(String fieldName) {
+            RelDataTypeField field = rowType.getField(fieldName, false, false);
+            return new TestColumnDescriptor(field.getIndex(), fieldName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridQueryTypeDescriptor typeDescription() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isGeneratedAlways(RelOptTable table, int iColumn) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ColumnStrategy generationStrategy(RelOptTable table, int iColumn) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RexNode newColumnDefaultValue(RelOptTable table, int iColumn, InitializerContext context) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public BiFunction<InitializerContext, RelNode, RelNode> postExpressionConversionHook() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public RexNode newAttributeInitializer(RelDataType type, SqlFunction constructor, int iAttribute,
+            List<RexNode> constructorArgs, InitializerContext context) {
+            throw new AssertionError();
+        }
+    }
+
+    /** */
+    static class TestColumnDescriptor implements ColumnDescriptor {
+        /** */
+        private final int idx;
+
+        /** */
+        private final String name;
+
+        /** */
+        public TestColumnDescriptor(int idx, String name) {
+            this.idx = idx;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean field() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean key() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasDefaultValue() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int fieldIndex() {
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType logicalType(IgniteTypeFactory f) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Class<?> storageType() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object value(ExecutionContext<?> ectx, GridCacheContext<?, ?> cctx,
+            CacheDataRow src) throws IgniteCheckedException {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object defaultValue() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void set(Object dst, Object val) throws IgniteCheckedException {
+            throw new AssertionError();
+        }
+    }
+
+    /** */
     static class TestMessageServiceImpl extends MessageServiceImpl {
         /** */
         private final TestIoManager mgr;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
index d833245..cf01a88 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
@@ -30,12 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
-import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.junit.Test;
 
@@ -51,12 +45,6 @@ import static org.junit.Assert.assertThat;
  * Test suite to verify join colocation.
  */
 public class JoinColocationPlannerTest extends AbstractPlannerTest {
-    /** */
-    private static final IgniteTypeFactory TYPE_FACTORY = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-    /** */
-    private static final int DEFAULT_TBL_SIZE = 500_000;
-
     /**
      * Join of the same tables with a simple affinity is expected to be colocated.
      */
@@ -213,60 +201,4 @@ public class JoinColocationPlannerTest extends AbstractPlannerTest {
 
         assertThat(invalidPlanMsg, exchange, nullValue());
     }
-
-    /**
-     * Creates test table with given params.
-     *
-     * @param name Name of the table.
-     * @param distr Distribution of the table.
-     * @param fields List of the required fields. Every odd item should be a string
-     *               representing a column name, every even item should be a class representing column's type.
-     *               E.g. {@code createTable("MY_TABLE", distribution, "ID", Integer.class, "VAL", String.class)}.
-     * @return Instance of the {@link TestTable}.
-     */
-    private static TestTable createTable(String name, IgniteDistribution distr, Object... fields) {
-        return createTable(name, DEFAULT_TBL_SIZE, distr, fields);
-    }
-
-    /**
-     * Creates test table with given params.
-     *
-     * @param name Name of the table.
-     * @param size Required size of the table.
-     * @param distr Distribution of the table.
-     * @param fields List of the required fields. Every odd item should be a string
-     *               representing a column name, every even item should be a class representing column's type.
-     *               E.g. {@code createTable("MY_TABLE", 500, distribution, "ID", Integer.class, "VAL", String.class)}.
-     * @return Instance of the {@link TestTable}.
-     */
-    private static TestTable createTable(String name, int size, IgniteDistribution distr, Object... fields) {
-        if (F.isEmpty(fields) || fields.length % 2 != 0)
-            throw new IllegalArgumentException("'fields' should be non-null array with even number of elements");
-
-        RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(TYPE_FACTORY);
-
-        for (int i = 0; i < fields.length; i += 2)
-            b.add((String)fields[i], TYPE_FACTORY.createJavaType((Class<?>)fields[i + 1]));
-
-        return new TestTable(name, b.build(), RewindabilityTrait.REWINDABLE, size) {
-            @Override public IgniteDistribution distribution() {
-                return distr;
-            }
-        };
-    }
-
-    /**
-     * Creates public schema from provided tables.
-     *
-     * @param tbls Tables to create schema for.
-     * @return Public schema.
-     */
-    private static IgniteSchema createSchema(TestTable... tbls) {
-        IgniteSchema schema = new IgniteSchema("PUBLIC");
-
-        for (TestTable tbl : tbls)
-            schema.addTable(tbl.name(), tbl);
-
-        return schema;
-    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
new file mode 100644
index 0000000..5cd634d
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableDmlPlannerTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Planner test for DML statements: verifies when an eager table spool is placed under the table modify node.
+ */
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class TableDmlPlannerTest extends AbstractPlannerTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void insertCachesTableScan() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class)
+        );
+
+        String sql = "insert into test select 2 * val from test";
+
+        RelNode phys = physicalPlan(sql, schema, "LogicalIndexScanConverterRule");
+
+        assertNotNull(phys);
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        IgniteTableModify modifyNode = findFirstNode(phys, byClass(IgniteTableModify.class));
+
+        assertThat(invalidPlanMsg, modifyNode, notNullValue());
+        assertThat(invalidPlanMsg, modifyNode.getInput(), instanceOf(Spool.class));
+
+        Spool spool = (Spool)modifyNode.getInput();
+
+        assertThat(invalidPlanMsg, spool.readType, equalTo(Spool.Type.EAGER));
+        assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteTableScan.class)), notNullValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void insertCachesIndexScan() throws Exception {
+        TestTable tbl = createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class);
+
+        tbl.addIndex(new IgniteIndex(RelCollations.of(0), "IDX", null, tbl));
+
+        IgniteSchema schema = createSchema(tbl);
+
+        String sql = "insert into test select 2 * val from test";
+
+        RelNode phys = physicalPlan(sql, schema, "LogicalTableScanConverterRule");
+
+        assertNotNull(phys);
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        IgniteTableModify modifyNode = findFirstNode(phys, byClass(IgniteTableModify.class));
+
+        assertThat(invalidPlanMsg, modifyNode, notNullValue());
+        assertThat(invalidPlanMsg, modifyNode.getInput(), instanceOf(Spool.class));
+
+        Spool spool = (Spool)modifyNode.getInput();
+
+        assertThat(invalidPlanMsg, spool.readType, equalTo(Spool.Type.EAGER));
+        assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteIndexScan.class)), notNullValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void updateNotCachesTableScan() throws Exception {
+        IgniteSchema schema = createSchema(
+            createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class)
+        );
+
+        String sql = "update test set val = 2 * val";
+
+        RelNode phys = physicalPlan(sql, schema, "LogicalIndexScanConverterRule");
+
+        assertNotNull(phys);
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, findFirstNode(phys, byClass(Spool.class)), nullValue());
+        assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteTableScan.class)), notNullValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void updateNotCachesNonDependentIndexScan() throws Exception {
+        TestTable tbl = createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class, "IDX_VAL", Integer.class);
+
+        tbl.addIndex(new IgniteIndex(RelCollations.of(1), "IDX", null, tbl));
+
+        IgniteSchema schema = createSchema(tbl);
+
+        String sql = "update test set val = 2 * val where idx_val between 2 and 10";
+
+        RelNode phys = physicalPlan(sql, schema, "LogicalTableScanConverterRule");
+
+        assertNotNull(phys);
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        assertThat(invalidPlanMsg, findFirstNode(phys, byClass(Spool.class)), nullValue());
+        assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteIndexScan.class)), notNullValue());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void updateCachesDependentIndexScan() throws Exception {
+        TestTable tbl = createTable("TEST", IgniteDistributions.random(), "VAL", Integer.class);
+
+        tbl.addIndex(new IgniteIndex(RelCollations.of(0), "IDX", null, tbl));
+
+        IgniteSchema schema = createSchema(tbl);
+
+        String sql = "update test set val = 2 * val where val between 2 and 10";
+
+        RelNode phys = physicalPlan(sql, schema, "LogicalTableScanConverterRule");
+
+        assertNotNull(phys);
+
+        String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys);
+
+        IgniteTableModify modifyNode = findFirstNode(phys, byClass(IgniteTableModify.class));
+
+        assertThat(invalidPlanMsg, modifyNode, notNullValue());
+        assertThat(invalidPlanMsg, modifyNode.getInput(), instanceOf(Spool.class));
+
+        Spool spool = (Spool)modifyNode.getInput();
+
+        assertThat(invalidPlanMsg, spool.readType, equalTo(Spool.Type.EAGER));
+        assertThat(invalidPlanMsg, findFirstNode(phys, byClass(IgniteIndexScan.class)), notNullValue());
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 93b2d3e..9880404 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -17,25 +17,9 @@
 
 package org.apache.ignite.testsuites;
 
-import org.apache.ignite.internal.processors.query.calcite.AggregatesIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
-import org.apache.ignite.internal.processors.query.calcite.CancelTest;
-import org.apache.ignite.internal.processors.query.calcite.DataTypesTest;
-import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
-import org.apache.ignite.internal.processors.query.calcite.FunctionsTest;
-import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.SortAggregateIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
-import org.apache.ignite.internal.processors.query.calcite.TableDdlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
 import org.apache.ignite.internal.processors.query.calcite.sql.SqlDdlParserTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
@@ -47,27 +31,12 @@ import org.junit.runners.Suite;
 @Suite.SuiteClasses({
     PlannerTestSuite.class,
     ExecutionTestSuite.class,
-    OrToUnionRuleTest.class,
-    ProjectScanMergeRuleTest.class,
+    IntegrationTestSuite.class,
+
     ClosableIteratorsHolderTest.class,
     ContinuousExecutionTest.class,
-    CalciteQueryProcessorTest.class,
-    CalciteErrorHandlilngIntegrationTest.class,
-    JdbcQueryTest.class,
-    CalciteBasicSecondaryIndexIntegrationTest.class,
-    CancelTest.class,
     QueryCheckerTest.class,
-    DateTimeTest.class,
-    DataTypesTest.class,
-    LimitOffsetTest.class,
-    SqlFieldsQueryUsageTest.class,
-    AggregatesIntegrationTest.class,
-    MetadataIntegrationTest.class,
-    SortAggregateIntegrationTest.class,
-
     SqlDdlParserTest.class,
-    TableDdlIntegrationTest.class,
-    FunctionsTest.class,
 })
 public class IgniteCalciteTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
similarity index 70%
copy from modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
copy to modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 93b2d3e..90c39e6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -17,26 +17,23 @@
 
 package org.apache.ignite.testsuites;
 
-import org.apache.ignite.internal.processors.query.calcite.AggregatesIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteErrorHandlilngIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
 import org.apache.ignite.internal.processors.query.calcite.CancelTest;
 import org.apache.ignite.internal.processors.query.calcite.DataTypesTest;
 import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
 import org.apache.ignite.internal.processors.query.calcite.FunctionsTest;
 import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.MetadataIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.SortAggregateIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
-import org.apache.ignite.internal.processors.query.calcite.TableDdlIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.CalciteErrorHandlilngIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.SortAggregateIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.TableDdlIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.TableDmlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
 import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
 import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
-import org.apache.ignite.internal.processors.query.calcite.sql.SqlDdlParserTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -45,29 +42,23 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    PlannerTestSuite.class,
-    ExecutionTestSuite.class,
     OrToUnionRuleTest.class,
     ProjectScanMergeRuleTest.class,
-    ClosableIteratorsHolderTest.class,
-    ContinuousExecutionTest.class,
     CalciteQueryProcessorTest.class,
     CalciteErrorHandlilngIntegrationTest.class,
     JdbcQueryTest.class,
     CalciteBasicSecondaryIndexIntegrationTest.class,
     CancelTest.class,
-    QueryCheckerTest.class,
     DateTimeTest.class,
-    DataTypesTest.class,
     LimitOffsetTest.class,
     SqlFieldsQueryUsageTest.class,
     AggregatesIntegrationTest.class,
     MetadataIntegrationTest.class,
     SortAggregateIntegrationTest.class,
-
-    SqlDdlParserTest.class,
     TableDdlIntegrationTest.class,
     FunctionsTest.class,
+    TableDmlIntegrationTest.class,
+    DataTypesTest.class,
 })
-public class IgniteCalciteTestSuite {
+public class IntegrationTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 6c8ac6e..47136f3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocatio
 import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.TableDmlPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.TableFunctionTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
 import org.junit.runner.RunWith;
@@ -49,6 +50,7 @@ import org.junit.runners.Suite;
     JoinColocationPlannerTest.class,
     ExceptPlannerTest.class,
     TableFunctionTest.class,
+    TableDmlPlannerTest.class,
 })
 public class PlannerTestSuite {
 }
diff --git a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow b/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow
index 58cef35..791e66c 100644
--- a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow
+++ b/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow
@@ -126,3 +126,27 @@ SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FR
 ----
 2048	2048	10000	20480000.000000
 
+statement ok
+INSERT INTO bigtable SELECT * FROM bigtable
+
+query IIIR
+SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
+----
+4096	4096	10000	40960000.000000
+
+statement ok
+INSERT INTO bigtable SELECT * FROM bigtable
+
+query IIIR
+SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
+----
+8192	8192	10000	81920000.000000
+
+statement ok
+INSERT INTO bigtable SELECT * FROM bigtable
+
+query IIIR
+SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
+----
+16384	16384	10000	163840000.000000
+
diff --git a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow_ignored b/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow_ignored
deleted file mode 100644
index 9cfeba6..0000000
--- a/modules/calcite/src/test/sql/types/string/test_scan_big_varchar.test_slow_ignored
+++ /dev/null
@@ -1,153 +0,0 @@
-# name: test/sql/types/string/test_scan_big_varchar.test_slow
-# description: Test scanning many big varchar strings with limited memory
-# group: [string]
-# Ignore: https://issues.apache.org/jira/browse/IGNITE-14553
-
-statement error
-PRAGMA memory_limit=100000000
-
-statement ok
-CREATE TABLE test (a VARCHAR);
-
-# create a big varchar (10K characters)
-statement ok
-INSERT INTO test VALUES ('aaaaaaaaaa')
-
-# sizes: 10, 100, 1000, 10000
-statement ok
-INSERT INTO test SELECT a||a||a||a||a||a||a||a||a||a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-statement ok
-INSERT INTO test SELECT a||a||a||a||a||a||a||a||a||a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-statement ok
-INSERT INTO test SELECT a||a||a||a||a||a||a||a||a||a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-# now create a second table, we only insert the big varchar string in there
-statement ok
-CREATE TABLE bigtable (a VARCHAR);
-
-statement ok
-INSERT INTO bigtable SELECT a FROM test WHERE CHARACTER_LENGTH(a)=(SELECT MAX(CHARACTER_LENGTH(a)) FROM test)
-
-# verify that the append worked
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-1	1	10000	10000.000000
-
-# we create a total of 16K entries in the big table
-# the total size of this table is 16K*10K = 160MB
-# we then scan the table at every step, as our buffer pool is limited to 100MB not all strings fit in memory
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-2	2	10000	20000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-4	4	10000	40000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-8	8	10000	80000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-16	16	10000	160000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-32	32	10000	320000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-64	64	10000	640000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-128	128	10000	1280000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-256	256	10000	2560000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-512	512	10000	5120000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-1024	1024	10000	10240000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(CHARACTER_LENGTH(a)), SUM(CHARACTER_LENGTH(a)) FROM bigtable
-----
-2048	2048	10000	20480000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable
-----
-4096	4096	10000	40960000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable
-----
-8192	8192	10000	81920000.000000
-
-statement ok
-INSERT INTO bigtable SELECT * FROM bigtable
-
-query IIIR
-SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable
-----
-16384	16384	10000	163840000.000000
-
diff --git a/parent/pom.xml b/parent/pom.xml
index 518c2d5..90de634 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -983,6 +983,8 @@
                                         <exclude>modules/platforms/python/requirements/**/*.txt</exclude><!--plain text can not be commented-->
                                         <!--Packaging -->
                                         <exclude>packaging/**</exclude>
+                                        <!-- Calcite test scripts -->
+                                        <exclude>src/test/sql/**</exclude>
                                         <!-- Ignite Documentation-->
                                         <exclude>docs/_site/**</exclude>
                                         <exclude>docs/assets/images/**</exclude>