You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/03/08 11:12:45 UTC

[ignite] branch master updated: IGNITE-11227: SQL: Decoupled query execution entry point from DML. This closes #6246.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 455b56d  IGNITE-11227: SQL: Decoupled query execution entry point from DML. This closes #6246.
455b56d is described below

commit 455b56d4166c9455a4b02532049de63e2e6090e3
Author: devozerov <pp...@gmail.com>
AuthorDate: Fri Mar 8 14:12:38 2019 +0300

    IGNITE-11227: SQL: Decoupled query execution entry point from DML. This closes #6246.
---
 .../thin/JdbcThinDataPageScanPropertySelfTest.java |   33 +-
 .../thin/JdbcThinStreamingAbstractSelfTest.java    |   22 +-
 .../internal/processors/cache/mvcc/MvccUtils.java  |    7 +-
 .../processors/query/GridQueryIndexing.java        |   15 +-
 .../processors/query/GridQueryProcessor.java       |    9 +-
 .../IgniteClientCacheInitializationFailTest.java   |   12 +-
 .../processors/query/h2/CommandProcessor.java      |   26 +-
 .../processors/query/h2/IgniteH2Indexing.java      | 1163 +++++++++++---------
 ...eryParserCacheKey.java => QueryDescriptor.java} |  110 +-
 .../processors/query/h2/QueryParameters.java       |  215 ++++
 .../internal/processors/query/h2/QueryParser.java  |  131 ++-
 .../processors/query/h2/QueryParserResult.java     |   29 +-
 .../processors/query/h2/dml/UpdatePlanBuilder.java |   68 +-
 .../processors/query/RunningQueriesTest.java       |   27 +-
 14 files changed, 1167 insertions(+), 700 deletions(-)

diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java
index 40e1d78..9bd9064 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java
@@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
@@ -121,14 +120,19 @@ public class JdbcThinDataPageScanPropertySelfTest extends GridCommonAbstractTest
     private void checkDataPageScanInBatch(String qryWithParam, @Nullable Boolean dps) throws Exception {
         String params = (dps == null) ? null : "dataPageScanEnabled=" + dps;
 
+        int expCnt = 0;
+
         try (Connection conn = GridTestUtils.connect(grid(0), params)) {
             try (PreparedStatement upd = conn.prepareStatement(qryWithParam)) {
                 for (int i = 0; i < TOTAL_QUERIES_TO_EXECUTE; i++) {
                     upd.setInt(1, i);
                     upd.addBatch();
 
-                    if ((i + 1) % BATCH_SIZE == 0 || (i + 1) == TOTAL_QUERIES_TO_EXECUTE)
+                    if ((i + 1) % BATCH_SIZE == 0 || (i + 1) == TOTAL_QUERIES_TO_EXECUTE) {
                         upd.executeBatch();
+
+                        expCnt++;
+                    }
                 }
             }
         }
@@ -146,10 +150,7 @@ public class JdbcThinDataPageScanPropertySelfTest extends GridCommonAbstractTest
 
         int executed = IndexingWithQueries.queries.size();
 
-        assertTrue(
-            "Expected that there are executed at least " + TOTAL_QUERIES_TO_EXECUTE + " queries. " +
-                "But executed only " + executed,
-            executed >= TOTAL_QUERIES_TO_EXECUTE);
+        assertEquals(expCnt, executed);
 
         IndexingWithQueries.queries.clear();
     }
@@ -198,12 +199,24 @@ public class JdbcThinDataPageScanPropertySelfTest extends GridCommonAbstractTest
         static final Queue<SqlFieldsQuery> queries = new LinkedBlockingQueue<>();
 
         /** {@inheritDoc} */
-        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-            @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
-            MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
+        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+            String schemaName,
+            SqlFieldsQuery qry,
+            @Nullable SqlClientContext cliCtx,
+            boolean keepBinary,
+            boolean failOnMultipleStmts,
+            GridQueryCancel cancel
+        ) {
             queries.add(qry);
 
-            return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel, registerAsNewQry);
+            return super.querySqlFields(
+                schemaName,
+                qry,
+                cliCtx,
+                keepBinary,
+                failOnMultipleStmts,
+                cancel
+            );
         }
     }
 }
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 5929839..71b5049 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
@@ -517,13 +516,24 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel
         }
 
         /** {@inheritDoc} */
