You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2024/02/12 14:56:58 UTC

(ignite-3) branch main updated: IGNITE-21341 Sql. Optimize way to execute insert (#3188)

This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a88346ba07 IGNITE-21341 Sql. Optimize way to execute insert (#3188)
a88346ba07 is described below

commit a88346ba07df29bd4cbb3835b434d2183d74559c
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Mon Feb 12 16:56:52 2024 +0200

    IGNITE-21341 Sql. Optimize way to execute insert (#3188)
---
 .../internal/sql/engine/ItKeyValueGetTest.java     |  14 +-
 .../internal/sql/engine/ItKeyValuePutTest.java     | 143 ++++++++++++++++
 .../internal/sql/engine/ItSecondaryIndexTest.java  |   2 +-
 .../sql/engine/InternalSqlRowSingleLong.java       |  72 ++++++++
 .../internal/sql/engine/exec/ExecutablePlan.java   |  47 ++++++
 .../engine/exec/ExecutableTableRegistryImpl.java   |   2 +-
 .../sql/engine/exec/ExecutionServiceImpl.java      | 111 +++----------
 .../sql/engine/exec/LogicalRelImplementor.java     |  18 +-
 .../internal/sql/engine/exec/UpdatableTable.java   |  17 ++
 .../sql/engine/exec/UpdatableTableImpl.java        |  33 +++-
 .../sql/engine/exec/mapping/FragmentMapper.java    |  14 +-
 .../sql/engine/prepare/IgniteRelShuttle.java       |  14 ++
 .../sql/engine/prepare/KeyValueGetPlan.java        | 137 ++++++++++++++--
 .../sql/engine/prepare/KeyValueModifyPlan.java     | 146 +++++++++++++++++
 .../internal/sql/engine/prepare/PlannerHelper.java |  16 +-
 .../internal/sql/engine/prepare/PlannerPhase.java  |  15 ++
 .../sql/engine/prepare/PrepareServiceImpl.java     |  19 ++-
 .../internal/sql/engine/rel/IgniteKeyValueGet.java | 113 +++++++++++++
 .../sql/engine/rel/IgniteKeyValueModify.java       | 125 ++++++++++++++
 .../internal/sql/engine/rel/IgniteRelVisitor.java  |  10 ++
 .../engine/rule/TableModifyToKeyValuePutRule.java  | 142 ++++++++++++++++
 .../TableScanToKeyValueGetRule.java}               | 157 +++++++-----------
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   2 +-
 .../internal/sql/engine/framework/TestNode.java    |   3 -
 .../sql/engine/planner/AbstractPlannerTest.java    |  11 +-
 .../sql/engine/planner/DmlPlannerTest.java         |  10 +-
 .../sql/engine/planner/DynamicParametersTest.java  |   7 +
 .../sql/engine/planner/ImplicitCastsTest.java      |   3 +-
 .../engine/planner/KeyValueModifyPlannerTest.java  | 181 +++++++++++++++++++++
 .../internal/sql/engine/util/StatementChecker.java |  18 +-
 .../sql-engine/src/test/resources/mapping/dml.test |   2 +-
 31 files changed, 1367 insertions(+), 237 deletions(-)

diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java
index 9abf355103..70d27b1163 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValueGetTest.java
@@ -55,7 +55,7 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
         int key = randomKey();
 
         assertQuery("SELECT * FROM simple_key WHERE id = ?")
-                .matches(containsSubPlan("IgnitePkLookup"))
+                .matches(containsSubPlan("IgniteKeyValueGet"))
                 .withParams(key)
                 .returns(key, key)
                 .check();
@@ -66,7 +66,7 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
         int key = randomKey();
 
         assertQuery("SELECT * FROM complex_key_normal_order WHERE id1 = ? AND id2 = ?")
-                .matches(containsSubPlan("IgnitePkLookup"))
+                .matches(containsSubPlan("IgniteKeyValueGet"))
                 .withParams(key, 2 * key)
                 .returns(key, 2 * key, key)
                 .check();
@@ -78,7 +78,7 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
         int key = randomKey();
 
         assertQuery("SELECT * FROM complex_key_revers_order WHERE id1 = ? AND id2 = ?")
-                .matches(containsSubPlan("IgnitePkLookup"))
+                .matches(containsSubPlan("IgniteKeyValueGet"))
                 .withParams(key, 2 * key)
                 .returns(key, 2 * key, key)
                 .check();
@@ -87,13 +87,13 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
     @Test
     void lookupBySimpleKeyWithPostFiltration() {
         assertQuery("SELECT * FROM simple_key WHERE id = ? AND val > 5")
-                .matches(containsSubPlan("IgnitePkLookup"))
+                .matches(containsSubPlan("IgniteKeyValueGet"))
                 .withParams(1)
                 .returnNothing()
                 .check();
 
         assertQuery("SELECT * FROM simple_key WHERE id = ? AND val > 5")
-                .matches(containsSubPlan("IgnitePkLookup"))
+                .matches(containsSubPlan("IgniteKeyValueGet"))
                 .withParams(6)
                 .returns(6, 6)
                 .check();
@@ -104,13 +104,13 @@ public class ItKeyValueGetTest extends BaseSqlIntegrationTest {
         int key = randomKey();
 
         assertQuery("SELECT val FROM simple_key WHERE id = ?")
-                .matches(containsSubPlan("IgnitePkLookup"))
+                .matches(containsSubPlan("IgniteKeyValueGet"))
                 .withParams(key)
                 .returns(key)
                 .check();
 
         assertQuery("SELECT id, val * 10 FROM simple_key WHERE id = ?")
-                .matches(containsSubPlan("IgnitePkLookup"))
+                .matches(containsSubPlan("IgniteKeyValueGet"))
                 .withParams(key)
                 .returns(key, key * 10)
                 .check();
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValuePutTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValuePutTest.java
new file mode 100644
index 0000000000..c10bcf1a7b
--- /dev/null
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItKeyValuePutTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to verify e2e cases of optimized insert.
+ */
+public class ItKeyValuePutTest extends BaseSqlIntegrationTest {
+    private static final int TABLE_SIZE = 10;
+
+    @BeforeAll
+    @SuppressWarnings({"ConcatenationWithEmptyString", "resource"})
+    static void initSchema() {
+        try (Session session = CLUSTER.aliveNode().sql().createSession()) {
+            session.executeScript(""
+                    + "CREATE TABLE simple_key (id INT PRIMARY KEY, val INT);"
+                    + "CREATE TABLE complex_key (id1 INT, id2 INT, val INT, PRIMARY KEY(id1, id2));"
+            );
+        }
+    }
+
+    @AfterEach
+    void clearTables() {
+        sql("DELETE FROM simple_key");
+        sql("DELETE FROM complex_key");
+    }
+
+    @Test
+    void insertConstantSimpleKey() {
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery(format("INSERT INTO simple_key VALUES ({}, {})", i, i))
+                    .matches(containsSubPlan("IgniteKeyValueModify"))
+                    .returns(1L)
+                    .check();
+        }
+
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery("SELECT * FROM simple_key WHERE id = ?")
+                    .withParams(i)
+                    .returns(i, i)
+                    .check();
+        }
+    }
+
+    @Test
+    void insertDynamicParamsSimpleKey() {
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery("INSERT INTO simple_key VALUES (?, ?)")
+                    .matches(containsSubPlan("IgniteKeyValueModify"))
+                    .withParams(i, i)
+                    .returns(1L)
+                    .check();
+        }
+
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery("SELECT * FROM simple_key WHERE id = ?")
+                    .withParams(i)
+                    .returns(i, i)
+                    .check();
+        }
+    }
+
+    @Test
+    void insertSimpleKeyWithCast() {
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery("INSERT INTO simple_key VALUES (?, ?)")
+                    .matches(containsSubPlan("IgniteKeyValueModify"))
+                    .withParams((byte) i, (byte) i)
+                    .returns(1L)
+                    .check();
+        }
+
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery("SELECT * FROM simple_key WHERE id = ?")
+                    .withParams(i)
+                    .returns(i, i)
+                    .check();
+        }
+    }
+
+    @Test
+    void insertComplexKey() {
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery("INSERT INTO complex_key VALUES (?, ?, ?)")
+                    .matches(containsSubPlan("IgniteKeyValueModify"))
+                    .withParams(i, 2 * i, i)
+                    .returns(1L)
+                    .check();
+        }
+
+        for (int i = 0; i < TABLE_SIZE; i++) {
+            assertQuery("SELECT * FROM complex_key WHERE id1 = ? AND id2 = ?")
+                    .withParams(i, 2 * i)
+                    .returns(i, 2 * i, i)
+                    .check();
+        }
+    }
+
+    @Test
+    @SuppressWarnings("ThrowableNotThrown")
+    void exceptionIsThrownOnKeyViolation() {
+        String insertStatement = "INSERT INTO simple_key VALUES (1, 1)";
+        sql(insertStatement);
+
+        assertThrows(
+                SqlException.class,
+                () -> sql(insertStatement),
+                "PK unique constraint is violated"
+        );
+
+        assertThrows(
+                SqlException.class,
+                () -> sql("INSERT INTO complex_key(id1, val) VALUES (1, 1)"),
+                "Column 'ID2' does not allow NULLs"
+        );
+    }
+}
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index 8144190cef..b26d95720e 100644
--- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -202,7 +202,7 @@ public class ItSecondaryIndexTest extends BaseSqlIntegrationTest {
     @Test
     public void testKeyEqualsFilter() {
         assertQuery("SELECT * FROM Developer WHERE id=2")
-                .matches(containsSubPlan("IgnitePkLookup(table=[[PUBLIC, DEVELOPER]]"))
+                .matches(containsSubPlan("IgniteKeyValueGet(table=[[PUBLIC, DEVELOPER]]"))
                 .returns(2, "Beethoven", 2, "Vienna", 44)
                 .check();
     }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InternalSqlRowSingleLong.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InternalSqlRowSingleLong.java
new file mode 100644
index 0000000000..3d798daf16
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InternalSqlRowSingleLong.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.schema.BinaryTuple;
+
+/**
+ * Implementation of {@code InternalSqlRow} allowing to represent a SQL row with a single long column.
+ */
+public class InternalSqlRowSingleLong implements InternalSqlRow {
+    private final long val;
+    private BinaryTuple row;
+
+    /**
+     * Constructor.
+     *
+     * @param val Value for single column row.
+     */
+    public InternalSqlRowSingleLong(long val) {
+        this.val = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Object get(int idx) {
+        assert idx == 0;
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int fieldCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BinaryTuple asBinaryTuple() {
+        if (row == null) {
+            row = new BinaryTuple(1, new BinaryTupleBuilder(1, estimateSize(val)).appendLong(val).build());
+        }
+        return row;
+    }
+
+    private static int estimateSize(long value) {
+        if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
+            return Byte.BYTES;
+        } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
+            return Short.BYTES;
+        } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+            return Integer.BYTES;
+        }
+
+        return Long.BYTES;
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutablePlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutablePlan.java
new file mode 100644
index 0000000000..de68db0e9e
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutablePlan.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Denotes a plan that can evaluates itself.
+ */
+@FunctionalInterface
+public interface ExecutablePlan {
+    /**
+     * Evaluates plan and returns cursor over result.
+     *
+     * @param ctx An execution context.
+     * @param tx A transaction to use to access the data.
+     * @param tableRegistry A registry to resolve executable table to evaluate the plan.
+     * @param firstPageReadyCallback A callback to notify when first page has been prefetched.
+     * @param <RowT> A type of the sql row.
+     * @return Cursor over result of the evaluation.
+     */
+    <RowT> AsyncCursor<InternalSqlRow> execute(
+            ExecutionContext<RowT> ctx,
+            InternalTransaction tx,
+            ExecutableTableRegistry tableRegistry,
+            @Nullable QueryPrefetchCallback firstPageReadyCallback
+    );
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
index 1492aa415d..8575412e39 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutableTableRegistryImpl.java
@@ -93,7 +93,7 @@ public class ExecutableTableRegistryImpl implements ExecutableTableRegistry {
                     TableRowConverter rowConverter = converterFactory.create(null);
 
                     UpdatableTableImpl updatableTable = new UpdatableTableImpl(sqlTable.id(), tableDescriptor, internalTable.partitions(),
-                            replicaService, clock, rowConverter);
+                            internalTable, replicaService, clock, rowConverter);
 
                     return new ExecutableTableImpl(scannableTable, updatableTable, sqlTable.partitionCalculator());
                 });
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ccf603e995..4d68e0bf49 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -25,11 +25,11 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -52,14 +52,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
@@ -76,7 +70,6 @@ import org.apache.ignite.internal.sql.engine.NodeLeftException;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
 import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
 import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
@@ -85,7 +78,6 @@ import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
 import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode;
 import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
-import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.message.ErrorMessage;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
 import org.apache.ignite.internal.sql.engine.message.QueryCloseMessage;
@@ -97,7 +89,6 @@ import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
 import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
 import org.apache.ignite.internal.sql.engine.prepare.Fragment;
 import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
-import org.apache.ignite.internal.sql.engine.prepare.KeyValueGetPlan;
 import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
 import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
@@ -138,6 +129,10 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
     private static final List<InternalSqlRow> NOT_APPLIED_ANSWER = List.of(new InternalSqlRowSingleBoolean(false));
 
+    private static final FragmentDescription DUMMY_DESCRIPTION = new FragmentDescription(
+            0, true, Long2ObjectMaps.emptyMap(), null, null
+    );
+
     private final MessageService messageService;
 
     private final TopologyService topSrvc;
@@ -304,10 +299,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
 
         switch (queryType) {
             case DML:
-                return executeQuery(tx, ctx, (MultiStepPlan) plan);
             case QUERY:
-                if (plan instanceof KeyValueGetPlan) {
-                    return executeKeyValueGetOperation(tx, ctx, (KeyValueGetPlan) plan, ctx.prefetchCallback());
+                if (plan instanceof ExecutablePlan) {
+                    return executeExecutablePlan(tx, ctx, (ExecutablePlan) plan, ctx.prefetchCallback());
                 }
 
                 assert plan instanceof MultiStepPlan : plan.getClass();
@@ -334,84 +328,24 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         return mgr.close(true);
     }
 
-    private AsyncCursor<InternalSqlRow> executeKeyValueGetOperation(
+    private AsyncCursor<InternalSqlRow> executeExecutablePlan(
             InternalTransaction tx,
             BaseQueryContext ctx,
-            KeyValueGetPlan plan,
+            ExecutablePlan plan,
             @Nullable QueryPrefetchCallback callback
     ) {
-        IgniteTable sqlTable = plan.table();
-        CompletableFuture<Iterator<InternalSqlRow>> result = tableRegistry.getTable(ctx.schemaVersion(), sqlTable.id())
-                .thenCompose(execTable -> {
-                    ExecutionContext<RowT> ectx = new ExecutionContext<>(
-                            taskExecutor,
-                            ctx.queryId(),
-                            localNode,
-                            localNode.name(),
-                            null,
-                            handler,
-                            Commons.parametersMap(ctx.parameters()),
-                            TxAttributes.fromTx(tx)
-                    );
-
-                    ImmutableBitSet requiredColumns = plan.lookupNode().requiredColumns();
-                    RexNode filterExpr = plan.lookupNode().condition();
-                    List<RexNode> projectionExpr = plan.lookupNode().projects();
-                    List<RexNode> keyExpressions = plan.lookupNode().keyExpressions();
-
-                    RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns);
-
-                    Supplier<RowT> keySupplier = ectx.expressionFactory()
-                            .rowSource(keyExpressions);
-                    Predicate<RowT> filter = filterExpr == null ? null : ectx.expressionFactory()
-                            .predicate(filterExpr, rowType);
-                    Function<RowT, RowT> projection = projectionExpr == null ? null : ectx.expressionFactory()
-                            .project(projectionExpr, rowType);
-
-                    RowHandler<RowT> rowHandler = ectx.rowHandler();
-                    RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
-                    RowFactory<RowT> rowFactory = rowHandler.factory(rowSchema);
-
-                    RelDataType resultType = plan.lookupNode().getRowType();
-                    BiFunction<Integer, Object, Object> internalTypeConverter = TypeUtils.resultTypeConverter(ectx, resultType);
-
-                    ScannableTable scannableTable = execTable.scannableTable();
-                    Function<RowT, Iterator<InternalSqlRow>> postProcess = row -> {
-                        if (row == null) {
-                            return Collections.emptyIterator();
-                        }
-
-                        if (filter != null && !filter.test(row)) {
-                            return Collections.emptyIterator();
-                        }
-
-                        if (projection != null) {
-                            row = projection.apply(row);
-                        }
-
-                        return List.<InternalSqlRow>of(
-                                new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter)
-                        ).iterator();
-                    };
-
-                    CompletableFuture<RowT> lookupResult = scannableTable.primaryKeyLookup(
-                            ectx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet()
-                    );
-
-                    if (projection == null && filter == null) {
-                        // no arbitrary computations, should be safe to proceed execution on
-                        // thread that completes the future
-                        return lookupResult.thenApply(postProcess);
-                    } else {
-                        return lookupResult.thenApplyAsync(postProcess, taskExecutor);
-                    }
-                });
-
-        if (callback != null) {
-            result.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
-        }
+        ExecutionContext<RowT> ectx = new ExecutionContext<>(
+                taskExecutor,
+                ctx.queryId(),
+                localNode,
+                localNode.name(),
+                DUMMY_DESCRIPTION,
+                handler,
+                Commons.parametersMap(ctx.parameters()),
+                TxAttributes.fromTx(tx)
+        );
 
-        return new AsyncWrapper<>(result, Runnable::run);
+        return plan.execute(ectx, tx, tableRegistry, callback);
     }
 
     private AsyncCursor<InternalSqlRow> executeDdl(DdlPlan plan, @Nullable QueryPrefetchCallback callback) {
@@ -1075,6 +1009,11 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
         }
 
         private void enlistPartitions(MappedFragment mappedFragment, InternalTransaction tx) {
+            // no need to traverse the tree if fragment has no tables
+            if (mappedFragment.fragment().tables().isEmpty()) {
+                return;
+            }
+
             new IgniteRelShuttle() {
                 @Override
                 public IgniteRel visit(IgniteIndexScan rel) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 520224b86e..2fcf7182b0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -82,6 +82,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteNestedLoopJoin;
@@ -207,7 +209,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     public Node<RowT> visit(IgniteTrimExchange rel) {
         assert TraitUtils.distribution(rel).getType() == HASH_DISTRIBUTED;
 
-        ColocationGroup targetGroup = ctx.target();
+        ColocationGroup targetGroup = ctx.group(rel.sourceId());
 
         assert targetGroup != null;
 
@@ -877,7 +879,19 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     /** {@inheritDoc} */
     @Override
     public Node<RowT> visit(IgniteExchange rel) {
-        throw new AssertionError();
+        throw new AssertionError(rel.getClass());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Node<RowT> visit(IgniteKeyValueGet rel) {
+        throw new AssertionError(rel.getClass());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Node<RowT> visit(IgniteKeyValueModify rel) {
+        throw new AssertionError(rel.getClass());
     }
 
     private Node<RowT> visit(RelNode rel) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
index 7e021c096a..f7e49b4a4e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTable.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.exec.rel.ModifyNode;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.tx.InternalTransaction;
 
 /**
  * The interface describe a table that could be updated by {@link ModifyNode}.
@@ -50,6 +51,22 @@ public interface UpdatableTable {
             ColocationGroup colocationGroup
     );
 
+    /**
+     * Insert given row into the table.
+     *
+     * <p>This method accepts instance of the transaction, thus MUST be issued on initiator node.
+     *
+     * @param tx A transaction within which the insert is issued.
+     * @param ectx An execution context. Used mainly to acquire {@link RowHandler}.
+     * @param row A row to insert.
+     * @param <RowT> A type of sql row.
+     * @return Future representing result of operation. Future will be completed successfully
+     *      iif row has been inserted, will be completed exceptionally otherwise.
+     */
+    <RowT> CompletableFuture<Void> insert(
+            InternalTransaction tx, ExecutionContext<RowT> ectx, RowT row
+    );
+
     /**
      * Updates rows if they are exists, inserts the rows otherwise.
      *
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 58c112d939..8a508b490e 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -42,11 +42,13 @@ import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.RowBatch;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.SqlException;
 
@@ -65,26 +67,26 @@ public final class UpdatableTableImpl implements UpdatableTable {
 
     private final HybridClock clock;
 
+    private final InternalTable table;
+
     private final ReplicaService replicaService;
 
     private final PartitionExtractor partitionExtractor;
 
     private final TableRowConverter rowConverter;
 
-    /**
-     * Constructor.
-     *
-     * @param desc Table descriptor.
-     */
-    public UpdatableTableImpl(
+    /** Constructor. */
+    UpdatableTableImpl(
             int tableId,
             TableDescriptor desc,
             int partitions,
+            InternalTable table,
             ReplicaService replicaService,
             HybridClock clock,
             TableRowConverter rowConverter
     ) {
         this.tableId = tableId;
+        this.table = table;
         this.desc = desc;
         this.replicaService = replicaService;
         this.clock = clock;
@@ -112,6 +114,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
             rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow);
         }
 
+        @SuppressWarnings("unchecked")
         CompletableFuture<List<RowT>>[] futures = new CompletableFuture[rowsByPartition.size()];
 
         int batchNum = 0;
@@ -172,6 +175,23 @@ public final class UpdatableTableImpl implements UpdatableTable {
         return desc;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <RowT> CompletableFuture<Void> insert(InternalTransaction tx, ExecutionContext<RowT> ectx, RowT row) {
+        BinaryRowEx tableRow = rowConverter.toBinaryRow(ectx, row, false);
+
+        return table.insert(tableRow, tx)
+                .thenApply(success -> {
+                    if (success) {
+                        return null;
+                    }
+
+                    RowHandler<RowT> rowHandler = ectx.rowHandler();
+
+                    throw conflictKeysException(List.of(rowHandler.toString(row)));
+                });
+    }
+
     /** {@inheritDoc} */
     @Override
     public <RowT> CompletableFuture<?> insertAll(
@@ -253,6 +273,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
             keyRowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new ArrayList<>()).add(binaryRow);
         }
 
+        @SuppressWarnings("unchecked")
         CompletableFuture<List<RowT>>[] futures = new CompletableFuture[keyRowsByPartition.size()];
 
         int batchNum = 0;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index f42e56ef25..d9d6129a7b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteNestedLoopJoin;
@@ -183,7 +185,17 @@ class FragmentMapper {
 
         @Override
         public Mapping visit(IgniteExchange rel) {
-            throw new AssertionError("Unexpected call: " + rel);
+            throw new AssertionError(rel.getClass());
+        }
+
+        @Override
+        public Mapping visit(IgniteKeyValueGet rel) {
+            throw new AssertionError(rel.getClass());
+        }
+
+        @Override
+        public Mapping visit(IgniteKeyValueModify rel) {
+            throw new AssertionError(rel.getClass());
         }
 
         @Override
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
index 8ff0c846f6..6a980e9332 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteRelShuttle.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteExchange;
 import org.apache.ignite.internal.sql.engine.rel.IgniteFilter;
 import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteLimit;
 import org.apache.ignite.internal.sql.engine.rel.IgniteMergeJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteNestedLoopJoin;
@@ -103,6 +105,18 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
         return processNode(rel);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel visit(IgniteKeyValueGet rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel visit(IgniteKeyValueModify rel) {
+        return processNode(rel);
+    }
+
     /** {@inheritDoc} */
     @Override
     public IgniteRel visit(IgniteColocatedHashAggregate rel) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
index 891cc93186..e61f437255 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueGetPlan.java
@@ -17,31 +17,66 @@
 
 package org.apache.ignite.internal.sql.engine.prepare;
 
-import java.util.concurrent.ExecutorService;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.InternalSqlRowImpl;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
-import org.apache.ignite.internal.sql.engine.rel.IgnitePkLookup;
+import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.util.Cloner;
 import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.internal.util.AsyncWrapper;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Plan representing single lookup by a primary key.
- *
- * <p>Note: since {@link IgnitePkLookup} is not supposed to be a part of distributed
- * query plan, we need a new object to be able to handle it differently by {@link ExecutorService}.
  */
-public class KeyValueGetPlan implements ExplainablePlan {
+public class KeyValueGetPlan implements ExplainablePlan, ExecutablePlan {
+    private static final IgniteLogger LOG = Loggers.forClass(KeyValueGetPlan.class);
+
     private final PlanId id;
-    private final IgnitePkLookup lookupNode;
+    private final int schemaVersion;
+    private final IgniteKeyValueGet lookupNode;
     private final ResultSetMetadata meta;
     private final ParameterMetadata parameterMetadata;
 
-    KeyValueGetPlan(PlanId id, IgnitePkLookup lookupNode, ResultSetMetadata meta, ParameterMetadata parameterMetadata) {
+    KeyValueGetPlan(
+            PlanId id,
+            int schemaVersion,
+            IgniteKeyValueGet lookupNode,
+            ResultSetMetadata meta,
+            ParameterMetadata parameterMetadata
+    ) {
         this.id = id;
+        this.schemaVersion = schemaVersion;
         this.lookupNode = lookupNode;
         this.meta = meta;
         this.parameterMetadata = parameterMetadata;
@@ -72,7 +107,7 @@ public class KeyValueGetPlan implements ExplainablePlan {
     }
 
     /** Returns a table in question. */
-    public IgniteTable table() {
+    private IgniteTable table() {
         IgniteTable table = lookupNode.getTable().unwrap(IgniteTable.class);
 
         assert table != null : lookupNode.getTable();
@@ -87,7 +122,89 @@ public class KeyValueGetPlan implements ExplainablePlan {
         return RelOptUtil.toString(clonedRoot, SqlExplainLevel.ALL_ATTRIBUTES);
     }
 
-    public IgnitePkLookup lookupNode() {
+    public IgniteKeyValueGet lookupNode() {
         return lookupNode;
     }
+
+    @Override
+    public <RowT> AsyncCursor<InternalSqlRow> execute(
+            ExecutionContext<RowT> ctx,
+            InternalTransaction tx,
+            ExecutableTableRegistry tableRegistry,
+            @Nullable QueryPrefetchCallback firstPageReadyCallback
+    ) {
+        IgniteTable sqlTable = table();
+
+        CompletableFuture<Iterator<InternalSqlRow>> result = tableRegistry.getTable(schemaVersion, sqlTable.id())
+                .thenCompose(execTable -> {
+
+                    ImmutableBitSet requiredColumns = lookupNode.requiredColumns();
+                    RexNode filterExpr = lookupNode.condition();
+                    List<RexNode> projectionExpr = lookupNode.projects();
+                    List<RexNode> keyExpressions = lookupNode.keyExpressions();
+
+                    RelDataType rowType = sqlTable.getRowType(Commons.typeFactory(), requiredColumns);
+
+                    Supplier<RowT> keySupplier = ctx.expressionFactory()
+                            .rowSource(keyExpressions);
+                    Predicate<RowT> filter = filterExpr == null ? null : ctx.expressionFactory()
+                            .predicate(filterExpr, rowType);
+                    Function<RowT, RowT> projection = projectionExpr == null ? null : ctx.expressionFactory()
+                            .project(projectionExpr, rowType);
+
+                    RowHandler<RowT> rowHandler = ctx.rowHandler();
+                    RowSchema rowSchema = TypeUtils.rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+                    RowFactory<RowT> rowFactory = rowHandler.factory(rowSchema);
+
+                    RelDataType resultType = lookupNode.getRowType();
+                    BiFunction<Integer, Object, Object> internalTypeConverter = TypeUtils.resultTypeConverter(ctx, resultType);
+
+                    ScannableTable scannableTable = execTable.scannableTable();
+                    Function<RowT, Iterator<InternalSqlRow>> postProcess = row -> {
+                        if (row == null) {
+                            return Collections.emptyIterator();
+                        }
+
+                        if (filter != null && !filter.test(row)) {
+                            return Collections.emptyIterator();
+                        }
+
+                        if (projection != null) {
+                            row = projection.apply(row);
+                        }
+
+                        return List.<InternalSqlRow>of(
+                                new InternalSqlRowImpl<>(row, rowHandler, internalTypeConverter)
+                        ).iterator();
+                    };
+
+                    CompletableFuture<RowT> lookupResult = scannableTable.primaryKeyLookup(
+                            ctx, tx, rowFactory, keySupplier.get(), requiredColumns.toBitSet()
+                    );
+
+                    if (projection == null && filter == null) {
+                        // no arbitrary computations, should be safe to proceed execution on
+                        // thread that completes the future
+                        return lookupResult.thenApply(postProcess);
+                    } else {
+                        Executor executor = task -> ctx.execute(task::run, error -> {
+                            // this executor is used to process future chain, so any unhandled exception
+                            // should be wrapped with CompletionException and returned as a result, implying
+                            // no error handler should be called.
+                            // But just in case there is error in future processing pipeline let's log error
+                            LOG.error("Unexpected error", error);
+                        });
+
+                        return lookupResult.thenApplyAsync(postProcess, executor);
+                    }
+                });
+
+        if (firstPageReadyCallback != null) {
+            Executor executor = task -> ctx.execute(task::run, firstPageReadyCallback::onPrefetchComplete);
+
+            result.whenCompleteAsync((res, err) -> firstPageReadyCallback.onPrefetchComplete(err), executor);
+        }
+
+        return new AsyncWrapper<>(result, Runnable::run);
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
new file mode 100644
index 0000000000..0a6df52446
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.prepare;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.sql.engine.InternalSqlRow;
+import org.apache.ignite.internal.sql.engine.InternalSqlRowSingleLong;
+import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
+import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.sql.engine.util.Cloner;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor;
+import org.apache.ignite.internal.util.AsyncWrapper;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Plan representing simple modify operation that can be executed by Key-Value API.
+ */
+public class KeyValueModifyPlan implements ExplainablePlan, ExecutablePlan {
+    private final PlanId id;
+    private final int schemaVersion;
+    private final IgniteKeyValueModify modifyNode;
+    private final ResultSetMetadata meta;
+    private final ParameterMetadata parameterMetadata;
+
+    KeyValueModifyPlan(
+            PlanId id,
+            int schemaVersion,
+            IgniteKeyValueModify modifyNode,
+            ResultSetMetadata meta,
+            ParameterMetadata parameterMetadata
+    ) {
+        this.id = id;
+        this.schemaVersion = schemaVersion;
+        this.modifyNode = modifyNode;
+        this.meta = meta;
+        this.parameterMetadata = parameterMetadata;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public PlanId id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public SqlQueryType type() {
+        return SqlQueryType.DML;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ResultSetMetadata metadata() {
+        return meta;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ParameterMetadata parameterMetadata() {
+        return parameterMetadata;
+    }
+
+    /** Returns a table in question. */
+    private IgniteTable table() {
+        IgniteTable table = modifyNode.getTable().unwrap(IgniteTable.class);
+
+        assert table != null : modifyNode.getTable();
+
+        return table;
+    }
+
+    @Override
+    public String explain() {
+        IgniteRel clonedRoot = Cloner.clone(modifyNode, Commons.cluster());
+
+        return RelOptUtil.toString(clonedRoot, SqlExplainLevel.ALL_ATTRIBUTES);
+    }
+
+    public IgniteKeyValueModify modifyNode() {
+        return modifyNode;
+    }
+
+    @Override
+    public <RowT> AsyncCursor<InternalSqlRow> execute(
+            ExecutionContext<RowT> ctx,
+            InternalTransaction tx,
+            ExecutableTableRegistry tableRegistry,
+            @Nullable QueryPrefetchCallback firstPageReadyCallback
+    ) {
+        IgniteTable sqlTable = table();
+
+        CompletableFuture<Iterator<InternalSqlRow>> result = tableRegistry.getTable(schemaVersion, sqlTable.id())
+                .thenCompose(execTable -> {
+                    List<RexNode> expressions = modifyNode.expressions();
+
+                    Supplier<RowT> rowSupplier = ctx.expressionFactory()
+                            .rowSource(expressions);
+
+                    UpdatableTable updatableTable = execTable.updatableTable();
+
+                    return updatableTable.insert(
+                            tx, ctx, rowSupplier.get()
+                    ).thenApply(none -> List.<InternalSqlRow>of(new InternalSqlRowSingleLong(1L)).iterator());
+                });
+
+        if (firstPageReadyCallback != null) {
+            Executor executor = task -> ctx.execute(task::run, firstPageReadyCallback::onPrefetchComplete);
+
+            result.whenCompleteAsync((res, err) -> firstPageReadyCallback.onPrefetchComplete(err), executor);
+        }
+
+        return new AsyncWrapper<>(result, Runnable::run);
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
index 1fb987d335..728e4b12f3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerHelper.java
@@ -41,10 +41,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.hint.Hints;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
-import org.apache.ignite.internal.sql.engine.rel.IgnitePkLookup;
 import org.apache.ignite.internal.sql.engine.rel.IgniteProject;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
-import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 
 /**
@@ -118,13 +116,15 @@ public final class PlannerHelper {
 
             rel = planner.transform(PlannerPhase.HEP_PROJECT_PUSH_DOWN, rel.getTraitSet(), rel);
 
-            // if after all operators being pushed down an entire tree was collapsed to a single node,
-            // then, probably, we may replaced it with optimized lookup by a primary key
-            if (rel instanceof IgniteLogicalTableScan) {
-                IgniteRel igniteRel = IgnitePkLookup.convert((IgniteLogicalTableScan) rel);
+            {
+                // the sole purpose of this code block is to limit scope of `simpleOperation` variable.
+                // The result of `HEP_TO_SIMPLE_KEY_VALUE_OPERATION` phase MUST NOT be passed to next stage,
+                // thus if result meets our expectation, then return the result, otherwise discard it and
+                // proceed with regular flow
+                RelNode simpleOperation = planner.transform(PlannerPhase.HEP_TO_SIMPLE_KEY_VALUE_OPERATION, rel.getTraitSet(), rel);
 
-                if (igniteRel != null) {
-                    return igniteRel;
+                if (simpleOperation instanceof IgniteRel) {
+                    return (IgniteRel) simpleOperation;
                 }
             }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 862d05af4c..120a12014b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -59,6 +59,8 @@ import org.apache.ignite.internal.sql.engine.rule.SortAggregateConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.SortConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.TableModifyConverterRule;
+import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
+import org.apache.ignite.internal.sql.engine.rule.TableScanToKeyValueGetRule;
 import org.apache.ignite.internal.sql.engine.rule.UnionConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.ValuesConverterRule;
 import org.apache.ignite.internal.sql.engine.rule.logical.ExposeIndexRule;
@@ -83,6 +85,19 @@ public enum PlannerPhase {
         }
     },
 
+    HEP_TO_SIMPLE_KEY_VALUE_OPERATION(
+            "Heuristic phase to convert relational tree to simple Key-Value operation",
+            TableScanToKeyValueGetRule.INSTANCE,
+            TableModifyToKeyValuePutRule.PROJECT,
+            TableModifyToKeyValuePutRule.VALUES
+    ) {
+        /** {@inheritDoc} */
+        @Override
+        public Program getProgram(PlanningContext ctx) {
+            return hep(getRules(ctx));
+        }
+    },
+
     HEP_FILTER_PUSH_DOWN(
             "Heuristic phase to push down filters",
             FilterScanMergeRule.TABLE_SCAN_SKIP_CORRELATED,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
index 0909ad6ff0..b0374357e5 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java
@@ -52,7 +52,8 @@ import org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedCo
 import org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlSqlToCommandConverter;
-import org.apache.ignite.internal.sql.engine.rel.IgnitePkLookup;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
 import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
@@ -340,8 +341,12 @@ public class PrepareServiceImpl implements PrepareService {
 
                 ResultSetMetadata resultSetMetadata = resultSetMetadata(validated.dataType(), validated.origins(), validated.aliases());
 
-                if (clonedTree instanceof IgnitePkLookup) {
-                    return new KeyValueGetPlan(nextPlanId(), (IgnitePkLookup) clonedTree, resultSetMetadata, parameterMetadata);
+                if (clonedTree instanceof IgniteKeyValueGet) {
+                    int schemaVersion = ctx.unwrap(BaseQueryContext.class).schemaVersion();
+
+                    return new KeyValueGetPlan(
+                            nextPlanId(), schemaVersion, (IgniteKeyValueGet) clonedTree, resultSetMetadata, parameterMetadata
+                    );
                 }
 
                 return new MultiStepPlan(nextPlanId(), SqlQueryType.QUERY, clonedTree, resultSetMetadata, parameterMetadata);
@@ -399,6 +404,14 @@ public class PrepareServiceImpl implements PrepareService {
                 // before storing tree in plan cache
                 IgniteRel clonedTree = Cloner.clone(igniteRel, Commons.emptyCluster());
 
+                if (clonedTree instanceof IgniteKeyValueModify) {
+                    int schemaVersion = ctx.unwrap(BaseQueryContext.class).schemaVersion();
+
+                    return new KeyValueModifyPlan(
+                            nextPlanId(), schemaVersion, (IgniteKeyValueModify) clonedTree, DML_METADATA, parameterMetadata
+                    );
+                }
+
                 return new MultiStepPlan(nextPlanId(), SqlQueryType.DML, clonedTree, DML_METADATA, parameterMetadata);
             }, planningPool));
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueGet.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueGet.java
new file mode 100644
index 0000000000..0f300fcd8a
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueGet.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Relational operator that represents lookup by a primary key.
+ *
+ * <p>Note: at the moment, KV api requires an actual transaction which is object
+ * of type {@link InternalTransaction} while distributed execution has access to
+ * only certain {@link TxAttributes attributes} of a transaction. Given that node is
+ * not supposed to be a part of distributed query plan, the following parts were
+ * deliberately omitted:<ul>
+ *     <li>this class doesn't implement {@link SourceAwareIgniteRel}, making it impossible
+ *     to map properly by {@link MappingService}</li>
+ *     <li>de-serialisation constructor is omitted (see {@link ProjectableFilterableTableScan#ProjectableFilterableTableScan(RelInput)}
+ *     as example)</li>
+ * </ul>
+ */
+public class IgniteKeyValueGet extends ProjectableFilterableTableScan implements IgniteRel {
+    private final List<RexNode> keyExpressions;
+
+    /**
+     * Constructor.
+     *
+     * @param cluster A cluster this relation belongs to.
+     * @param traits A set of traits this node satisfies.
+     * @param table A source table.
+     * @param hints List of hints related to a relational node.
+     * @param keyExpressions List of expressions representing primary key.
+     * @param proj Optional projection to transform the row.
+     * @param cond Optional condition to do post-filtration.
+     * @param requiredColumns Optional set required fields to do trimming unused columns.
+     */
+    public IgniteKeyValueGet(
+            RelOptCluster cluster,
+            RelTraitSet traits,
+            RelOptTable table,
+            List<RelHint> hints,
+            List<RexNode> keyExpressions,
+            @Nullable List<RexNode> proj,
+            @Nullable RexNode cond,
+            @Nullable ImmutableBitSet requiredColumns
+    ) {
+        super(cluster, traits, hints, table, proj, cond, requiredColumns);
+
+        this.keyExpressions = keyExpressions;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteKeyValueGet(cluster, getTraitSet(), getTable(), getHints(), keyExpressions, projects, condition, requiredColumns);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteKeyValueGet withHints(List<RelHint> hintList) {
+        return new IgniteKeyValueGet(
+                getCluster(), getTraitSet(), getTable(), hintList, keyExpressions, projects, condition, requiredColumns
+        );
+    }
+
+    @Override
+    protected RelWriter explainTerms0(RelWriter pw) {
+        return super.explainTerms0(pw)
+                .item("key", keyExpressions);
+    }
+
+    /**
+     * Returns a list of expressions in the order of a primary key columns of related table
+     * to use as lookup key.
+     *
+     * @return List of expressions representing lookup key.
+     */
+    public List<RexNode> keyExpressions() {
+        return keyExpressions;
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
new file mode 100644
index 0000000000..57d991e15a
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteKeyValueModify.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rel;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
+import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
+import org.apache.ignite.internal.tx.InternalTransaction;
+
+/**
+ * Relational operator that represents simple modification that can be transacted
+ * to Key-Value operation.
+ *
+ * <p>Note: at the moment, KV api requires an actual transaction which is object
+ * of type {@link InternalTransaction} while distributed execution has access to
+ * only certain {@link TxAttributes attributes} of a transaction. Given that node is
+ * not supposed to be a part of distributed query plan, the following parts were
+ * deliberately omitted:<ul>
+ *     <li>this class doesn't implement {@link SourceAwareIgniteRel}, making it impossible
+ *     to map properly by {@link MappingService}</li>
+ *     <li>de-serialisation constructor is omitted (see {@link IgniteTableModify#IgniteTableModify(RelInput)}
+ *     as example)</li>
+ * </ul>
+ */
+public class IgniteKeyValueModify extends AbstractRelNode implements IgniteRel {
+    /** Enumeration of supported modification operations. */
+    public enum Operation {
+        PUT
+    }
+
+    private final RelOptTable table;
+    private final Operation operation;
+    private final List<RexNode> expressions;
+
+    /**
+     * Constructor.
+     *
+     * @param cluster A cluster this relation belongs to.
+     * @param traits A set of traits this node satisfies.
+     * @param table A target table.
+     * @param operation Type of the modification operation.
+     * @param expressions List of expressions representing either full row or only a key
+     *      depending on particular operation.
+     */
+    public IgniteKeyValueModify(
+            RelOptCluster cluster,
+            RelTraitSet traits,
+            RelOptTable table,
+            Operation operation,
+            List<RexNode> expressions
+    ) {
+        super(cluster, traits);
+
+        this.table = table;
+        this.operation = operation;
+        this.expressions = expressions;
+    }
+
+    @Override public RelDataType deriveRowType() {
+        return RelOptUtil.createDmlRowType(
+                SqlKind.INSERT, getCluster().getTypeFactory());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public RelOptTable getTable() {
+        return table;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        assert inputs.isEmpty() : inputs;
+
+        return new IgniteKeyValueModify(cluster, getTraitSet(), table, operation, expressions);
+    }
+
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw)
+                .item("table", table.getQualifiedName())
+                .item("operation", operation)
+                .item("expressions", expressions);
+    }
+
+    /**
+     * Returns a list of expressions representing either full row or only a key
+     * depending on particular operation.
+     *
+     * @return List of expressions.
+     */
+    public List<RexNode> expressions() {
+        return expressions;
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
index 3bf9f9fafe..ddaf73d6fb 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteRelVisitor.java
@@ -169,6 +169,16 @@ public interface IgniteRelVisitor<T> {
      */
     T visit(IgniteTableFunctionScan rel);
 
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+     */
+    T visit(IgniteKeyValueGet rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}.
+     */
+    T visit(IgniteKeyValueModify rel);
+
     /**
      * Visits a relational node and calculates a result on the basis of node meta information.
      *
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java
new file mode 100644
index 0000000000..b19e25eb97
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableModifyToKeyValuePutRule.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.rule;
+
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify.Operation;
+import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.immutables.value.Value;
+
+/**
+ * Rule that converts {@link TableModify} representing INSERT operation with a determined source
+ * to a Key-Value PUT operation.
+ *
+ * <p>Note: at the moment, this rule support only single row insert.
+ */
+@Value.Enclosing
+public class TableModifyToKeyValuePutRule extends RelRule<TableModifyToKeyValuePutRule.Config> {
+    public static final RelOptRule VALUES = Config.VALUES.toRule();
+    public static final RelOptRule PROJECT = Config.PROJECT.toRule();
+
+    private TableModifyToKeyValuePutRule(Config cfg) {
+        super(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        List<RelNode> operands = call.getRelList();
+
+        TableModify modify = cast(operands.get(0));
+
+        assert modify.getOperation() == TableModify.Operation.INSERT : modify.getOperation();
+
+        List<RexNode> expressions;
+        if (operands.size() == 2) {
+            Values values = cast(operands.get(1));
+
+            assert values.getTuples().size() == 1 : "Expected exactly one tuple, but was " + values.getTuples().size();
+
+            expressions = List.copyOf(values.getTuples().get(0));
+        } else {
+            assert operands.size() == 3 : operands;
+
+            Values values = cast(operands.get(2));
+
+            assert values.getTuples().size() == 1 : "Expected exactly one tuple, but was " + values.getTuples().size();
+
+            List<RexNode> inputExpressions = List.copyOf(values.getTuples().get(0));
+
+            RexVisitor<RexNode> inputInliner = new RexShuttle() {
+                @Override
+                public RexNode visitInputRef(RexInputRef inputRef) {
+                    return inputExpressions.get(inputRef.getIndex());
+                }
+            };
+
+            Project project = cast(operands.get(1));
+
+            expressions = inputInliner.visitList(project.getProjects());
+        }
+
+        call.transformTo(
+                new IgniteKeyValueModify(
+                        modify.getCluster(),
+                        modify.getTraitSet()
+                                .replace(IgniteConvention.INSTANCE)
+                                .replace(IgniteDistributions.single()),
+                        modify.getTable(),
+                        Operation.PUT,
+                        expressions
+                )
+        );
+    }
+
+    private static <T extends RelNode> T cast(RelNode node) {
+        return (T) node;
+    }
+
+    /**
+     * Configuration.
+     */
+    @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"})
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+        Config VALUES = ImmutableTableModifyToKeyValuePutRule.Config.of()
+                .withDescription("TableModifyToKeyValuePutRule:VALUES")
+                .withOperandSupplier(o0 ->
+                        o0.operand(TableModify.class)
+                                .predicate(TableModify::isInsert)
+                                .oneInput(o1 ->
+                                        o1.operand(Values.class)
+                                                .predicate(values -> values.getTuples().size() == 1)
+                                                .noInputs()))
+                .as(Config.class);
+
+        Config PROJECT = ImmutableTableModifyToKeyValuePutRule.Config.of()
+                .withDescription("TableModifyToKeyValuePutRule:PROJECT")
+                .withOperandSupplier(o0 ->
+                        o0.operand(TableModify.class)
+                                .predicate(TableModify::isInsert)
+                                .oneInput(o1 ->
+                                        o1.operand(Project.class).oneInput(o2 ->
+                                                o2.operand(Values.class)
+                                                        .predicate(values -> values.getTuples().size() == 1)
+                                                        .noInputs())))
+                .as(Config.class);
+
+        /** {@inheritDoc} */
+        @Override
+        default TableModifyToKeyValuePutRule toRule() {
+            return new TableModifyToKeyValuePutRule(this);
+        }
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgnitePkLookup.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
similarity index 52%
rename from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgnitePkLookup.java
rename to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
index 4e25a0f08f..b555ca997a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgnitePkLookup.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/TableScanToKeyValueGetRule.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.rel;
+package org.apache.ignite.internal.sql.engine.rule;
 
 import static org.apache.ignite.internal.sql.engine.util.RexUtils.builder;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
@@ -25,118 +25,58 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.Mappings;
-import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
-import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
+import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
+import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet;
 import org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.RexUtils;
-import org.apache.ignite.internal.tx.InternalTransaction;
+import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Relational operator that represents lookup by a primary key.
+ * Tries to convert given scan node to physical node representing primary key lookup.
  *
- * <p>Note: at the moment, KV api requires an actual transaction which is object
- * of type {@link InternalTransaction} while distributed execution has access to
- * only certain {@link TxAttributes attributes} of a transaction. Given that node is
- * not supposed to be a part of distributed query plan, the following parts were
- * deliberately omitted:<ul>
- *     <li>this class doesn't implement {@link SourceAwareIgniteRel}, making it impossible
- *     to map properly by {@link MappingService}</li>
- *     <li>{@link IgniteRelVisitor} was not extended with this class</li>
- *     <li>de-serialisation constructor is omitted (see {@link ProjectableFilterableTableScan#ProjectableFilterableTableScan(RelInput)}
- *     as example)</li>
- * </ul>
+ * <p>Conversion will be successful if: <ol>
+ *     <li>there is condition</li>
+ *     <li>table has primary key index</li>
+ *     <li>condition covers all columns of primary key index</li>
+ *     <li>only single search key is derived from condition</li>
+ * </ol>
  */
-public class IgnitePkLookup extends ProjectableFilterableTableScan implements IgniteRel {
-    private final List<RexNode> keyExpressions;
-
-    private IgnitePkLookup(
-            RelOptCluster cluster,
-            RelTraitSet traits,
-            RelOptTable tbl,
-            List<RelHint> hints,
-            List<RexNode> keyExpressions,
-            @Nullable List<RexNode> proj,
-            @Nullable RexNode cond,
-            @Nullable ImmutableBitSet requiredColumns
-    ) {
-        super(cluster, traits, hints, tbl, proj, cond, requiredColumns);
-
-        this.keyExpressions = keyExpressions;
-    }
+@Value.Enclosing
+public class TableScanToKeyValueGetRule extends RelRule<TableScanToKeyValueGetRule.Config> {
+    public static final RelOptRule INSTANCE = Config.INSTANCE.toRule();
 
-    /** {@inheritDoc} */
-    @Override
-    public <T> T accept(IgniteRelVisitor<T> visitor) {
-        return visitor.visit(this);
+    private TableScanToKeyValueGetRule(Config cfg) {
+        super(cfg);
     }
 
     /** {@inheritDoc} */
     @Override
-    public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgnitePkLookup(cluster, getTraitSet(), getTable(), getHints(), keyExpressions, projects, condition, requiredColumns);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public IgnitePkLookup withHints(List<RelHint> hintList) {
-        return new IgnitePkLookup(getCluster(), getTraitSet(), getTable(), hintList, keyExpressions, projects, condition, requiredColumns);
-    }
-
-    @Override
-    protected RelWriter explainTerms0(RelWriter pw) {
-        return super.explainTerms0(pw)
-                .item("key", keyExpressions);
-    }
+    public void onMatch(RelOptRuleCall call) {
+        IgniteLogicalTableScan scan = cast(call.rel(0));
 
-    /**
-     * Returns a list of expressions in the order of a primary key columns of related table
-     * to use as lookup key.
-     *
-     * @return List of expressions representing lookup key.
-     */
-    public List<RexNode> keyExpressions() {
-        return keyExpressions;
-    }
-
-    /**
-     * Tries to convert given scan node to physical node representing primary key lookup.
-     *
-     * <p>Conversion will be successful if: <ol>
-     *     <li>there is condition</li>
-     *     <li>table has primary key index</li>
-     *     <li>condition covers all columns of primary key index</li>
-     *     <li>only single search key is derived from condition</li>
-     * </ol>
-     *
-     * @param scan A scan node to analyze.
-     * @return A physical node representing optimized look up by primary key or {@code null}
-     *      if we were unable to derive all necessary information.
-     */
-    public static @Nullable IgnitePkLookup convert(IgniteLogicalTableScan scan) {
         List<SearchBounds> bounds = deriveSearchBounds(scan);
 
         if (nullOrEmpty(bounds)) {
-            return null;
+            return;
         }
 
         List<RexNode> expressions = new ArrayList<>(bounds.size());
@@ -150,7 +90,7 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
             // iteration over a number of search keys are not supported yet,
             // thus we need to make sure only single key was derived
             if (!(bound instanceof ExactBounds)) {
-                return null;
+                return;
             }
 
             condition.remove(bound.condition());
@@ -158,7 +98,7 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
         }
 
         if (nullOrEmpty(expressions)) {
-            return null;
+            return;
         }
 
         RexNode resultingCondition = RexUtil.composeConjunction(rexBuilder, condition);
@@ -166,17 +106,19 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
             resultingCondition = null;
         }
 
-        return new IgnitePkLookup(
-                cluster,
-                scan.getTraitSet()
-                        .replace(IgniteConvention.INSTANCE)
-                        .replace(IgniteDistributions.single()),
-                scan.getTable(),
-                scan.getHints(),
-                expressions,
-                scan.projects(),
-                resultingCondition,
-                scan.requiredColumns()
+        call.transformTo(
+                new IgniteKeyValueGet(
+                        cluster,
+                        scan.getTraitSet()
+                                .replace(IgniteConvention.INSTANCE)
+                                .replace(IgniteDistributions.single()),
+                        scan.getTable(),
+                        scan.getHints(),
+                        expressions,
+                        scan.projects(),
+                        resultingCondition,
+                        scan.requiredColumns()
+                )
         );
     }
 
@@ -220,4 +162,27 @@ public class IgnitePkLookup extends ProjectableFilterableTableScan implements Ig
                 requiredColumns
         );
     }
+
+    private static <T extends RelNode> T cast(RelNode node) {
+        return (T) node;
+    }
+
+    /** Configuration. */
+    @SuppressWarnings({"ClassNameSameAsAncestorName", "InnerClassFieldHidesOuterClassField"})
+    @Value.Immutable
+    public interface Config extends RelRule.Config {
+        Config INSTANCE = ImmutableTableScanToKeyValueGetRule.Config.of()
+                .withDescription("TableScanToKeyValueGetRule")
+                .withOperandSupplier(o0 ->
+                        o0.operand(IgniteLogicalTableScan.class)
+                                .predicate(scan -> scan.condition() != null)
+                                .noInputs())
+                .as(Config.class);
+
+        /** {@inheritDoc} */
+        @Override
+        default TableScanToKeyValueGetRule toRule() {
+            return new TableScanToKeyValueGetRule(this);
+        }
+    }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 514a17b3f6..06bfda53aa 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -588,7 +588,7 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest {
     public void exceptionArrivingBeforeRootFragmentExecutesDoesNotLeaveQueryHanging() {
         ExecutionService execService = executionServices.get(0);
         BaseQueryContext ctx = createContext();
-        QueryPlan plan = prepare("INSERT INTO test_tbl(ID, VAL) VALUES (1, 1)", ctx);
+        QueryPlan plan = prepare("SELECT * FROM test_tbl", ctx);
 
         CountDownLatch queryFailedLatch = new CountDownLatch(1);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 69b1159a0d..44d6e0be4d 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.sql.engine.framework;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -198,8 +197,6 @@ public class TestNode implements LifecycleAware {
         ParsedResult parsedResult = parserService.parse(query);
         BaseQueryContext ctx = createContext();
 
-        assertEquals(ctx.parameters().length, parsedResult.dynamicParamsCount(), "Invalid number of dynamic parameters");
-
         return await(prepareService.prepareAsync(parsedResult, ctx));
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 6f18f64f52..62e64d3d98 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.rel.IgniteSystemViewScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rule.TableModifyToKeyValuePutRule;
 import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
 import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
 import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Collation;
@@ -114,6 +115,11 @@ import org.jetbrains.annotations.Nullable;
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
 public abstract class AbstractPlannerTest extends IgniteAbstractTest {
+    protected static final String[] DISABLE_KEY_VALUE_MODIFY_RULES = {
+            TableModifyToKeyValuePutRule.VALUES.toString(),
+            TableModifyToKeyValuePutRule.PROJECT.toString(),
+    };
+
     protected static final IgniteTypeFactory TYPE_FACTORY = Commons.typeFactory();
 
     protected static final int DEFAULT_TBL_SIZE = 500_000;
@@ -981,10 +987,9 @@ public abstract class AbstractPlannerTest extends IgniteAbstractTest {
      * An implementation of {@link PlanChecker} with initialized {@link SqlPrepare} to test plans.
      */
     public class PlanChecker extends StatementChecker {
-
         PlanChecker() {
-            super((schema, sql, params) -> {
-                PlanningContext planningContext = plannerCtx(sql, List.of(schema), HintStrategyTable.EMPTY, params);
+            super((schema, sql, params, rulesToDisable) -> {
+                PlanningContext planningContext = plannerCtx(sql, List.of(schema), HintStrategyTable.EMPTY, params, rulesToDisable);
                 IgnitePlanner planner = planningContext.planner();
                 IgniteRel igniteRel = physicalPlan(planner, sql);
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
index 47b1d55517..ad25745a61 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java
@@ -37,10 +37,9 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
- * Tests to verify DML plans.
+ * Tests to verify multi-step versions of DML plans.
  */
 public class DmlPlannerTest extends AbstractPlannerTest {
-
     /**
      * Test for INSERT .. VALUES when table has a single distribution.
      */
@@ -51,7 +50,9 @@ public class DmlPlannerTest extends AbstractPlannerTest {
 
         // There should be no exchanges and other operations.
         assertPlan("INSERT INTO TEST1 (C1, C2) VALUES(1, 2)", schema,
-                isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteValues.class))));
+                isInstanceOf(IgniteTableModify.class).and(input(isInstanceOf(IgniteValues.class))),
+                DISABLE_KEY_VALUE_MODIFY_RULES
+        );
     }
 
     /**
@@ -68,7 +69,8 @@ public class DmlPlannerTest extends AbstractPlannerTest {
                 nodeOrAnyChild(isInstanceOf(IgniteExchange.class)
                         .and(e -> e.distribution().equals(IgniteDistributions.single())))
                         .and(nodeOrAnyChild(isInstanceOf(IgniteTableModify.class))
-                                .and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e -> distribution.equals(e.distribution())))))
+                                .and(hasChildThat(isInstanceOf(IgniteExchange.class).and(e -> distribution.equals(e.distribution()))))),
+                DISABLE_KEY_VALUE_MODIFY_RULES
         );
     }
 
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java
index bda29a8abb..917039ca19 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DynamicParametersTest.java
@@ -483,24 +483,28 @@ public class DynamicParametersTest extends AbstractPlannerTest {
     public Stream<DynamicTest> testInsertDynamicParams() {
         return Stream.of(
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", "c1", NativeTypes.INT32)
                         .sql("INSERT INTO t1 VALUES (?)", 1)
                         .parameterTypes(nullable(NativeTypes.INT32))
                         .project("?0"),
 
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", "c1", NativeTypes.INT64)
                         .sql("INSERT INTO t1 VALUES (?)", 1)
                         .parameterTypes(nullable(NativeTypes.INT32))
                         .project("CAST(?0):BIGINT"),
 
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", "c1", NativeTypes.INT64)
                         .sql("INSERT INTO t1 VALUES (?)", Unspecified.UNKNOWN)
                         .parameterTypes(nullable(NativeTypes.INT64))
                         .project("?0"),
 
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", "c1", NativeTypes.INT64)
                         .sql("INSERT INTO t1 VALUES (?)", new Object[]{null})
                         .parameterTypes(new NativeType[]{null})
@@ -513,6 +517,7 @@ public class DynamicParametersTest extends AbstractPlannerTest {
                         .project("?0"),
 
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", "c1", NativeTypes.INT32)
                         .sql("INSERT INTO t1 VALUES (?), (2), (?)", Unspecified.UNKNOWN, Unspecified.UNKNOWN)
                         .parameterTypes(nullable(NativeTypes.INT32), nullable(NativeTypes.INT32))
@@ -521,6 +526,7 @@ public class DynamicParametersTest extends AbstractPlannerTest {
                 // compatible type
 
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", "c1", NativeTypes.INT64)
                         .sql("INSERT INTO t1 VALUES (?)", 1)
                         .parameterTypes(nullable(NativeTypes.INT32))
@@ -530,6 +536,7 @@ public class DynamicParametersTest extends AbstractPlannerTest {
                 // Incompatible types in dynamic params
 
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", "c1", NativeTypes.INT32)
                         .sql("INSERT INTO t1 VALUES (?)", "10")
                         .fails("Values passed to VALUES operator must have compatible types"),
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
index d876e3a46a..061ae5aec2 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ImplicitCastsTest.java
@@ -63,7 +63,6 @@ import org.junit.jupiter.params.provider.MethodSource;
  * Type coercion related tests that ensure that the necessary casts are placed where it is necessary.
  */
 public class ImplicitCastsTest extends AbstractPlannerTest {
-
     private static TestTable tableWithColumn(String tableName, String columnName, RelDataType columnType) {
         return TestBuilders.table()
                 .name(tableName)
@@ -318,6 +317,7 @@ public class ImplicitCastsTest extends AbstractPlannerTest {
 
                 // DEFAULT is not coerced
                 checkStatement()
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t1", (table) -> {
                             return table.name("T1")
                                     .addColumn("INT_COL", NativeTypes.INT32)
@@ -476,6 +476,7 @@ public class ImplicitCastsTest extends AbstractPlannerTest {
 
         return Stream.of(
                 checkStatement(setup)
+                        .disableRules(DISABLE_KEY_VALUE_MODIFY_RULES)
                         .table("t3", "str_col", NativeTypes.stringOf(36))
                         .sql("INSERT INTO t3 VALUES('1111'::UUID)")
                         .project("CAST(CAST(_UTF-8'1111'):UUID NOT NULL):VARCHAR(36) CHARACTER SET \"UTF-8\" NOT NULL"),
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
new file mode 100644
index 0000000000..08c79c5796
--- /dev/null
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/KeyValueModifyPlannerTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.planner;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.DropTableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.framework.TestCluster;
+import org.apache.ignite.internal.sql.engine.framework.TestNode;
+import org.apache.ignite.internal.sql.engine.prepare.KeyValueModifyPlan;
+import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Test cases to very KV modify optimized plans.
+ */
+public class KeyValueModifyPlannerTest extends AbstractPlannerTest {
+    private static final String NODE_NAME = "N1";
+
+    private static final TestCluster CLUSTER = TestBuilders.cluster()
+            .nodes(NODE_NAME)
+            .build();
+
+    private final TestNode node = CLUSTER.node(NODE_NAME);
+
+    @BeforeAll
+    static void start() {
+        CLUSTER.start();
+    }
+
+    @AfterAll
+    static void stop() throws Exception {
+        CLUSTER.stop();
+    }
+
+    @AfterEach
+    void clearCatalog() {
+        int version = CLUSTER.catalogManager().latestCatalogVersion();
+
+        List<CatalogCommand> commands = new ArrayList<>();
+        for (CatalogTableDescriptor table : CLUSTER.catalogManager().tables(version)) {
+            commands.add(
+                    DropTableCommand.builder()
+                            .schemaName(CatalogService.DEFAULT_SCHEMA_NAME)
+                            .tableName(table.name())
+                            .build()
+            );
+        }
+
+        await(CLUSTER.catalogManager().execute(commands));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "INSERT INTO test VALUES (10, 20)",
+            "INSERT INTO test(id, val) VALUES (10, 20)",
+            "INSERT INTO test(val, id) VALUES (20, 10)",
+    })
+    void optimizedInsertUsedForLiterals(String insertStatement) {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+        {
+            QueryPlan plan = node.prepare(insertStatement);
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertExpressions((KeyValueModifyPlan) plan, "10", "20");
+        }
+    }
+
+    @Test
+    void optimizedInsertUsedForDynamicParams() {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+        {
+            QueryPlan plan = node.prepare("INSERT INTO test VALUES (?, ?)");
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertExpressions((KeyValueModifyPlan) plan, "?0", "?1");
+        }
+
+        {
+            QueryPlan plan = node.prepare("INSERT INTO test(id, val) VALUES (?, ?)");
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertExpressions((KeyValueModifyPlan) plan, "?0", "?1");
+        }
+
+        {
+            QueryPlan plan = node.prepare("INSERT INTO test(val, id) VALUES (?, ?)");
+
+            assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+            assertExpressions((KeyValueModifyPlan) plan, "?1", "?0");
+        }
+    }
+
+    @Test
+    void optimizedInsertUsedForMixedCase() {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, int_val INT, str_val VARCHAR(128))");
+
+        QueryPlan plan = node.prepare("INSERT INTO test VALUES (?, 1, CAST(CURRENT_DATE as VARCHAR(128)))");
+
+        assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+        assertExpressions((KeyValueModifyPlan) plan, "?0", "1", "CAST(CURRENT_DATE):VARCHAR(128) CHARACTER SET \"UTF-8\" NOT NULL");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "INSERT INTO test(id) VALUES (1)",
+            "INSERT INTO test(id, int_val) VALUES (1, DEFAULT)"
+    })
+    void optimizedInsertUsedWithDefaults(String insertStatement) {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, int_val INT DEFAULT 10, str_val VARCHAR(128) DEFAULT 'a')");
+
+        QueryPlan plan = node.prepare(insertStatement);
+
+        assertThat(plan, instanceOf(KeyValueModifyPlan.class));
+        assertExpressions((KeyValueModifyPlan) plan, "1", "10", "_UTF-8'a'");
+    }
+
+    @Test
+    void optimizedInsertNotUsedForMultiInsert() {
+        node.initSchema("CREATE TABLE test (id INT PRIMARY KEY, val INT)");
+
+        QueryPlan plan = node.prepare("INSERT INTO test VALUES (1, 1), (2, 2)");
+
+        assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+    }
+
+    @Test
+    void optimizedInsertNotUsedForInsertFromOtherTable() {
+        node.initSchema("CREATE TABLE source_t (id INT PRIMARY KEY, val INT);"
+                + "CREATE TABLE target_t (id INT PRIMARY KEY, val INT)");
+
+        QueryPlan plan = node.prepare("INSERT INTO target_t SELECT * FROM source_t");
+
+        assertThat(plan, not(instanceOf(KeyValueModifyPlan.class)));
+    }
+
+    private static void assertExpressions(KeyValueModifyPlan plan, String... expectedExpressions) {
+        List<String> keyExpressions = plan.modifyNode().expressions().stream()
+                .map(RexNode::toString)
+                .collect(toList());
+
+        assertThat(
+                keyExpressions,
+                equalTo(List.of(expectedExpressions))
+        );
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java
index e2d2ef252d..25ecc02855 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/StatementChecker.java
@@ -124,6 +124,8 @@ public class StatementChecker {
 
     private boolean dumpPlan;
 
+    private String[] rulesToDisable = new String[0];
+
     private Consumer<StatementChecker> setup = (checker) -> {};
 
     private List<RelDataType> expectedParameterTypes;
@@ -148,8 +150,11 @@ public class StatementChecker {
          * @param schema A schema.
          * @param sql An SQL statement.
          * @param params A list of dynamic parameters.
+         * @param rulesToDisable A list of rules to exclude from optimisation.
          */
-        Pair<IgniteRel, IgnitePlanner> prepare(IgniteSchema schema, String sql, List<Object> params) throws Exception;
+        Pair<IgniteRel, IgnitePlanner> prepare(
+                IgniteSchema schema, String sql, List<Object> params, String... rulesToDisable
+        ) throws Exception;
     }
 
     /** Sets a function that is going to be called prior to test run. */
@@ -158,6 +163,13 @@ public class StatementChecker {
         return this;
     }
 
+    /** Sets rules to exclude from optimisation. */
+    public StatementChecker disableRules(String... rulesToDisable) {
+        this.rulesToDisable = rulesToDisable;
+
+        return this;
+    }
+
     /**
      * Updates schema to include a table with 1 column.
      */
@@ -419,7 +431,7 @@ public class StatementChecker {
             IgnitePlanner planner;
 
             try {
-                Pair<IgniteRel, IgnitePlanner> result = sqlPrepare.prepare(schema, sqlStatement, dynamicParams);
+                Pair<IgniteRel, IgnitePlanner> result = sqlPrepare.prepare(schema, sqlStatement, dynamicParams, rulesToDisable);
                 root = result.getFirst();
                 planner = result.getSecond();
 
@@ -478,7 +490,7 @@ public class StatementChecker {
             Throwable err = null;
             IgniteRel unexpectedPlan = null;
             try {
-                Pair<IgniteRel, ?> unexpected = sqlPrepare.prepare(schema, sqlStatement, dynamicParams);
+                Pair<IgniteRel, ?> unexpected = sqlPrepare.prepare(schema, sqlStatement, dynamicParams, rulesToDisable);
                 unexpectedPlan = unexpected.getFirst();
             } catch (Throwable t) {
                 err = t;
diff --git a/modules/sql-engine/src/test/resources/mapping/dml.test b/modules/sql-engine/src/test/resources/mapping/dml.test
index b977e15515..fed5b9ceac 100644
--- a/modules/sql-engine/src/test/resources/mapping/dml.test
+++ b/modules/sql-engine/src/test/resources/mapping/dml.test
@@ -1,5 +1,5 @@
 N0
-INSERT INTO t1_n1 VALUES(1, 1, 1)
+INSERT INTO t1_n1 VALUES (1, 1, 1), (2, 2, 2)
 ---
 Fragment#0 root
   executionNodes: [N0]