-        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-            @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
-            GridQueryCancel cancel, boolean registerAsNewQry) {
+        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+            String schemaName,
+            SqlFieldsQuery qry,
+            @Nullable SqlClientContext cliCtx,
+            boolean keepBinary,
+            boolean failOnMultipleStmts,
+            GridQueryCancel cancel
+        ) {
             IndexingWithContext.cliCtx = cliCtx;
 
-            return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel,
-                registerAsNewQry);
+            return super.querySqlFields(
+                schemaName,
+                qry,
+                cliCtx,
+                keepBinary,
+                failOnMultipleStmts,
+                cancel
+            );
         }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index cbc7023..c81b2da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -788,16 +788,17 @@ public class MvccUtils {
     /**
      * Initialises MVCC filter and returns MVCC query tracker if needed.
      * @param cctx Cache context.
-     * @param startTx Start transaction flag.
+     * @param autoStartTx Start transaction flag.
      * @return MVCC query tracker.
      * @throws IgniteCheckedException If failed.
      */
-    @NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, boolean startTx) throws IgniteCheckedException {
+    @NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, boolean autoStartTx)
+        throws IgniteCheckedException {
         assert cctx != null && cctx.mvccEnabled();
 
         GridNearTxLocal tx = tx(cctx.kernalContext());
 
-        if (tx == null && startTx)
+        if (tx == null && autoStartTx)
             tx = txStart(cctx, 0);
 
         return mvccTracker(cctx, tx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index f6215b7..557e3ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
@@ -85,14 +84,16 @@ public interface GridQueryIndexing {
      * @param cliCtx Client context.
      * @param keepBinary Keep binary flag.
      * @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query.
-     * @param tracker Query tracker.
-     * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
-     * {@code false} otherwise.
      * @return Cursor.
      */
-    public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-        SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
-        GridQueryCancel cancel, boolean registerAsNewQry);
+    public List<FieldsQueryCursor<List<?>>> querySqlFields(
+        String schemaName,
+        SqlFieldsQuery qry,
+        SqlClientContext cliCtx,
+        boolean keepBinary,
+        boolean failOnMultipleStmts,
+        GridQueryCancel cancel
+    );
 
     /**
      * Execute an INSERT statement using data streamer as receiver.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index b6af8ca..c3d0b88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2231,6 +2231,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new CacheException("Execution of local SqlFieldsQuery on client node disallowed.");
 
         return executeQuerySafe(cctx, () -> {
+            assert idx != null;
+
             final String schemaName = qry.getSchema() != null ? qry.getSchema()
                 : (cctx != null ? idx.schema(cctx.name()) : QueryUtils.DFLT_SCHEMA);
 
@@ -2239,16 +2241,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     @Override public List<FieldsQueryCursor<List<?>>> applyx() {
                         GridQueryCancel cancel0 = cancel != null ? cancel : new GridQueryCancel();
 
-                        List<FieldsQueryCursor<List<?>>> res =
-                            idx.querySqlFields(
+                        List<FieldsQueryCursor<List<?>>> res = idx.querySqlFields(
                                 schemaName,
                                 qry,
                                 cliCtx,
                                 keepBinary,
                                 failOnMultipleStmts,
-                                null,
-                                cancel0,
-                                true
+                                cancel0
                             );
 
                         if (cctx != null)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index 5313c09..9e3af5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -285,9 +284,14 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-            SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
-            GridQueryCancel cancel, boolean registerAsNewQry) {
+        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+            String schemaName,
+            SqlFieldsQuery qry,
+            SqlClientContext cliCtx,
+            boolean keepBinary,
+            boolean failOnMultipleStmts,
+            GridQueryCancel cancel
+        ) {
             return null;
         }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index 0007fd8..c71a5c2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -37,7 +37,6 @@ import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.QueryIndexType;
 import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -50,7 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -160,15 +158,16 @@ public class CommandProcessor {
     /**
      * Execute command.
      *
-     * @param qry Query.
+     * @param sql SQL.
      * @param cmdNative Native command (if any).
      * @param cmdH2 H2 command (if any).
+     * @param params Parameters.
      * @param cliCtx Client context.
      * @param qryId Running query ID.
      * @return Result.
      */
-    public CommandResult runCommand(SqlFieldsQuery qry, SqlCommand cmdNative, GridSqlStatement cmdH2,
-        @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
+    public CommandResult runCommand(String sql, SqlCommand cmdNative, GridSqlStatement cmdH2,
+        QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
         assert cmdNative != null || cmdH2 != null;
 
         // Do execute.
@@ -179,7 +178,7 @@ public class CommandProcessor {
             assert cmdH2 == null;
 
             if (isDdl(cmdNative))
-                runCommandNativeDdl(qry.getSql(), cmdNative);
+                runCommandNativeDdl(sql, cmdNative);
             else if (cmdNative instanceof SqlBulkLoadCommand) {
                 res = processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, qryId);
 
@@ -188,12 +187,12 @@ public class CommandProcessor {
             else if (cmdNative instanceof SqlSetStreamingCommand)
                 processSetStreamingCommand((SqlSetStreamingCommand)cmdNative, cliCtx);
             else
-                processTxCommand(cmdNative, qry);
+                processTxCommand(cmdNative, params);
         }
         else {
             assert cmdH2 != null;
 
-            runCommandH2(qry.getSql(), cmdH2);
+            runCommandH2(sql, cmdH2);
         }
 
         return new CommandResult(res, unregister);
@@ -839,13 +838,12 @@ public class CommandProcessor {
     /**
      * Process transactional command.
      * @param cmd Command.
-     * @param qry Query.
+     * @param params Parameters.
      * @throws IgniteCheckedException if failed.
      */
-    private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
+    private void processTxCommand(SqlCommand cmd, QueryParameters params)
         throws IgniteCheckedException {
-        NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).getNestedTxMode() :
-            NestedTxMode.DEFAULT;
+        NestedTxMode nestedTxMode = params.nestedTxMode();
 
         GridNearTxLocal tx = tx(ctx);
 
@@ -862,7 +860,7 @@ public class CommandProcessor {
                     case COMMIT:
                         doCommit(tx);
 
-                        txStart(ctx, qry.getTimeout());
+                        txStart(ctx, params.timeout());
 
                         break;
 
@@ -881,7 +879,7 @@ public class CommandProcessor {
                 }
             }
             else
-                txStart(ctx, qry.getTimeout());
+                txStart(ctx, params.timeout());
         }
         else if (cmd instanceof SqlCommitTransactionCommand) {
             // Do nothing if there's no transaction.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 65a85ba..17e9b1d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,14 +71,12 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.RegisteredQueryCursor;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.query.EnlistOperation;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
@@ -420,32 +418,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *
-     * @param schemaName Schema name.
-     * @param qry Query.
-     * @param params Query parameters.
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
+     * @param select Select.
      * @param filter Cache name and key filter.
-     * @param enforceJoinOrder Enforce join order of tables in the query.
-     * @param startTx Start transaction flag.
-     * @param qryTimeout Query timeout in milliseconds.
-     * @param cancel Query cancel.
+     * @param autoStartTx Start transaction flag.
      * @param mvccTracker Query tracker.
-     * @param dataPageScanEnabled If data page scan is enabled.
+     * @param cancel Query cancel.
      * @return Query result.
      * @throws IgniteCheckedException If failed.
      */
-    private GridQueryFieldsResult executeQueryLocal0(
-        final String schemaName,
-        String qry,
-        @Nullable final Collection<Object> params,
-        @Nullable List<GridQueryFieldMetadata> meta,
+    private GridQueryFieldsResult executeSelectLocal(
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
+        QueryParserResultSelect select,
         final IndexingQueryFilter filter,
-        boolean enforceJoinOrder,
-        boolean startTx,
-        int qryTimeout,
-        final GridQueryCancel cancel,
+        boolean autoStartTx,
         MvccQueryTracker mvccTracker,
-        Boolean dataPageScanEnabled
+        GridQueryCancel cancel
     ) throws IgniteCheckedException {
+        String qry = qryDesc.sql();
+
         GridNearTxLocal tx = null;
 
         boolean mvccEnabled = mvccEnabled(kernalContext());
@@ -453,37 +446,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         assert mvccEnabled || mvccTracker == null;
 
         try {
-            SqlFieldsQuery fieldsQry = new SqlFieldsQuery(qry)
-                .setLocal(true)
-                .setEnforceJoinOrder(enforceJoinOrder)
-                .setDataPageScanEnabled(dataPageScanEnabled)
-                .setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
-
-            if (params != null)
-                fieldsQry.setArgs(params.toArray());
-
-            QueryParserResult parseRes = parser.parse(schemaName, fieldsQry, false);
-
-            if (parseRes.isDml()) {
-                QueryParserResultDml dml = parseRes.dml();
-
-                assert dml != null;
-
-                UpdateResult updRes = executeUpdate(schemaName, dml, fieldsQry, true, filter, cancel);
-
-                List<?> updResRow = Collections.singletonList(updRes.counter());
-
-                return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator<>(updResRow));
-            }
-            else if (parseRes.isCommand()) {
-                throw new IgniteSQLException("DDL statements are supported for the whole cluster only.",
-                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-            }
-
-            assert parseRes.isSelect();
-
-            QueryParserResultSelect select = parseRes.select();
-
             assert select != null;
 
             MvccSnapshot mvccSnapshot = null;
@@ -499,7 +461,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             GridNearTxSelectForUpdateFuture sfuFut = null;
 
-            int opTimeout = qryTimeout;
+            int opTimeout = qryParams.timeout();
 
             if (mvccEnabled) {
                 if (mvccTracker == null) {
@@ -512,7 +474,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                             throw new IgniteCheckedException("Cache has been stopped concurrently [cacheId=" +
                                 mvccCacheId + ']');
 
-                        mvccTracker = MvccUtils.mvccTracker(mvccCacheCtx, startTx);
+                        mvccTracker = MvccUtils.mvccTracker(mvccCacheCtx, autoStartTx);
                     }
                 }
 
@@ -563,7 +525,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 null
             );
 
-            return new GridQueryFieldsResultAdapter(meta, null) {
+            return new GridQueryFieldsResultAdapter(select.meta(), null) {
                 @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
                     assert qryCtxRegistry.getThreadLocal() == null;
 
@@ -572,12 +534,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn = connMgr.detachThreadConnection();
 
                     try {
-                        Connection conn0 = conn.object().connection(schemaName);
+                        Connection conn0 = conn.object().connection(qryDesc.schemaName());
+
+                        List<Object> args = F.asList(qryParams.arguments());
 
                         PreparedStatement stmt = preparedStatementWithParams(
                             conn0,
                             qry0,
-                            params,
+                            args,
                             true
                         );
 
@@ -585,10 +549,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                             stmt,
                             conn0,
                             qry0,
-                            params,
+                            args,
                             timeout0,
                             cancel,
-                            dataPageScanEnabled
+                            qryParams.dataPageScanEnabled()
                         );
 
                         if (sfuFut0 != null) {
@@ -991,65 +955,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Queries individual fields (generally used by JDBC drivers).
-     *
-     * @param schemaName Schema name.
-     * @param qry Query.
-     * @param keepBinary Keep binary flag.
-     * @param filter Cache name and key filter.
-     * @param cancel Query cancel.
-     * @param qryId Running query id. {@code null} in case query is not registered.
-     * @return Cursor.
-     */
-    private FieldsQueryCursor<List<?>> executeQueryLocal(
-        String schemaName,
-        SqlFieldsQuery qry,
-        List<GridQueryFieldMetadata> meta,
-        final boolean keepBinary,
-        IndexingQueryFilter filter,
-        GridQueryCancel cancel,
-        Long qryId
-    ) throws IgniteCheckedException {
-        boolean startTx = autoStartTx(qry);
-
-        final GridQueryFieldsResult res = executeQueryLocal0(
-            schemaName,
-            qry.getSql(),
-            F.asList(qry.getArgs()),
-            meta,
-            filter,
-            qry.isEnforceJoinOrder(),
-            startTx,
-            qry.getTimeout(),
-            cancel,
-            null,
-            qry.isDataPageScanEnabled()
-        );
-
-        Iterable<List<?>> iter = () -> {
-            try {
-                return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        };
-
-        QueryCursorImpl<List<?>> cursor = qryId != null
-            ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId)
-            : new QueryCursorImpl<>(iter, cancel);
-
-        cursor.fieldsMeta(res.metaData());
-
-        return cursor;
-    }
-
-    /**
      * @param schemaName Schema name.
      * @param qry Query.
      * @param keepCacheObj Flag to keep cache object.
      * @param enforceJoinOrder Enforce join order of tables.
-     * @param startTx Start transaction flag.
+     * @param autoStartTx Start transaction flag.
      * @param qryTimeout Query timeout.
      * @param cancel Cancel object.
      * @param params Query parameters.
@@ -1060,12 +970,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param pageSize Page size.
      * @return Iterable result.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private Iterable<List<?>> runQueryTwoStep(
         final String schemaName,
         final GridCacheTwoStepQuery qry,
         final boolean keepCacheObj,
         final boolean enforceJoinOrder,
-        boolean startTx,
+        boolean autoStartTx,
         final int qryTimeout,
         final GridQueryCancel cancel,
         final Object[] params,
@@ -1078,10 +989,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         assert !qry.mvccEnabled() || !F.isEmpty(qry.cacheIds());
 
         try {
-            final MvccQueryTracker tracker = mvccTracker == null && qry.mvccEnabled() ?
-                MvccUtils.mvccTracker(ctx.cache().context().cacheContext(qry.cacheIds().get(0)), startTx) : mvccTracker;
+            final MvccQueryTracker mvccTracker0;
 
-            GridNearTxLocal tx = tracker != null ? tx(ctx) : null;
+            if (mvccTracker == null && qry.mvccEnabled()) {
+                mvccTracker0 = MvccUtils.mvccTracker(
+                    ctx.cache().context().cacheContext(qry.cacheIds().get(0)),
+                    autoStartTx
+                );
+            }
+            else
+                mvccTracker0 = mvccTracker;
+
+            GridNearTxLocal tx = mvccTracker0 != null ? tx(ctx) : null;
 
             // Locking has no meaning if SELECT FOR UPDATE is not executed in explicit transaction.
             // So, we can can reset forUpdate flag if there is no explicit transaction.
@@ -1103,15 +1022,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                             params,
                             parts,
                             lazy,
-                            tracker,
+                            mvccTracker0,
                             dataPageScanEnabled,
                             forUpdate,
                             pageSize
                         );
                     }
                     catch (Throwable e) {
-                        if (tracker != null)
-                            tracker.onDone();
+                        if (mvccTracker0 != null)
+                            mvccTracker0.onDone();
 
                         throw e;
                     }
@@ -1166,15 +1085,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Execute command.
      *
-     * @param schemaName Schema name.
-     * @param qry Query.
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
      * @param cliCtx CLient context.
      * @param cmd Command (native).
      * @return Result.
      */
     private FieldsQueryCursor<List<?>> executeCommand(
-        String schemaName,
-        SqlFieldsQuery qry,
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
         @Nullable SqlClientContext cliCtx,
         QueryParserResultCommand cmd
     ) {
@@ -1184,26 +1103,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         SqlCommand cmdNative = cmd.commandNative();
         GridSqlStatement cmdH2 = cmd.commandH2();
 
-        if (qry.isLocal()) {
+        if (qryDesc.local()) {
             throw new IgniteSQLException("DDL statements are not supported for LOCAL caches",
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
         }
 
-        Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true);
+        Long qryId = registerRunningQuery(qryDesc, null);
 
         boolean fail = false;
 
         CommandResult res = null;
 
         try {
-            res = cmdProc.runCommand(qry, cmdNative, cmdH2, cliCtx, qryId);
+            res = cmdProc.runCommand(qryDesc.sql(), cmdNative, cmdH2, qryParams, cliCtx, qryId);
 
             return res.cursor();
         }
         catch (IgniteCheckedException e) {
             fail = true;
 
-            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() +
+            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qryDesc.sql() +
                 ", err=" + e.getMessage() + ']', e);
         }
         finally {
@@ -1244,11 +1163,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         @Nullable SqlClientContext cliCtx,
         boolean keepBinary,
         boolean failOnMultipleStmts,
-        MvccQueryTracker tracker,
-        GridQueryCancel cancel,
-        boolean registerAsNewQry
+        GridQueryCancel cancel
     ) {
-        boolean mvccEnabled = mvccEnabled(ctx), startTx = autoStartTx(qry);
+        boolean mvccEnabled = mvccEnabled(ctx);
 
         try {
             List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1);
@@ -1262,12 +1179,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 remainingQry = parseRes.remainingQuery();
 
                 // Get next command.
-                SqlFieldsQuery newQry = parseRes.query();
+                QueryDescriptor newQryDesc = parseRes.queryDescriptor();
+                QueryParameters newQryParams = parseRes.queryParameters();
 
                 // Check if there is enough parameters. Batched statements are not checked at this point
                 // since they pass parameters differently.
-                if (!DmlUtils.isBatched(newQry)) {
-                    int qryParamsCnt = F.isEmpty(newQry.getArgs()) ? 0 : newQry.getArgs().length;
+                if (!newQryDesc.batched()) {
+                    int qryParamsCnt = F.isEmpty(newQryParams.arguments()) ? 0 : newQryParams.arguments().length;
 
                     if (qryParamsCnt < parseRes.parametersCount())
                         throw new IgniteSQLException("Invalid number of query parameters [expected=" +
@@ -1283,28 +1201,42 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                     assert cmd != null;
 
-                    // Execute command.
                     FieldsQueryCursor<List<?>> cmdRes = executeCommand(
-                        schemaName,
-                        newQry,
+                        newQryDesc,
+                        newQryParams,
                         cliCtx,
                         cmd
                     );
 
                     res.add(cmdRes);
                 }
+                else if (parseRes.isDml()) {
+                    QueryParserResultDml dml = parseRes.dml();
+
+                    assert dml != null;
+
+                    List<? extends FieldsQueryCursor<List<?>>> dmlRes = executeDml(
+                        newQryDesc,
+                        newQryParams,
+                        dml,
+                        cancel
+                    );
+
+                    res.addAll(dmlRes);
+                }
                 else {
-                    // Execute query or DML.
-                    List<? extends FieldsQueryCursor<List<?>>> qryRes = executeSelectOrDml(
-                        schemaName,
-                        newQry,
-                        parseRes.select(),
-                        parseRes.dml(),
+                    assert parseRes.isSelect();
+
+                    QueryParserResultSelect select = parseRes.select();
+
+                    assert select != null;
+
+                    List<? extends FieldsQueryCursor<List<?>>> qryRes = executeSelect(
+                        newQryDesc,
+                        newQryParams,
+                        select,
                         keepBinary,
-                        startTx,
-                        tracker,
-                        cancel,
-                        registerAsNewQry
+                        cancel
                     );
 
                     res.addAll(qryRes);
@@ -1329,137 +1261,233 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /**
      * Execute an all-ready {@link SqlFieldsQuery}.
-     * @param schemaName Schema name.
-     * @param qry Fields query with flags.
-     * @param select Select.
+     *
+     * @param qryDesc Plan key.
+     * @param qryParams Parameters.
      * @param dml DML.
+     * @param cancel Query cancel state holder.
+     * @return Query result.
+     */
+    private List<? extends FieldsQueryCursor<List<?>>> executeDml(
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
+        QueryParserResultDml dml,
+        GridQueryCancel cancel
+    ) {
+        IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
+
+        Long qryId = registerRunningQuery(qryDesc, cancel);
+
+        boolean fail = false;
+
+        try {
+            if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
+                throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
+                    "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
+                    "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
+                    "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
+            }
+
+            if (!qryDesc.local()) {
+                return executeUpdateDistributed(
+                    qryDesc,
+                    qryParams,
+                    dml,
+                    cancel
+                );
+            }
+            else {
+                UpdateResult updRes = executeUpdate(
+                    qryDesc,
+                    qryParams,
+                    dml,
+                    true,
+                    filter,
+                    cancel
+                );
+
+                return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
+                    @SuppressWarnings("NullableProblems")
+                    @Override public Iterator<List<?>> iterator() {
+                        return new IgniteSingletonIterator<>(Collections.singletonList(updRes.counter()));
+                    }
+                }, cancel));
+            }
+        }
+        catch (IgniteCheckedException e) {
+            fail = true;
+
+            throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qryDesc.sql() +
+                ", params=" + Arrays.deepToString(qryParams.arguments()) + "]", e);
+        }
+        finally {
+            runningQryMgr.unregister(qryId, fail);
+        }
+    }
+
+    /**
+     * Execute an all-ready {@link SqlFieldsQuery}.
+     *
+     * @param qryDesc Plan key.
+     * @param qryParams Parameters.
+     * @param select Select.
      * @param keepBinary Whether binary objects must not be deserialized automatically.
-     * @param startTx Start transaction flag.
-     * @param mvccTracker MVCC tracker.
      * @param cancel Query cancel state holder.
-     * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
      * @return Query result.
      */
-    private List<? extends FieldsQueryCursor<List<?>>> executeSelectOrDml(
-        String schemaName,
-        SqlFieldsQuery qry,
-        @Nullable QueryParserResultSelect select,
-        @Nullable QueryParserResultDml dml,
+    private List<? extends FieldsQueryCursor<List<?>>> executeSelect(
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
+        QueryParserResultSelect select,
         boolean keepBinary,
-        boolean startTx,
-        MvccQueryTracker mvccTracker,
-        GridQueryCancel cancel,
-        boolean registerAsNewQry
+        GridQueryCancel cancel
     ) {
-        String sqlQry = qry.getSql();
-
-        boolean loc = qry.isLocal();
+        if (cancel == null)
+            cancel = new GridQueryCancel();
 
-        IndexingQueryFilter filter = (loc ? backupFilter(null, qry.getPartitions()) : null);
+        // Check security.
+        if (ctx.security().enabled())
+            checkSecurity(select.cacheIds());
 
-        if (dml != null) {
-            Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry);
+        // Register query.
+        Long qryId = registerRunningQuery(qryDesc, cancel);
 
-            boolean fail = false;
+        try {
+            Iterable<List<?>> iter = executeSelect0(qryDesc, qryParams, select, keepBinary, null, cancel);
 
-            try {
-                if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
-                    throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
-                        "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
-                        "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
-                        "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
-                }
+            QueryCursorImpl<List<?>> cursor = qryId != null
+                ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId)
+                : new QueryCursorImpl<>(iter, cancel);
 
-                if (!loc)
-                    return executeUpdateDistributed(schemaName, dml , qry, cancel);
-                else {
-                    UpdateResult updRes = executeUpdate(schemaName, dml , qry, true, filter, cancel);
+            cursor.fieldsMeta(select.meta());
 
-                    return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
-                        @SuppressWarnings("NullableProblems")
-                        @Override public Iterator<List<?>> iterator() {
-                            return new IgniteSingletonIterator<>(Collections.singletonList(updRes.counter()));
-                        }
-                    }, cancel));
-                }
-            }
-            catch (IgniteCheckedException e) {
-                fail = true;
+            return Collections.singletonList(cursor);
+        }
+        catch (Exception e) {
+            runningQryMgr.unregister(qryId, true);
 
-                throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry +
-                    ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
-            }
-            finally {
-                runningQryMgr.unregister(qryId, fail);
-            }
+            throw new IgniteSQLException("Failed to execute SELECT statement: " + qryDesc.sql(), e);
         }
+    }
+
+    /**
+     * Execute SELECT statement for DML.
+     *
+     * @param schema Schema.
+     * @param selectQry Select query.
+     * @param mvccTracker MVCC tracker.
+     * @param cancel Cancel.
+     * @return Fields query.
+     */
+    private QueryCursorImpl<List<?>> executeSelectForDml(
+        String schema,
+        SqlFieldsQuery selectQry,
+        MvccQueryTracker mvccTracker,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
+        QueryParserResult parseRes = parser.parse(schema, selectQry, false);
+
+        QueryParserResultSelect select = parseRes.select();
 
-        // Execute SQL.
         assert select != null;
 
+        Iterable<List<?>> iter = executeSelect0(
+            parseRes.queryDescriptor(),
+            parseRes.queryParameters(),
+            select,
+            true,
+            mvccTracker,
+            cancel
+        );
+
+        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(iter, cancel);
+
+        cursor.fieldsMeta(select.meta());
+
+        return cursor;
+    }
+
+    /**
+     * Execute an all-ready {@link SqlFieldsQuery}.
+     *
+     * @param qryDesc Plan key.
+     * @param qryParams Parameters.
+     * @param select Select.
+     * @param keepBinary Whether binary objects must not be deserialized automatically.
+     * @param mvccTracker MVCC tracker.
+     * @param cancel Query cancel state holder.
+     * @return Query result.
+     */
+    private Iterable<List<?>> executeSelect0(
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
+        QueryParserResultSelect select,
+        boolean keepBinary,
+        MvccQueryTracker mvccTracker,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
+        boolean autoStartTx = mvccEnabled(ctx) && !qryParams.autoCommit() && tx(ctx) == null;
+
+        Iterable<List<?>> iter;
+
         if (select.splitNeeded()) {
             // Distributed query.
             GridCacheTwoStepQuery twoStepQry = select.twoStepQuery();
 
             assert twoStepQry != null;
 
-            if (ctx.security().enabled())
-                checkSecurity(twoStepQry.cacheIds());
-
-            FieldsQueryCursor<List<?>> res = executeQueryWithSplit(
-                schemaName,
-                qry,
+            iter = executeSelectDistributed(
+                qryDesc,
+                qryParams,
                 twoStepQry,
-                select.meta(),
                 keepBinary,
-                startTx,
+                autoStartTx,
                 mvccTracker,
-                cancel,
-                registerAsNewQry
+                cancel
             );
-
-            return Collections.singletonList(res);
         }
         else {
             // Local query.
-            Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry);
+            IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
 
-            try {
-                FieldsQueryCursor<List<?>> res = executeQueryLocal(
-                    schemaName,
-                    qry,
-                    select.meta(),
-                    keepBinary,
-                    filter,
-                    cancel,
-                    qryId
-                );
-
-                return Collections.singletonList(res);
-            }
-            catch (IgniteCheckedException e) {
-                runningQryMgr.unregister(qryId, true);
+            GridQueryFieldsResult res = executeSelectLocal(
+                qryDesc,
+                qryParams,
+                select,
+                filter,
+                autoStartTx,
+                mvccTracker,
+                cancel
+            );
 
-                throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry +
-                    ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
-            }
+            iter = () -> {
+                try {
+                    return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            };
         }
+
+        return iter;
     }
 
     /**
-     * @param schemaName Schema name.
+     * Register running query.
+     *
+     * @param qryDesc Query descriptor.
      * @param cancel Query cancel state holder.
-     * @param qry Query.
-     * @param loc {@code true} for local query.
-     * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
      * @return Id of registered query or {@code null} if query wasn't registered.
      */
-    private Long registerRunningQuery(String schemaName, GridQueryCancel cancel, String qry, boolean loc,
-        boolean registerAsNewQry) {
-        if (registerAsNewQry)
-            return runningQryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
-
-        return null;
+    private Long registerRunningQuery(QueryDescriptor qryDesc, GridQueryCancel cancel) {
+        return runningQryMgr.register(
+            qryDesc.sql(),
+            GridCacheQueryType.SQL_FIELDS,
+            qryDesc.schemaName(),
+            qryDesc.local(),
+            cancel
+        );
     }
 
     /**
@@ -1479,18 +1507,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
-    /**
-     * @param qry Sql fields query.autoStartTx(qry)
-     * @return {@code True} if need to start transaction.
-     */
-    @SuppressWarnings("SimplifiableIfStatement")
-    private boolean autoStartTx(SqlFieldsQuery qry) {
-        if (!mvccEnabled(ctx))
-            return false;
-
-        return qry instanceof SqlFieldsQueryEx && !((SqlFieldsQueryEx)qry).isAutoCommit() && tx(ctx) == null;
-    }
-
     /** {@inheritDoc} */
     @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
         GridCacheContext<?, ?> cctx,
@@ -1549,44 +1565,40 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         // Force keepBinary for operation context to avoid binary deserialization inside entry processor
         DmlUtils.setKeepBinaryContext(planCctx);
 
+        SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fldsQry.isCollocated())
+            .setArgs(fldsQry.getArgs())
+            .setDistributedJoins(fldsQry.isDistributedJoins())
+            .setEnforceJoinOrder(fldsQry.isEnforceJoinOrder())
+            .setLocal(fldsQry.isLocal())
+            .setPageSize(fldsQry.getPageSize())
+            .setTimeout(fldsQry.getTimeout(), TimeUnit.MILLISECONDS)
+            .setDataPageScanEnabled(fldsQry.isDataPageScanEnabled());
+
         QueryCursorImpl<List<?>> cur;
 
         // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
         // sub-query and not some dummy stuff like "select 1, 2, 3;"
         if (!loc && !plan.isLocalSubquery()) {
-            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fldsQry.isCollocated())
-                .setArgs(fldsQry.getArgs())
-                .setDistributedJoins(fldsQry.isDistributedJoins())
-                .setEnforceJoinOrder(fldsQry.isEnforceJoinOrder())
-                .setLocal(fldsQry.isLocal())
-                .setPageSize(fldsQry.getPageSize())
-                .setTimeout(fldsQry.getTimeout(), TimeUnit.MILLISECONDS)
-                .setDataPageScanEnabled(fldsQry.isDataPageScanEnabled());
-
-            cur = (QueryCursorImpl<List<?>>)querySqlFields(
+            cur = executeSelectForDml(
                 schema,
-                newFieldsQry,
-                null,
-                true,
-                true,
+                selectFieldsQry,
                 new StaticMvccQueryTracker(planCctx, mvccSnapshot),
-                cancel,
-                false
-            ).get(0);
+                cancel
+            );
         }
         else {
-            GridQueryFieldsResult res = executeQueryLocal0(
-                schema,
-                plan.selectQuery(),
-                F.asList(fldsQry.getArgs()),
-                null,
+            selectFieldsQry.setLocal(true);
+
+            QueryParserResult selectParseRes = parser.parse(schema, selectFieldsQry, false);
+
+            GridQueryFieldsResult res = executeSelectLocal(
+                selectParseRes.queryDescriptor(),
+                selectParseRes.queryParameters(),
+                selectParseRes.select(),
                 filter,
-                fldsQry.isEnforceJoinOrder(),
                 false,
-                fldsQry.getTimeout(),
-                cancel,
                 new StaticMvccQueryTracker(planCctx, mvccSnapshot),
-                null
+                cancel
             );
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -1607,88 +1619,73 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /**
      * Run distributed query on detected set of partitions.
-     * @param schemaName Schema name.
-     * @param qry Original query.
+     *
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
      * @param twoStepQry Two-step query.
-     * @param meta Metadata to set to cursor.
      * @param keepBinary Keep binary flag.
-     * @param startTx Start transaction flag.
+     * @param autoStartTx Start transaction flag.
      * @param mvccTracker Query tracker.
      * @param cancel Cancel handler.
-     * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
      * @return Cursor representing distributed query result.
      */
-    private FieldsQueryCursor<List<?>> executeQueryWithSplit(String schemaName, SqlFieldsQuery qry,
-        GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
-        boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, boolean registerAsNewQry) {
-        if (log.isDebugEnabled())
-            log.debug("Parsed query: `" + qry.getSql() + "` into two step query: " + twoStepQry);
-
-        if (cancel == null)
-            cancel = new GridQueryCancel();
-
-        Long qryId = registerRunningQuery(schemaName, cancel, qry.getSql(), qry.isLocal(), registerAsNewQry);
-
-        boolean cursorCreated = false;
-        boolean failed = true;
-
-        try {
-            // When explicit partitions are set, there must be an owning cache they should be applied to.
-            int explicitParts[] = qry.getPartitions();
-            PartitionResult derivedParts = twoStepQry.derivedPartitions();
-
-            int parts[] = PartitionResult.calculatePartitions(explicitParts, derivedParts, qry.getArgs());
+    @SuppressWarnings("IfMayBeConditional")
+    private Iterable<List<?>> executeSelectDistributed(
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
+        GridCacheTwoStepQuery twoStepQry,
+        boolean keepBinary,
+        boolean autoStartTx,
+        MvccQueryTracker mvccTracker,
+        GridQueryCancel cancel
+    ) {
+        // When explicit partitions are set, there must be an owning cache they should be applied to.
+        PartitionResult derivedParts = twoStepQry.derivedPartitions();
 
-            if (parts != null && parts.length == 0) {
-                failed = false;
+        int parts[] = PartitionResult.calculatePartitions(
+            qryParams.partitions(),
+            derivedParts,
+            qryParams.arguments()
+        );
 
-                return new QueryCursorImpl<>(new Iterable<List<?>>() {
-                    @SuppressWarnings("NullableProblems")
-                    @Override public Iterator<List<?>> iterator() {
-                        return new Iterator<List<?>>() {
-                            @Override public boolean hasNext() {
-                                return false;
-                            }
+        Iterable<List<?>> iter;
 
-                            @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
-                            @Override public List<?> next() {
-                                return null;
-                            }
-                        };
-                    }
-                });
-            }
+        if (parts != null && parts.length == 0) {
+            iter = new Iterable<List<?>>() {
+                @SuppressWarnings("NullableProblems")
+                @Override public Iterator<List<?>> iterator() {
+                    return new Iterator<List<?>>() {
+                        @Override public boolean hasNext() {
+                            return false;
+                        }
 
-            Iterable<List<?>> iter = runQueryTwoStep(
-                schemaName,
+                        @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
+                        @Override public List<?> next() {
+                            return null;
+                        }
+                    };
+                }
+            };
+        }
+        else {
+            iter = runQueryTwoStep(
+                qryDesc.schemaName(),
                 twoStepQry,
                 keepBinary,
-                qry.isEnforceJoinOrder(),
-                startTx,
-                qry.getTimeout(),
+                qryDesc.enforceJoinOrder(),
+                autoStartTx,
+                qryParams.timeout(),
                 cancel,
-                qry.getArgs(),
+                qryParams.arguments(),
                 parts,
-                qry.isLazy(),
+                qryParams.lazy(),
                 mvccTracker,
-                qry.isDataPageScanEnabled(),
-                qry.getPageSize()
+                qryParams.dataPageScanEnabled(),
+                qryParams.pageSize()
             );
-
-            QueryCursorImpl<List<?>> cursor = registerAsNewQry
-                ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId)
-                : new QueryCursorImpl<>(iter, cancel);
-
-            cursor.fieldsMeta(meta);
-
-            cursorCreated = true;
-
-            return cursor;
-        }
-        finally {
-            if (!cursorCreated)
-                runningQryMgr.unregister(qryId, failed);
         }
+
+        return iter;
     }
 
     /**
@@ -1733,7 +1730,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         assert dml != null;
 
-        return executeUpdate(schemaName, dml, qry, loc, filter, cancel);
+        return executeUpdate(
+            parseRes.queryDescriptor(),
+            parseRes.queryParameters(),
+            dml,
+            loc,
+            filter,
+            cancel
+        );
     }
 
     /**
@@ -2260,26 +2264,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param schemaName Schema.
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
      * @param dml DML statement.
-     * @param fieldsQry Initial query
      * @param cancel Query cancel.
      * @return Update result wrapped into {@link GridQueryFieldsResult}
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
     private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
-        String schemaName,
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
         QueryParserResultDml dml,
-        SqlFieldsQuery fieldsQry,
         GridQueryCancel cancel
     ) throws IgniteCheckedException {
-        if (DmlUtils.isBatched(fieldsQry)) {
-            SqlFieldsQueryEx fieldsQry0 = (SqlFieldsQueryEx)fieldsQry;
-
+        if (qryDesc.batched()) {
             Collection<UpdateResult> ress;
 
-            List<Object[]> argss = fieldsQry0.batchedArguments();
+            List<Object[]> argss = qryParams.batchedArguments();
 
             UpdatePlan plan = dml.plan();
 
@@ -2292,7 +2294,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 try {
                     List<List<List<?>>> cur = plan.createRows(argss);
 
-                    ress = DmlUtils.processSelectResultBatched(plan, cur, fieldsQry0.getPageSize());
+                    ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.pageSize());
                 }
                 finally {
                     DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
@@ -2309,15 +2311,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 int cntr = 0;
 
                 for (Object[] args : argss) {
-                    SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry0.copy();
-
-                    qry0.clearBatchedArgs();
-                    qry0.setArgs(args);
-
                     UpdateResult res;
 
                     try {
-                        res = executeUpdate(schemaName, dml, qry0, false, null, cancel);
+                        res = executeUpdate(
+                            qryDesc,
+                            qryParams.toSingleBatchedArguments(args),
+                            dml,
+                            false,
+                            null,
+                            cancel
+                        );
 
                         cntPerRow[cntr++] = (int)res.counter();
 
@@ -2356,7 +2360,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return resCurs;
         }
         else {
-            UpdateResult res = executeUpdate(schemaName, dml, fieldsQry, false, null, cancel);
+            UpdateResult res = executeUpdate(
+                qryDesc,
+                qryParams,
+                dml,
+                false,
+                null,
+                cancel
+            );
 
             res.throwIfError();
 
@@ -2372,19 +2383,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
      *
-     * @param schemaName Schema.
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
      * @param dml DML command.
-     * @param fieldsQry Original query.
      * @param loc Query locality flag.
      * @param filters Cache name and key filter.
      * @param cancel Cancel.
      * @return Update result (modified items count and failed keys).
      * @throws IgniteCheckedException if failed.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private UpdateResult executeUpdate(
-        String schemaName,
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
         QueryParserResultDml dml,
-        SqlFieldsQuery fieldsQry,
         boolean loc,
         IndexingQueryFilter filters,
         GridQueryCancel cancel
@@ -2393,20 +2405,39 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         long items = 0;
 
-        UpdatePlan plan = dml.plan();
+        GridCacheContext<?, ?> cctx = dml.plan().cacheContext();
+
+        boolean transactional = cctx != null && cctx.mvccEnabled();
 
-        GridCacheContext<?, ?> cctx = plan.cacheContext();
+        int maxRetryCnt = transactional ? 1 : DFLT_UPDATE_RERUN_ATTEMPTS;
 
-        for (int i = 0; i < DFLT_UPDATE_RERUN_ATTEMPTS; i++) {
-            CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
+        for (int i = 0; i < maxRetryCnt; i++) {
+            CacheOperationContext opCtx = cctx != null ? DmlUtils.setKeepBinaryContext(cctx) : null;
 
             UpdateResult r;
 
             try {
-                r = executeUpdate0(schemaName, plan, fieldsQry, loc, filters, cancel);
+                if (transactional)
+                    r = executeUpdateTransactional(
+                        qryDesc,
+                        qryParams,
+                        dml,
+                        loc,
+                        cancel
+                    );
+                else
+                    r = executeUpdateNonTransactional(
+                        qryDesc,
+                        qryParams,
+                        dml,
+                        loc,
+                        filters,
+                        cancel
+                    );
             }
             finally {
-                DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
+                if (opCtx != null)
+                    DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
             }
 
             items += r.counter();
@@ -2427,185 +2458,65 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Actually perform SQL DML operation locally.
+     * Execute update in non-transactional mode.
      *
-     * @param schemaName Schema name.
-     * @param plan Cache context.
-     * @param fieldsQry Fields query.
-     * @param loc Local query flag.
-     * @param filters Cache name and key filter.
-     * @param cancel Query cancel state holder.
-     * @return Pair [number of successfully processed items; keys that have failed to be processed]
-     * @throws IgniteCheckedException if failed.
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
+     * @param dml Plan.
+     * @param loc Local flag.
+     * @param filters Filters.
+     * @param cancel Cancel hook.
+     * @return Update result.
+     * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings({"ConstantConditions"})
-    private UpdateResult executeUpdate0(
-        String schemaName,
-        final UpdatePlan plan,
-        SqlFieldsQuery fieldsQry,
+    private UpdateResult executeUpdateNonTransactional(
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
+        QueryParserResultDml dml,
         boolean loc,
         IndexingQueryFilter filters,
         GridQueryCancel cancel
     ) throws IgniteCheckedException {
-        GridCacheContext cctx = plan.cacheContext();
-
-        DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
-
-        if (cctx != null && cctx.mvccEnabled()) {
-            assert cctx.transactional();
-
-            GridNearTxLocal tx = tx(ctx);
-
-            boolean implicit = (tx == null);
-
-            boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) ||
-                ((SqlFieldsQueryEx)fieldsQry).isAutoCommit());
-
-            if (implicit)
-                tx = txStart(cctx, fieldsQry.getTimeout());
-
-            requestSnapshot(tx);
-
-            try (GridNearTxLocal toCommit = commit ? tx : null) {
-                long timeout = implicit
-                    ? tx.remainingTime()
-                    : operationTimeout(fieldsQry.getTimeout(), tx);
-
-                if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
-                    || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
-
-                    boolean sequential = true;
-
-                    UpdateSourceIterator<?> it;
-
-                    if (plan.fastResult()) {
-                        IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs());
-
-                        EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
-
-                        it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
-                    }
-                    else if (plan.hasRows())
-                        it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan, plan.createRows(fieldsQry.getArgs()));
-                    else {
-                        // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false).
-                        SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
-                            .setArgs(fieldsQry.getArgs())
-                            .setDistributedJoins(fieldsQry.isDistributedJoins())
-                            .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
-                            .setLocal(fieldsQry.isLocal())
-                            .setPageSize(fieldsQry.getPageSize())
-                            .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
-                            .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
-                        FieldsQueryCursor<List<?>> cur = querySqlFields(schemaName, newFieldsQry, null,
-                            true, true, MvccUtils.mvccTracker(cctx, tx), cancel, false).get(0);
-
-                        it = plan.iteratorForTransaction(connMgr, cur);
-                    }
-
-                    IgniteInternalFuture<Long> fut = tx.updateAsync(cctx, it,
-                        fieldsQry.getPageSize(), timeout, sequential);
-
-                    UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
-                    if (commit)
-                        toCommit.commit();
-
-                    return res;
-                }
-
-                int[] ids = U.toIntArray(distributedPlan.getCacheIds());
-
-                int flags = 0;
-
-                if (fieldsQry.isEnforceJoinOrder())
-                    flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
-
-                if (distributedPlan.isReplicatedOnly())
-                    flags |= GridH2QueryRequest.FLAG_REPLICATED;
-
-                flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
-                    fieldsQry.isDataPageScanEnabled());
-
-                int[] parts = PartitionResult.calculatePartitions(
-                    fieldsQry.getPartitions(),
-                    distributedPlan.derivedPartitions(),
-                    fieldsQry.getArgs());
-
-                if (parts != null && parts.length == 0)
-                    return new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
-                else {
-                    IgniteInternalFuture<Long> fut = tx.updateAsync(
-                        cctx,
-                        ids,
-                        parts,
-                        schemaName,
-                        fieldsQry.getSql(),
-                        fieldsQry.getArgs(),
-                        flags,
-                        fieldsQry.getPageSize(),
-                        timeout);
-
-                    UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
-                    if (commit)
-                        toCommit.commit();
-
-                    return res;
-                }
-            }
-            catch (ClusterTopologyServerNotFoundException e) {
-                throw new CacheServerNotFoundException(e.getMessage(), e);
-            }
-            catch (IgniteCheckedException e) {
-                IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
-
-                if(sqlEx != null)
-                    throw sqlEx;
-
-                Exception ex = IgniteUtils.convertExceptionNoWrap(e);
-
-                if (ex instanceof IgniteException)
-                    throw (IgniteException)ex;
-
-                U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", ex);
-
-                throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
-            }
-            finally {
-                if (commit)
-                    cctx.tm().resetContext();
-            }
-        }
+        UpdatePlan plan = dml.plan();
 
-        UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs());
+        UpdateResult fastUpdateRes = plan.processFast(qryParams.arguments());
 
         if (fastUpdateRes != null)
             return fastUpdateRes;
 
+        DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
+
         if (distributedPlan != null) {
             if (cancel == null)
                 cancel = new GridQueryCancel();
 
             UpdateResult result = rdcQryExec.update(
-                schemaName,
+                qryDesc.schemaName(),
                 distributedPlan.getCacheIds(),
-                fieldsQry.getSql(),
-                fieldsQry.getArgs(),
-                fieldsQry.isEnforceJoinOrder(),
-                fieldsQry.getPageSize(),
-                fieldsQry.getTimeout(),
-                fieldsQry.getPartitions(),
+                qryDesc.sql(),
+                qryParams.arguments(),
+                qryDesc.enforceJoinOrder(),
+                qryParams.pageSize(),
+                qryParams.timeout(),
+                qryParams.partitions(),
                 distributedPlan.isReplicatedOnly(),
                 cancel
             );
 
-            // null is returned in case not all nodes support distributed DML.
+            // Null is returned in case not all nodes support distributed DML.
             if (result != null)
                 return result;
         }
 
+        SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated())
+            .setArgs(qryParams.arguments())
+            .setDistributedJoins(qryDesc.distributedJoins())
+            .setEnforceJoinOrder(qryDesc.enforceJoinOrder())
+            .setLocal(qryDesc.local())
+            .setPageSize(qryParams.pageSize())
+            .setTimeout(qryParams.timeout(), TimeUnit.MILLISECONDS)
+            .setDataPageScanEnabled(qryParams.dataPageScanEnabled());
+
         Iterable<List<?>> cur;
 
         // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
@@ -2613,32 +2524,28 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (!loc && !plan.isLocalSubquery()) {
             assert !F.isEmpty(plan.selectQuery());
 
-            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
-                .setArgs(fieldsQry.getArgs())
-                .setDistributedJoins(fieldsQry.isDistributedJoins())
-                .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
-                .setLocal(fieldsQry.isLocal())
-                .setPageSize(fieldsQry.getPageSize())
-                .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS)
-                .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
-            cur = querySqlFields(schemaName, newFieldsQry, null, true, true, null, cancel, false).get(0);
+            cur = executeSelectForDml(
+                qryDesc.schemaName(),
+                selectFieldsQry,
+                null,
+                cancel
+            );
         }
         else if (plan.hasRows())
-            cur = plan.createRows(fieldsQry.getArgs());
+            cur = plan.createRows(qryParams.arguments());
         else {
-            final GridQueryFieldsResult res = executeQueryLocal0(
-                schemaName,
-                plan.selectQuery(),
-                F.asList(fieldsQry.getArgs()),
-                null,
+            selectFieldsQry.setLocal(true);
+
+            QueryParserResult selectParseRes = parser.parse(qryDesc.schemaName(), selectFieldsQry, false);
+
+            final GridQueryFieldsResult res = executeSelectLocal(
+                selectParseRes.queryDescriptor(),
+                selectParseRes.queryParameters(),
+                selectParseRes.select(),
                 filters,
-                fieldsQry.isEnforceJoinOrder(),
                 false,
-                fieldsQry.getTimeout(),
-                cancel,
                 null,
-                null
+                cancel
             );
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -2654,8 +2561,176 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             }, cancel);
         }
 
-        int pageSize = loc ? 0 : fieldsQry.getPageSize();
+        int pageSize = loc ? 0 : qryParams.pageSize();
 
         return DmlUtils.processSelectResult(plan, cur, pageSize);
     }
+
+    /**
+     * Execute update in transactional mode.
+     *
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
+     * @param dml Plan.
+     * @param loc Local flag.
+     * @param cancel Cancel hook.
+     * @return Update result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private UpdateResult executeUpdateTransactional(
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
+        QueryParserResultDml dml,
+        boolean loc,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
+        UpdatePlan plan = dml.plan();
+
+        GridCacheContext cctx = plan.cacheContext();
+
+        assert cctx != null;
+        assert cctx.transactional();
+
+        GridNearTxLocal tx = tx(ctx);
+
+        boolean implicit = (tx == null);
+
+        boolean commit = implicit && qryParams.autoCommit();
+
+        if (implicit)
+            tx = txStart(cctx, qryParams.timeout());
+
+        requestSnapshot(tx);
+
+        try (GridNearTxLocal toCommit = commit ? tx : null) {
+            DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
+
+            long timeout = implicit
+                ? tx.remainingTime()
+                : operationTimeout(qryParams.timeout(), tx);
+
+            if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
+                || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
+
+                boolean sequential = true;
+
+                UpdateSourceIterator<?> it;
+
+                if (plan.fastResult()) {
+                    IgniteBiTuple row = plan.getFastRow(qryParams.arguments());
+
+                    assert row != null;
+
+                    EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
+
+                    it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
+                }
+                else if (plan.hasRows()) {
+                    it = new DmlUpdateResultsIterator(
+                        UpdatePlan.enlistOperation(plan.mode()),
+                        plan,
+                        plan.createRows(qryParams.arguments())
+                    );
+                }
+                else {
+                    SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated())
+                        .setArgs(qryParams.arguments())
+                        .setDistributedJoins(qryDesc.distributedJoins())
+                        .setEnforceJoinOrder(qryDesc.enforceJoinOrder())
+                        .setLocal(qryDesc.local())
+                        .setPageSize(qryParams.pageSize())
+                        .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
+                        .setDataPageScanEnabled(qryParams.dataPageScanEnabled());
+
+                    FieldsQueryCursor<List<?>> cur = executeSelectForDml(
+                        qryDesc.schemaName(),
+                        selectFieldsQry,
+                        MvccUtils.mvccTracker(cctx, tx),
+                        cancel
+                    );
+
+                    it = plan.iteratorForTransaction(connMgr, cur);
+                }
+
+                IgniteInternalFuture<Long> fut = tx.updateAsync(
+                    cctx,
+                    it,
+                    qryParams.pageSize(),
+                    timeout,
+                    sequential
+                );
+
+                UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+                if (commit)
+                    toCommit.commit();
+
+                return res;
+            }
+
+            int[] ids = U.toIntArray(distributedPlan.getCacheIds());
+
+            int flags = 0;
+
+            if (qryDesc.enforceJoinOrder())
+                flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
+
+            if (distributedPlan.isReplicatedOnly())
+                flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
+            flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
+                qryParams.dataPageScanEnabled());
+
+            int[] parts = PartitionResult.calculatePartitions(
+                qryParams.partitions(),
+                distributedPlan.derivedPartitions(),
+                qryParams.arguments()
+            );
+
+            if (parts != null && parts.length == 0)
+                return new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
+            else {
+                IgniteInternalFuture<Long> fut = tx.updateAsync(
+                    cctx,
+                    ids,
+                    parts,
+                    qryDesc.schemaName(),
+                    qryDesc.sql(),
+                    qryParams.arguments(),
+                    flags,
+                    qryParams.pageSize(),
+                    timeout
+                );
+
+                UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+                if (commit)
+                    toCommit.commit();
+
+                return res;
+            }
+        }
+        catch (ClusterTopologyServerNotFoundException e) {
+            throw new CacheServerNotFoundException(e.getMessage(), e);
+        }
+        catch (IgniteCheckedException e) {
+            IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
+
+            if(sqlEx != null)
+                throw sqlEx;
+
+            Exception ex = IgniteUtils.convertExceptionNoWrap(e);
+
+            if (ex instanceof IgniteException)
+                throw (IgniteException)ex;
+
+            U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", ex);
+
+            throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
+        }
+        finally {
+            if (commit)
+                cctx.tm().resetContext();
+        }
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserCacheKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
similarity index 52%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserCacheKey.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
index 21bf405..2b1ceaf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserCacheKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.query.h2;
 /**
  * Key for cached two-step query.
  */
-public class QueryParserCacheKey {
+public class QueryDescriptor {
     /** */
     private final String schemaName;
 
@@ -28,7 +28,7 @@ public class QueryParserCacheKey {
     private final String sql;
 
     /** */
-    private final boolean grpByCollocated;
+    private final boolean collocated;
 
     /** */
     private final boolean distributedJoins;
@@ -37,28 +37,97 @@ public class QueryParserCacheKey {
     private final boolean enforceJoinOrder;
 
     /** */
-    private final boolean isLocal;
+    private final boolean loc;
+
+    /** Skip reducer on update flag. */
+    private final boolean skipReducerOnUpdate;
+
+    /** Batched flag. */
+    private final boolean batched;
 
     /**
      * @param schemaName Schema name.
      * @param sql Sql.
-     * @param grpByCollocated Collocated GROUP BY.
+     * @param collocated Collocated GROUP BY.
      * @param distributedJoins Distributed joins enabled.
      * @param enforceJoinOrder Enforce join order of tables.
-     * @param isLocal Query is local flag.
+     * @param loc Query is local flag.
+     * @param skipReducerOnUpdate Skip reducer on update flag.
      */
-    QueryParserCacheKey(String schemaName,
+    QueryDescriptor(
+        String schemaName,
         String sql,
-        boolean grpByCollocated,
+        boolean collocated,
         boolean distributedJoins,
         boolean enforceJoinOrder,
-        boolean isLocal) {
+        boolean loc,
+        boolean skipReducerOnUpdate,
+        boolean batched
+    ) {
         this.schemaName = schemaName;
         this.sql = sql;
-        this.grpByCollocated = grpByCollocated;
+        this.collocated = collocated;
         this.distributedJoins = distributedJoins;
         this.enforceJoinOrder = enforceJoinOrder;
-        this.isLocal = isLocal;
+        this.loc = loc;
+        this.skipReducerOnUpdate = skipReducerOnUpdate;
+        this.batched = batched;
+    }
+
+    /**
+     * @return Schema name.
+     */
+    public String schemaName() {
+        return schemaName;
+    }
+
+    /**
+     * @return SQL.
+     */
+    public String sql() {
+        return sql;
+    }
+
+    /**
+     * @return Collocated GROUP BY flag.
+     */
+    public boolean collocated() {
+        return collocated;
+    }
+
+    /**
+     * @return Distributed joins flag.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean enforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+
+    /**
+     * @return Local flag.
+     */
+    public boolean local() {
+        return loc;
+    }
+
+    /**
+     * @return Skip reducer on update flag.
+     */
+    public boolean skipReducerOnUpdate() {
+        return skipReducerOnUpdate;
+    }
+
+    /**
+     * @return Batched flag.
+     */
+    public boolean batched() {
+        return batched;
     }
 
     /** {@inheritDoc} */
@@ -70,9 +139,9 @@ public class QueryParserCacheKey {
         if (o == null || getClass() != o.getClass())
             return false;
 
-        QueryParserCacheKey that = (QueryParserCacheKey)o;
+        QueryDescriptor that = (QueryDescriptor)o;
 
-        if (grpByCollocated != that.grpByCollocated)
+        if (collocated != that.collocated)
             return false;
 
         if (distributedJoins != that.distributedJoins)
@@ -81,20 +150,31 @@ public class QueryParserCacheKey {
         if (enforceJoinOrder != that.enforceJoinOrder)
             return false;
 
+        if (skipReducerOnUpdate != that.skipReducerOnUpdate)
+            return false;
+
+        if (batched != that.batched)
+            return false;
+
         if (schemaName != null ? !schemaName.equals(that.schemaName) : that.schemaName != null)
             return false;
 
-        return isLocal == that.isLocal && sql.equals(that.sql);
+        return loc == that.loc && sql.equals(that.sql);
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("AssignmentReplaceableWithOperatorAssignment")
     @Override public int hashCode() {
         int res = schemaName != null ? schemaName.hashCode() : 0;
+
         res = 31 * res + sql.hashCode();
-        res = 31 * res + (grpByCollocated ? 1 : 0);
+        res = 31 * res + (collocated ? 1 : 0);
+
         res = res + (distributedJoins ? 2 : 0);
         res = res + (enforceJoinOrder ? 4 : 0);
-        res = res + (isLocal ? 8 : 0);
+        res = res + (loc ? 8 : 0);
+        res = res + (skipReducerOnUpdate ? 16 : 0);
+        res = res + (batched ? 32 : 0);
 
         return res;
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
new file mode 100644
index 0000000..72c0626
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.NestedTxMode;
+
+import java.util.List;
+
+/**
+ * Query parameters which vary between requests having the same execution plan. Essentially, these are the arguments
+ * of original {@link org.apache.ignite.cache.query.SqlFieldsQuery} which are not part of {@link QueryDescriptor}.
+ */
+public class QueryParameters {
+    /** Arguments. */
+    private final Object[] args;
+
+    /** Partitions. */
+    private final int[] parts;
+
+    /** Timeout. */
+    private final int timeout;
+
+    /** Lazy flag. */
+    private final boolean lazy;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** Data page scan enabled flag. */
+    private final Boolean dataPageScanEnabled;
+
+    /** Nexted transactional mode. */
+    private final NestedTxMode nestedTxMode;
+
+    /** Auto-commit flag. */
+    private final boolean autoCommit;
+
+    /** Batched arguments. */
+    private final List<Object[]> batchedArgs;
+
+    /**
+     * Create parameters from query.
+     *
+     * @param qry Query.
+     * @return Parameters.
+     */
+    public static QueryParameters fromQuery(SqlFieldsQuery qry) {
+        NestedTxMode nestedTxMode = NestedTxMode.DEFAULT;
+        boolean autoCommit = true;
+        List<Object[]> batchedArgs = null;
+
+        if (qry instanceof SqlFieldsQueryEx) {
+            SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)qry;
+
+            if (qry0.getNestedTxMode() != null)
+                nestedTxMode = qry0.getNestedTxMode();
+
+            autoCommit = qry0.isAutoCommit();
+
+            batchedArgs = qry0.batchedArguments();
+        }
+
+        return new QueryParameters(
+            qry.getArgs(),
+            qry.getPartitions(),
+            qry.getTimeout(),
+            qry.isLazy(),
+            qry.getPageSize(),
+            qry.isDataPageScanEnabled(),
+            nestedTxMode,
+            autoCommit,
+            batchedArgs
+        );
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param args Arguments.
+     * @param parts Partitions.
+     * @param timeout Timeout.
+     * @param lazy Lazy flag.
+     * @param pageSize Page size.
+     * @param dataPageScanEnabled Data page scan enabled flag.
+     * @param nestedTxMode Nested TX mode.
+     * @param autoCommit Auto-commit flag.
+     * @param batchedArgs Batched arguments.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    private QueryParameters(
+        Object[] args,
+        int[] parts,
+        int timeout,
+        boolean lazy,
+        int pageSize,
+        Boolean dataPageScanEnabled,
+        NestedTxMode nestedTxMode,
+        boolean autoCommit,
+        List<Object[]> batchedArgs
+    ) {
+        this.args = args;
+        this.parts = parts;
+        this.timeout = timeout;
+        this.lazy = lazy;
+        this.pageSize = pageSize;
+        this.dataPageScanEnabled = dataPageScanEnabled;
+        this.nestedTxMode = nestedTxMode;
+        this.autoCommit = autoCommit;
+        this.batchedArgs = batchedArgs;
+    }
+
+    /**
+     * @return Arguments.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public Object[] arguments() {
+        return args;
+    }
+
+    /**
+     * @return Partitions.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public int[] partitions() {
+        return parts;
+    }
+
+    /**
+     * @return Timeout.
+     */
+    public int timeout() {
+        return timeout;
+    }
+
+    /**
+     * @return Lazy flag.
+     */
+    public boolean lazy() {
+        return lazy;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return Data page scan enabled flag.
+     */
+    public Boolean dataPageScanEnabled() {
+        return dataPageScanEnabled;
+    }
+
+    /**
+     * @return Nested TX mode.
+     */
+    public NestedTxMode nestedTxMode() {
+        return nestedTxMode;
+    }
+
+    /**
+     * @return Auto-commit flag.
+     */
+    public boolean autoCommit() {
+        return autoCommit;
+    }
+
+    /**
+     * @return Batched arguments.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public List<Object[]> batchedArguments() {
+        return batchedArgs;
+    }
+
+    /**
+     * Convert current batched arguments to a form with single arguments.
+     *
+     * @param args Arguments.
+     * @return Result.
+     */
+    public QueryParameters toSingleBatchedArguments(Object[] args) {
+        return new QueryParameters(
+            args,
+            this.parts,
+            this.timeout,
+            this.lazy,
+            this.pageSize,
+            this.dataPageScanEnabled,
+            this.nestedTxMode,
+            this.autoCommit,
+            null
+        );
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 6ddea41..cded1fe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -94,7 +93,7 @@ public class QueryParser {
     private final IgniteLogger log;
 
     /** */
-    private volatile GridBoundedConcurrentLinkedHashMap<QueryParserCacheKey, QueryParserCacheEntry> cache =
+    private volatile GridBoundedConcurrentLinkedHashMap<QueryDescriptor, QueryParserCacheEntry> cache =
         new GridBoundedConcurrentLinkedHashMap<>(CACHE_SIZE);
 
     /**
@@ -135,33 +134,32 @@ public class QueryParser {
      * @return Parsing result that contains Parsed leading query and remaining sql script.
      */
     private QueryParserResult parse0(String schemaName, SqlFieldsQuery qry, boolean remainingAllowed) {
-        // First, let's check if we already have a two-step query for this statement...
-        QueryParserCacheKey cachedKey = new QueryParserCacheKey(
-            schemaName,
-            qry.getSql(),
-            qry.isCollocated(),
-            qry.isDistributedJoins(),
-            qry.isEnforceJoinOrder(),
-            qry.isLocal()
-        );
+        QueryDescriptor qryDesc = queryDescriptor(schemaName, qry);
 
-        QueryParserCacheEntry cached = cache.get(cachedKey);
+        QueryParserCacheEntry cached = cache.get(qryDesc);
 
         if (cached != null) 
-            return new QueryParserResult(qry, null, cached.select(), cached.dml(), cached.command());
+            return new QueryParserResult(
+                qryDesc,
+                QueryParameters.fromQuery(qry),
+                null,
+                cached.select(),
+                cached.dml(),
+                cached.command()
+            );
 
         // Try parting as native command.
         QueryParserResult parseRes = parseNative(schemaName, qry, remainingAllowed);
 
         // Otherwise parse with H2.
         if (parseRes == null)
-            parseRes = parseH2(schemaName, qry, remainingAllowed);
+            parseRes = parseH2(schemaName, qry, qryDesc.batched(), remainingAllowed);
 
         // Add to cache if not multi-statement.
         if (parseRes.remainingQuery() == null) {
             cached = new QueryParserCacheEntry(parseRes.select(), parseRes.dml(), parseRes.command());
 
-            cache.put(cachedKey, cached);
+            cache.put(qryDesc, cached);
         }
 
         // Done.
@@ -209,6 +207,8 @@ public class QueryParser {
 
             SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(parser.lastCommandSql());
 
+            QueryDescriptor newPlanKey = queryDescriptor(schemaName, newQry);
+
             SqlFieldsQuery remainingQry = null;
             
             if (!F.isEmpty(parser.remainingSql())) {
@@ -219,7 +219,14 @@ public class QueryParser {
 
             QueryParserResultCommand cmd = new QueryParserResultCommand(nativeCmd, null, false);
 
-            return new QueryParserResult(newQry, remainingQry, null, null, cmd);
+            return new QueryParserResult(
+                newPlanKey,
+                QueryParameters.fromQuery(newQry),
+                remainingQry,
+                null,
+                null,
+                cmd
+            );
         }
         catch (SqlStrictParseException e) {
             throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.PARSING, e);
@@ -246,11 +253,13 @@ public class QueryParser {
      *
      * @param schemaName Schema name.
      * @param qry Query.
+     * @param batched Batched flag.
      * @param remainingAllowed Whether multiple statements are allowed.              
      * @return Parsing result.
      */
     @SuppressWarnings("IfMayBeConditional")
-    private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry, boolean remainingAllowed) {
+    private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry, boolean batched,
+        boolean remainingAllowed) {
         Connection c = connMgr.connectionForThread().connection(schemaName);
 
         // For queries that are explicitly local, we rely on the flag specified in the query
@@ -303,7 +312,7 @@ public class QueryParser {
             Object[] args = null;
             Object[] remainingArgs = null;
 
-            if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
+            if (!batched && paramsCnt > 0) {
                 if (argsOrig == null || argsOrig.length < paramsCnt)
                     // Not enough parameters, but we will handle this later on execution phase.
                     args = argsOrig;
@@ -319,6 +328,8 @@ public class QueryParser {
 
             newQry.setArgs(args);
 
+            QueryDescriptor newQryDesc = queryDescriptor(schemaName, newQry);
+
             if (remainingQry != null)
                 remainingQry.setArgs(remainingArgs);
 
@@ -328,17 +339,38 @@ public class QueryParser {
 
                 QueryParserResultCommand cmd = new QueryParserResultCommand(null, cmdH2, false);
 
-                return new QueryParserResult(newQry, remainingQry, null, null, cmd);
+                return new QueryParserResult(
+                    newQryDesc,
+                    QueryParameters.fromQuery(newQry),
+                    remainingQry,
+                    null,
+                    null,
+                    cmd
+                );
             }
             else if (CommandProcessor.isCommandNoOp(prepared)) {
                 QueryParserResultCommand cmd = new QueryParserResultCommand(null, null, true);
 
-                return new QueryParserResult(newQry, remainingQry, null, null, cmd);
+                return new QueryParserResult(
+                    newQryDesc,
+                    QueryParameters.fromQuery(newQry),
+                    remainingQry,
+                    null,
+                    null,
+                    cmd
+                );
             }
             else if (GridSqlQueryParser.isDml(prepared)) {
-                QueryParserResultDml dml = prepareDmlStatement(schemaName, qry, prepared);
-
-                return new QueryParserResult(newQry, remainingQry, null, dml, null);
+                QueryParserResultDml dml = prepareDmlStatement(newQryDesc, prepared);
+
+                return new QueryParserResult(
+                    newQryDesc,
+                    QueryParameters.fromQuery(newQry),
+                    remainingQry,
+                    null,
+                    dml,
+                    null
+                );
             }
             else if (!prepared.isQuery()) {
                 throw new IgniteSQLException("Unsupported statement: " + newQry.getSql(),
@@ -405,7 +437,14 @@ public class QueryParser {
                     forUpdate
                 );
 
-                return new QueryParserResult(newQry, remainingQry, select, null, null);
+                return new QueryParserResult(
+                    newQryDesc,
+                    QueryParameters.fromQuery(newQry),
+                    remainingQry,
+                    select,
+                    null,
+                    null
+                );
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteSQLException("Failed to parse query: " + newQry.getSql(), IgniteQueryErrorCode.PARSING,
@@ -478,14 +517,13 @@ public class QueryParser {
     /**
      * Prepare DML statement.
      *
-     * @param schemaName Schema name.
-     * @param qry Query.
+     * @param planKey Plan key.
      * @param prepared Prepared.
      * @return Statement.
      */
-    private QueryParserResultDml prepareDmlStatement(String schemaName, SqlFieldsQuery qry, Prepared prepared) {
-        if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
-            throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
+    private QueryParserResultDml prepareDmlStatement(QueryDescriptor planKey, Prepared prepared) {
+        if (F.eq(QueryUtils.SCHEMA_SYS, planKey.schemaName()))
+            throw new IgniteSQLException("DML statements are not supported on " + planKey.schemaName() + " schema",
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
         // Prepare AST.
@@ -532,11 +570,10 @@ public class QueryParser {
 
         try {
             plan = UpdatePlanBuilder.planForStatement(
-                schemaName,
+                planKey,
                 stmt,
                 mvccEnabled,
-                idx,
-                qry
+                idx
             );
         }
         catch (Exception e) {
@@ -585,4 +622,34 @@ public class QueryParser {
     private static SqlFieldsQuery cloneFieldsQuery(SqlFieldsQuery oldQry) {
         return oldQry.copy().setLocal(oldQry.isLocal()).setPageSize(oldQry.getPageSize());
     }
+
+    /**
+     * Prepare plan key.
+     *
+     * @param schemaName Schema name.
+     * @param qry Query.
+     * @return Plan key.
+     */
+    private static QueryDescriptor queryDescriptor(String schemaName, SqlFieldsQuery qry) {
+        boolean skipReducerOnUpdate = false;
+        boolean batched = false;
+
+        if (qry instanceof SqlFieldsQueryEx) {
+            SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)qry;
+
+            skipReducerOnUpdate = !qry.isLocal() && qry0.isSkipReducerOnUpdate();
+            batched = qry0.isBatched();
+        }
+
+        return new QueryDescriptor(
+            schemaName,
+            qry.getSql(),
+            qry.isCollocated(),
+            qry.isDistributedJoins(),
+            qry.isEnforceJoinOrder(),
+            qry.isLocal(),
+            skipReducerOnUpdate,
+            batched
+        );
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java
index 75caff9..e7c66e7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java
@@ -24,8 +24,11 @@ import org.jetbrains.annotations.Nullable;
  * Result of parsing and splitting SQL from {@link SqlFieldsQuery}.
  */
 public class QueryParserResult {
-    /** New fields query that may be executed right away. */
-    private final SqlFieldsQuery qry;
+    /** Query descriptor. */
+    private final QueryDescriptor qryDesc;
+
+    /** Query parameters. */
+    private final QueryParameters qryParams;
 
     /** Remaining query. */
     private final SqlFieldsQuery remainingQry;
@@ -42,20 +45,23 @@ public class QueryParserResult {
     /**
      * Constructor.
      *
-     * @param qry New query.
+     * @param qryDesc Query descriptor.
+     * @param qryParams Query parameters.
      * @param remainingQry Remaining query.
      * @param select Select.
      * @param dml DML.
      * @param cmd Command.
      */
     public QueryParserResult(
-        SqlFieldsQuery qry,
+        QueryDescriptor qryDesc,
+        QueryParameters qryParams,
         SqlFieldsQuery remainingQry,
         @Nullable QueryParserResultSelect select,
         @Nullable QueryParserResultDml dml,
         @Nullable QueryParserResultCommand cmd
     ) {
-        this.qry = qry;
+        this.qryDesc = qryDesc;
+        this.qryParams = qryParams;
         this.remainingQry = remainingQry;
         this.select = select;
         this.dml = dml;
@@ -63,10 +69,17 @@ public class QueryParserResult {
     }
 
     /**
-     * @return New fields query that may be executed right away.
+     * @return Query descriptor.
+     */
+    public QueryDescriptor queryDescriptor() {
+        return qryDesc;
+    }
+
+    /**
+     * @return Query parameters.
      */
-    public SqlFieldsQuery query() {
-        return qry;
+    public QueryParameters queryParameters() {
+        return qryParams;
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index aa47c1b..6af6955 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -30,11 +30,9 @@ import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -42,6 +40,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.QueryDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn;
@@ -67,7 +66,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.h2.table.Column;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Logic for building update plans performed by {@link DmlStatementsProcessor}.
@@ -88,25 +86,23 @@ public final class UpdatePlanBuilder {
      * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
      * if available.
      *
-     * @param schemaName Schema name.
+     * @param planKey Plan key.
      * @param stmt Statement.
      * @param mvccEnabled MVCC enabled flag.
      * @param idx Indexing.
-     * @param fieldsQry Original query.
      * @return Update plan.
      */
     @SuppressWarnings("ConstantConditions")
     public static UpdatePlan planForStatement(
-        String schemaName,
+        QueryDescriptor planKey,
         GridSqlStatement stmt,
         boolean mvccEnabled,
-        IgniteH2Indexing idx,
-        @Nullable SqlFieldsQuery fieldsQry
+        IgniteH2Indexing idx
     ) throws IgniteCheckedException {
         if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
-            return planForInsert(schemaName, stmt, idx, mvccEnabled, fieldsQry);
+            return planForInsert(planKey, stmt, idx, mvccEnabled);
         else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete)
-            return planForUpdate(schemaName, stmt, idx, mvccEnabled, fieldsQry);
+            return planForUpdate(planKey, stmt, idx, mvccEnabled);
         else
             throw new IgniteSQLException("Unsupported operation: " + stmt.getSQL(),
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
@@ -115,21 +111,19 @@ public final class UpdatePlanBuilder {
     /**
      * Prepare update plan for INSERT or MERGE.
      *
-     * @param schemaName Schema name.
+     * @param planKey Plan key.
      * @param stmt INSERT or MERGE statement.
      * @param idx Indexing.
      * @param mvccEnabled Mvcc flag.
-     * @param fieldsQuery Original query.
      * @return Update plan.
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("ConstantConditions")
     private static UpdatePlan planForInsert(
-        String schemaName,
+        QueryDescriptor planKey,
         GridSqlStatement stmt,
         IgniteH2Indexing idx,
-        boolean mvccEnabled,
-        @Nullable SqlFieldsQuery fieldsQuery
+        boolean mvccEnabled
     ) throws IgniteCheckedException {
         GridSqlQuery sel = null;
 
@@ -258,8 +252,7 @@ public final class UpdatePlanBuilder {
             distributed = checkPlanCanBeDistributed(
                 idx,
                 mvccEnabled,
-                schemaName,
-                fieldsQuery,
+                planKey,
                 selectSql,
                 tbl.dataTable().cacheName()
             );
@@ -333,20 +326,18 @@ public final class UpdatePlanBuilder {
     /**
      * Prepare update plan for UPDATE or DELETE.
      *
-     * @param schemaName Schema name.
+     * @param planKey Plan key.
      * @param stmt UPDATE or DELETE statement.
      * @param idx Indexing.
      * @param mvccEnabled MVCC flag.
-     * @param fieldsQuery Original query.
      * @return Update plan.
      * @throws IgniteCheckedException if failed.
      */
     private static UpdatePlan planForUpdate(
-        String schemaName,
+        QueryDescriptor planKey,
         GridSqlStatement stmt,
         IgniteH2Indexing idx,
-        boolean mvccEnabled,
-        @Nullable SqlFieldsQuery fieldsQuery
+        boolean mvccEnabled
     ) throws IgniteCheckedException {
         GridSqlElement target;
 
@@ -442,8 +433,7 @@ public final class UpdatePlanBuilder {
                     distributed = checkPlanCanBeDistributed(
                         idx,
                         mvccEnabled,
-                        schemaName,
-                        fieldsQuery,
+                        planKey,
                         selectSql,
                         tbl.dataTable().cacheName()
                     );
@@ -477,8 +467,7 @@ public final class UpdatePlanBuilder {
                     distributed = checkPlanCanBeDistributed(
                         idx,
                         mvccEnabled,
-                        schemaName,
-                        fieldsQuery,
+                        planKey,
                         selectSql,
                         tbl.dataTable().cacheName()
                     );
@@ -862,8 +851,7 @@ public final class UpdatePlanBuilder {
      *
      * @param idx Indexing.
      * @param mvccEnabled Mvcc flag.
-     * @param schemaName Schema name.
-     * @param fieldsQry Initial update query.
+     * @param planKey Plan key.
      * @param selectQry Derived select query.
      * @param cacheName Cache name.
      * @return distributed update plan info, or {@code null} if cannot be distributed.
@@ -872,24 +860,23 @@ public final class UpdatePlanBuilder {
     private static DmlDistributedPlanInfo checkPlanCanBeDistributed(
         IgniteH2Indexing idx,
         boolean mvccEnabled,
-        String schemaName,
-        SqlFieldsQuery fieldsQry,
+        QueryDescriptor planKey,
         String selectQry,
         String cacheName
     )
         throws IgniteCheckedException {
-        if ((!mvccEnabled && !isSkipReducerOnUpdateQuery(fieldsQry)) || DmlUtils.isBatched(fieldsQry))
+        if ((!mvccEnabled && !planKey.skipReducerOnUpdate()) || planKey.batched())
             return null;
 
-        try (Connection conn = idx.connections().connectionNoCache(schemaName)) {
+        try (Connection conn = idx.connections().connectionNoCache(planKey.schemaName())) {
             // Get a new prepared statement for derived select query.
             try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
                 GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(
                     conn,
                     GridSqlQueryParser.prepared(stmt),
-                    fieldsQry.isCollocated(),
-                    fieldsQry.isDistributedJoins(),
-                    fieldsQry.isEnforceJoinOrder(),
+                    planKey.collocated(),
+                    planKey.distributedJoins(),
+                    planKey.enforceJoinOrder(),
                     false,
                     idx
                 );
@@ -917,17 +904,6 @@ public final class UpdatePlanBuilder {
     }
 
     /**
-     * Checks whether query flags are compatible with server side update.
-     *
-     * @param qry Query.
-     * @return {@code true} if update can be distributed.
-     */
-    private static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
-        return qry != null && !qry.isLocal() &&
-            qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isSkipReducerOnUpdate();
-    }
-
-    /**
      * Simple supplier that just takes specified element of a given row.
      */
     private static final class PlainValueSupplier implements KeyValueSupplier {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
index e7f3b8c..be90a5d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
 import org.apache.ignite.internal.util.typedef.G;
@@ -65,6 +64,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
@@ -312,6 +312,7 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
      *
      * @throws Exception Exception in case of failure.
      */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
     @Test
     public void testQueryDmlDelete() throws Exception {
         testQueryDML("DELETE FROM /* comment */ Integer");
@@ -322,6 +323,7 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
      *
      * @throws Exception Exception in case of failure.
      */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
     @Test
     public void testQueryDmlInsert() throws Exception {
         testQueryDML("INSERT INTO Integer(_key, _val) VALUES(1,1)");
@@ -332,6 +334,7 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
      *
      * @throws Exception Exception in case of failure.
      */
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
     @Test
     public void testQueryDmlUpdate() throws Exception {
         testQueryDML("UPDATE Integer set _val = 1 where 1=1");
@@ -644,11 +647,23 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
      */
     private static class BlockingIndexing extends IgniteH2Indexing {
         /** {@inheritDoc} */
-        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-            @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
-            MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
-            List<FieldsQueryCursor<List<?>>> res = super.querySqlFields(schemaName, qry, cliCtx, keepBinary,
-                failOnMultipleStmts, tracker, cancel, registerAsNewQry);
+        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+            String schemaName,
+            SqlFieldsQuery qry,
+            @Nullable SqlClientContext cliCtx,
+            boolean keepBinary,
+            boolean failOnMultipleStmts,
+            GridQueryCancel cancel
+        ) {
+            List<FieldsQueryCursor<List<?>>> res = super.querySqlFields(
+                schemaName,
+                qry,
+                cliCtx,
+                keepBinary,
+                failOnMultipleStmts,
+                cancel
+            );
+
             try {
                 awaitTimeouted();
             }