Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/08 10:14:50 UTC

[ignite] branch master updated: IGNITE-11209: SQL: Merged IgniteH2Indexing and DmlStatementsProcessor. This closes #6035.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ca26ce1 IGNITE-11209: SQL: Merged IgniteH2Indexing and DmlStatementsProcessor. This closes #6035.
5ca26ce1 is described below

commit 5ca26ce1e3fd0c299dfc9e9ce195f54ac74382ae
Author: devozerov <vo...@gridgain.com>
AuthorDate: Fri Feb 8 13:14:04 2019 +0300

    IGNITE-11209: SQL: Merged IgniteH2Indexing and DmlStatementsProcessor. This closes #6035.
---
 .../dht/GridDhtTxQueryEnlistFuture.java            |   16 +-
 .../processors/query/GridQueryIndexing.java        |   33 +-
 .../processors/query/GridQueryProcessor.java       |   34 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |   11 +
 .../IgniteClientCacheInitializationFailTest.java   |    9 +-
 .../query/h2/DmlStatementsProcessor.java           | 1457 +-------------------
 .../processors/query/h2/H2CachedStatementKey.java  |   17 +-
 .../processors/query/h2/IgniteH2Indexing.java      |  928 +++++++++++--
 .../internal/processors/query/h2/UpdateResult.java |   22 +
 .../query/h2/ddl/DdlStatementsProcessor.java       |   13 +
 .../processors/query/h2/dml/DmlAstUtils.java       |   47 +-
 .../query/h2/dml/DmlBulkLoadDataConverter.java     |   52 +
 .../query/h2/dml/DmlUpdateResultsIterator.java     |   71 +
 .../query/h2/dml/DmlUpdateSingleEntryIterator.java |   66 +
 .../internal/processors/query/h2/dml/DmlUtils.java |  420 +++++-
 .../processors/query/h2/dml/UpdatePlanBuilder.java |   39 +-
 .../query/h2/sql/GridSqlQueryParser.java           |   10 +
 .../query/h2/twostep/GridMapQueryExecutor.java     |    2 +-
 .../query/h2/GridH2IndexingInMemSelfTest.java      |   25 -
 .../query/h2/GridH2IndexingOffheapSelfTest.java    |   28 -
 .../query/h2/GridIndexingSpiAbstractSelfTest.java  |  174 ---
 .../IgniteBinaryCacheQueryTestSuite.java           |    7 -
 22 files changed, 1594 insertions(+), 1887 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
index cea50d5..ee49c6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
@@ -121,8 +121,20 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli
     @Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException {
         checkPartitions(parts);
 
-        return cctx.kernalContext().query().prepareDistributedUpdate(cctx, cacheIds, parts, schema, qry,
-                params, flags, pageSize, 0, tx.topologyVersionSnapshot(), mvccSnapshot, new GridQueryCancel());
+        return cctx.kernalContext().query().executeUpdateOnDataNodeTransactional(
+            cctx,
+            cacheIds,
+            parts,
+            schema,
+            qry,
+            params,
+            flags,
+            pageSize,
+            0,
+            tx.topologyVersionSnapshot(),
+            mvccSnapshot,
+            new GridQueryCancel()
+        );
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 7ee8069..f6215b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -121,21 +121,6 @@ public interface GridQueryIndexing {
         SqlClientContext cliCtx) throws IgniteCheckedException;
 
     /**
-     * Queries individual fields (generally used by JDBC drivers).
-     *
-     * @param schemaName Schema name.
-     * @param qry Query.
-     * @param keepBinary Keep binary flag.
-     * @param filter Cache name and key filter.
-     * @param cancel Query cancel.
-     * @param qryId Running query id. {@code null} in case query is not registered.
-     * @return Cursor.
-     */
-    public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
-        boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel,
-        @Nullable Long qryId) throws IgniteCheckedException;
-
-    /**
      * Executes text query.
      *
      * @param schemaName Schema name.
@@ -236,10 +221,20 @@ public interface GridQueryIndexing {
      * @return Cursor over entries which are going to be changed.
      * @throws IgniteCheckedException If failed.
      */
-    public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] ids, int[] parts,
-        String schema, String qry, Object[] params, int flags,
-        int pageSize, int timeout, AffinityTopologyVersion topVer,
-        MvccSnapshot mvccSnapshot, GridQueryCancel cancel) throws IgniteCheckedException;
+    public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
+        GridCacheContext<?, ?> cctx,
+        int[] ids,
+        int[] parts,
+        String schema,
+        String qry,
+        Object[] params,
+        int flags,
+        int pageSize,
+        int timeout,
+        AffinityTopologyVersion topVer,
+        MvccSnapshot mvccSnapshot,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException;
 
     /**
      * Registers type if it was not known before or updates it otherwise.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e5f323d..e099a1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2046,6 +2046,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Execute update on DHT node (i.e. when it is possible to execute an update on all nodes independently).
      *
      * @param cctx Cache context.
      * @param cacheIds Involved cache ids.
@@ -2062,13 +2063,36 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Cursor over entries which are going to be changed.
      * @throws IgniteCheckedException If failed.
      */
-    public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] cacheIds,
-        int[] parts, String schema, String qry, Object[] params, int flags, int pageSize, int timeout,
-        AffinityTopologyVersion topVer, MvccSnapshot mvccSnapshot,
-        GridQueryCancel cancel) throws IgniteCheckedException {
+    public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
+        GridCacheContext<?, ?> cctx,
+        int[] cacheIds,
+        int[] parts,
+        String schema,
+        String qry,
+        Object[] params,
+        int flags,
+        int pageSize,
+        int timeout,
+        AffinityTopologyVersion topVer,
+        MvccSnapshot mvccSnapshot,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
         checkxEnabled();
 
-        return idx.prepareDistributedUpdate(cctx, cacheIds, parts, schema, qry, params, flags, pageSize, timeout, topVer, mvccSnapshot, cancel);
+        return idx.executeUpdateOnDataNodeTransactional(
+            cctx,
+            cacheIds,
+            parts,
+            schema,
+            qry,
+            params,
+            flags,
+            pageSize,
+            timeout,
+            topVer,
+            mvccSnapshot,
+            cancel
+        );
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index c57c8bb..51feab6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -11226,4 +11226,15 @@ public abstract class IgniteUtils {
 
         return Math.abs((Math.abs(grpId) + partId)) % stripes;
     }
+
+    /**
+     * Check if flag is set.
+     *
+     * @param flags Flags.
+     * @param flag Flag.
+     * @return {@code True} if set.
+     */
+    public static boolean isFlagSet(int flags, int flag) {
+        return (flags & flag) == flag;
+    }
 }
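
For context, the new IgniteUtils.isFlagSet helper is a plain bitmask test: it returns true only when every bit of "flag" is present in "flags". A minimal usage sketch, assuming flag constants modeled on the GridH2QueryRequest.FLAG_* values that appear later in this diff (the constant values here are illustrative, not taken from the commit):

    // Hypothetical flags, mirroring the GridH2QueryRequest.FLAG_* pattern.
    static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 0;
    static final int FLAG_REPLICATED = 1 << 1;

    int flags = 0;

    flags |= FLAG_ENFORCE_JOIN_ORDER;

    // True: all bits of FLAG_ENFORCE_JOIN_ORDER are present in flags.
    boolean enforce = IgniteUtils.isFlagSet(flags, FLAG_ENFORCE_JOIN_ORDER);

    // False: the FLAG_REPLICATED bit was never set.
    boolean replicated = IgniteUtils.isFlagSet(flags, FLAG_REPLICATED);
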
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index 98f296a..5313c09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -304,13 +304,6 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
-            boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel,
-            Long qryId) throws IgniteCheckedException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String spaceName,
             String cacheName, String qry, String typeName, IndexingQueryFilter filter) throws IgniteCheckedException {
             return null;
@@ -355,7 +348,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] ids, int[] parts,
+        @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(GridCacheContext<?, ?> cctx, int[] ids, int[] parts,
             String schema, String qry, Object[] params, int flags, int pageSize, int timeout,
             AffinityTopologyVersion topVer,
             MvccSnapshot mvccVer, GridQueryCancel cancel) throws IgniteCheckedException {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 569c8b8..937e383 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -17,1278 +17,26 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import java.sql.BatchUpdateException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.CacheServerNotFoundException;
-import org.apache.ignite.cache.query.BulkLoadContextCursor;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
-import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
-import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
-import org.apache.ignite.internal.processors.odbc.SqlStateCode;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
-import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
-import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
-import org.apache.ignite.internal.sql.command.SqlCommand;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.lang.IgniteClosureX;
-import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T3;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.transactions.TransactionDuplicateKeyException;
-import org.h2.command.Prepared;
-import org.h2.command.dml.Delete;
-import org.h2.command.dml.Insert;
-import org.h2.command.dml.Merge;
-import org.h2.command.dml.Update;
-import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccTracker;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.requestSnapshot;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
-import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
-import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
-import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 
 /**
- *
+ * Contains entry processors for DML. Should be modified very carefully to maintain binary compatibility due to
+ * serializable anonymous classes.
  */
+@SuppressWarnings({"Anonymous2MethodRef", "PublicInnerClass", "unused"})
 public class DmlStatementsProcessor {
-    /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
-    private static final int DFLT_DML_RERUN_ATTEMPTS = 4;
-
-    /** Kernal context. */
-    private final GridKernalContext ctx;
-
-    /** Indexing. */
-    private final IgniteH2Indexing idx;
-
-    /** Object value context. */
-    private final CacheObjectValueContext coCtx;
-
-    /** Connection manager. */
-    private final ConnectionManager connMgr;
-
-    /** Schema manager. */
-    private final SchemaManager schemaMgr;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** Default size for update plan cache. */
-    private static final int PLAN_CACHE_SIZE = 1024;
-
-    /** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
-    private final boolean isDmlAllowedOverride;
-
-    /** Update plans cache. */
-    private final ConcurrentMap<H2CachedStatementKey, UpdatePlan> planCache =
-        new GridBoundedConcurrentLinkedHashMap<>(PLAN_CACHE_SIZE);
-
-    /**
-     * Default constructor.
-     */
-    public DmlStatementsProcessor(GridKernalContext ctx, IgniteH2Indexing idx) {
-        this.ctx = ctx;
-        this.idx = idx;
-
-        coCtx = idx.objectContext();
-        connMgr = idx.connections();
-        schemaMgr = idx.schemaManager();
-
-        log = ctx.log(DmlStatementsProcessor.class);
-
-        isDmlAllowedOverride = Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
-    }
-
-    /**
-     * Handle cache stop.
-     *
-     * @param cacheName Cache name.
-     */
-    public void onCacheStop(String cacheName) {
-        Iterator<Map.Entry<H2CachedStatementKey, UpdatePlan>> iter = planCache.entrySet().iterator();
-
-        while (iter.hasNext()) {
-            UpdatePlan plan = iter.next().getValue();
-
-            if (F.eq(cacheName, plan.cacheContext().name()))
-                iter.remove();
-        }
-    }
-
-    /**
-     * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
-     *
-     * @param schemaName Schema.
-     * @param conn Connection.
-     * @param prepared Prepared statement.
-     * @param fieldsQry Original query.
-     * @param loc Query locality flag.
-     * @param filters Cache name and key filter.
-     * @param cancel Cancel.
-     * @return Update result (modified items count and failed keys).
-     * @throws IgniteCheckedException if failed.
-     */
-    private UpdateResult updateSqlFields(String schemaName, Connection conn, Prepared prepared,
-        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
-        throws IgniteCheckedException {
-        Object[] errKeys = null;
-
-        long items = 0;
-
-        UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
-
-        GridCacheContext<?, ?> cctx = plan.cacheContext();
-
-        for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
-            CacheOperationContext opCtx = setKeepBinaryContext(cctx);
-
-            UpdateResult r;
-
-            try {
-                r = executeUpdateStatement(schemaName, plan, fieldsQry, loc, filters, cancel);
-            }
-            finally {
-                cctx.operationContextPerCall(opCtx);
-            }
-
-            items += r.counter();
-            errKeys = r.errorKeys();
-
-            if (F.isEmpty(errKeys))
-                break;
-        }
-
-        if (F.isEmpty(errKeys)) {
-            if (items == 1L)
-                return UpdateResult.ONE;
-            else if (items == 0L)
-                return UpdateResult.ZERO;
-        }
-
-        return new UpdateResult(items, errKeys);
-    }
-
-    /**
-     * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
-     *
-     * @param schemaName Schema.
-     * @param conn Connection.
-     * @param prepared Prepared statement.
-     * @param fieldsQry Original query.
-     * @param loc Query locality flag.
-     * @param filters Cache name and key filter.
-     * @param cancel Cancel.
-     * @return Update result (modified items count and failed keys).
-     * @throws IgniteCheckedException if failed.
-     */
-    private Collection<UpdateResult> updateSqlFieldsBatched(String schemaName, Connection conn, Prepared prepared,
-        SqlFieldsQueryEx fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
-        throws IgniteCheckedException {
-        List<Object[]> argss = fieldsQry.batchedArguments();
-
-        UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
-
-        GridCacheContext<?, ?> cctx = plan.cacheContext();
-
-        // For MVCC case, let's enlist batch elements one by one.
-        if (plan.hasRows() && plan.mode() == UpdateMode.INSERT && !cctx.mvccEnabled()) {
-            CacheOperationContext opCtx = setKeepBinaryContext(cctx);
-
-            try {
-                List<List<List<?>>> cur = plan.createRows(argss);
-
-                return processDmlSelectResultBatched(plan, cur, fieldsQry.getPageSize());
-            }
-            finally {
-                cctx.operationContextPerCall(opCtx);
-            }
-        }
-        else {
-            // Fallback to previous mode.
-            Collection<UpdateResult> ress = new ArrayList<>(argss.size());
-
-            SQLException batchException = null;
-
-            int[] cntPerRow = new int[argss.size()];
-
-            int cntr = 0;
-
-            for (Object[] args : argss) {
-                SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry.copy();
-
-                qry0.clearBatchedArgs();
-                qry0.setArgs(args);
-
-                UpdateResult res;
-
-                try {
-                    res = updateSqlFields(schemaName, conn, prepared, qry0, loc, filters, cancel);
-
-                    cntPerRow[cntr++] = (int)res.counter();
-
-                    ress.add(res);
-                }
-                catch (Exception e ) {
-                    SQLException sqlEx = QueryUtils.toSqlException(e);
-
-                    batchException = chainException(batchException, sqlEx);
-
-                    cntPerRow[cntr++] = Statement.EXECUTE_FAILED;
-                }
-            }
-
-            if (batchException != null) {
-                BatchUpdateException e = new BatchUpdateException(batchException.getMessage(),
-                    batchException.getSQLState(), batchException.getErrorCode(), cntPerRow, batchException);
-
-                throw new IgniteCheckedException(e);
-            }
-
-            return ress;
-        }
-    }
-
-    /**
-     * Makes current operation context as keepBinary.
-     *
-     * @param cctx Cache context.
-     * @return Old operation context.
-     */
-    private CacheOperationContext setKeepBinaryContext(GridCacheContext<?, ?> cctx) {
-        CacheOperationContext opCtx = cctx.operationContextPerCall();
-
-        // Force keepBinary for operation context to avoid binary deserialization inside entry processor
-        if (cctx.binaryMarshaller()) {
-            CacheOperationContext newOpCtx = null;
-
-            if (opCtx == null)
-                // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
-                newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false, true);
-            else if (!opCtx.isKeepBinary())
-                newOpCtx = opCtx.keepBinary();
-
-            if (newOpCtx != null)
-                cctx.operationContextPerCall(newOpCtx);
-        }
-
-        return opCtx;
-    }
-
-    /**
-     * @param schemaName Schema.
-     * @param c Connection.
-     * @param p Prepared statement.
-     * @param fieldsQry Initial query
-     * @param cancel Query cancel.
-     * @return Update result wrapped into {@link GridQueryFieldsResult}
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings("unchecked")
-    List<QueryCursorImpl<List<?>>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p,
-        SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
-        if (DmlUtils.isBatched(fieldsQry)) {
-            Collection<UpdateResult> ress = updateSqlFieldsBatched(schemaName, c, p, (SqlFieldsQueryEx)fieldsQry,
-                false, null, cancel);
-
-            ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size());
-
-            for (UpdateResult res : ress) {
-                checkUpdateResult(res);
-
-                QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
-                    (Collections.singletonList(res.counter())), cancel, false);
-
-                resCur.fieldsMeta(UPDATE_RESULT_META);
-
-                resCurs.add(resCur);
-            }
-
-            return resCurs;
-        }
-        else {
-            UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel);
-
-            checkUpdateResult(res);
-
-            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
-                (Collections.singletonList(res.counter())), cancel, false);
-
-            resCur.fieldsMeta(UPDATE_RESULT_META);
-
-            return Collections.singletonList(resCur);
-        }
-    }
-
-    /**
-     * Execute DML statement on local cache.
-     *
-     * @param schemaName Schema.
-     * @param conn Connection.
-     * @param prepared H2 prepared command.
-     * @param fieldsQry Fields query.
-     * @param filters Cache name and key filter.
-     * @param cancel Query cancel.
-     * @return Update result wrapped into {@link GridQueryFieldsResult}
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings("unchecked")
-    GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, Connection conn, Prepared prepared,
-        SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel)
-        throws IgniteCheckedException {
-        UpdateResult res = updateSqlFields(schemaName, conn, prepared, fieldsQry, true,
-            filters, cancel);
-
-        return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
-            new IgniteSingletonIterator(Collections.singletonList(res.counter())));
-    }
-
-    /**
-     * Perform given statement against given data streamer. Only rows based INSERT is supported.
-     *
-     * @param qry Query.
-     * @param schemaName Schema name.
-     * @param streamer Streamer to feed data to.
-     * @param stmt Statement.
-     * @param args Statement arguments.
-     * @return Number of rows in given INSERT statement.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings({"unchecked"})
-    long streamUpdateQuery(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt,
-        final Object[] args) throws IgniteCheckedException {
-        Long qryId = idx.runningQueryManager().register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
-
-        boolean fail = false;
-
-        try {
-            idx.checkStatementStreamable(stmt);
-
-            Prepared p = GridSqlQueryParser.prepared(stmt);
-
-            assert p != null;
-
-            final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null);
-
-            assert plan.isLocalSubquery();
-
-            final GridCacheContext cctx = plan.cacheContext();
-
-            QueryCursorImpl<List<?>> cur;
-
-            final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
-
-            QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    try {
-                        Iterator<List<?>> it;
-
-                        if (!F.isEmpty(plan.selectQuery())) {
-                            GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
-                                plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
-                                null, false, false, 0, null, null);
-
-                            it = res.iterator();
-                        }
-                        else
-                            it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator();
-
-                        return new GridQueryCacheObjectsIterator(it, coCtx, cctx.keepBinary());
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
-                    }
-                }
-            }, null);
-
-            data.addAll(stepCur.getAll());
-
-            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    return data.iterator();
-                }
-            }, null);
-
-            if (plan.rowCount() == 1) {
-                IgniteBiTuple t = plan.processRow(cur.iterator().next());
-
-                streamer.addData(t.getKey(), t.getValue());
-
-                return 1;
-            }
-
-            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
-
-            for (List<?> row : cur) {
-                final IgniteBiTuple t = plan.processRow(row);
-
-                rows.put(t.getKey(), t.getValue());
-            }
-
-            streamer.addData(rows);
-
-            return rows.size();
-        }
-        catch (IgniteCheckedException e) {
-            fail = true;
-
-            throw e;
-        }
-        finally {
-            idx.runningQueryManager().unregister(qryId, fail);
-        }
-    }
-
-    /**
-     * Actually perform SQL DML operation locally.
-     *
-     * @param schemaName Schema name.
-     * @param plan Cache context.
-     * @param fieldsQry Fields query.
-     * @param loc Local query flag.
-     * @param filters Cache name and key filter.
-     * @param cancel Query cancel state holder.
-     * @return Pair [number of successfully processed items; keys that have failed to be processed]
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings({"ConstantConditions"})
-    private UpdateResult executeUpdateStatement(String schemaName, final UpdatePlan plan,
-        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
-        GridQueryCancel cancel) throws IgniteCheckedException {
-        GridCacheContext cctx = plan.cacheContext();
-
-        if (cctx != null && cctx.mvccEnabled()) {
-            assert cctx.transactional();
-
-            DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
-            GridNearTxLocal tx = tx(cctx.kernalContext());
-
-            boolean implicit = (tx == null);
-
-            boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) ||
-                ((SqlFieldsQueryEx)fieldsQry).isAutoCommit());
-
-            if (implicit)
-                tx = txStart(cctx, fieldsQry.getTimeout());
-
-            requestSnapshot(cctx, tx);
-
-            try (GridNearTxLocal toCommit = commit ? tx : null) {
-                long timeout = implicit
-                    ? tx.remainingTime()
-                    : IgniteH2Indexing.operationTimeout(fieldsQry.getTimeout(), tx);
-
-                if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
-                    || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
-
-                    boolean sequential = true;
-
-                    UpdateSourceIterator<?> it;
-
-                    if (plan.fastResult()) {
-                        IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs());
-
-                        EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
-
-                        it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
-                    }
-                    else if (plan.hasRows())
-                        it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan, plan.createRows(fieldsQry.getArgs()));
-                    else {
-                        // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false).
-                        SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
-                            .setArgs(fieldsQry.getArgs())
-                            .setDistributedJoins(fieldsQry.isDistributedJoins())
-                            .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
-                            .setLocal(fieldsQry.isLocal())
-                            .setPageSize(fieldsQry.getPageSize())
-                            .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
-                            .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
-                        FieldsQueryCursor<List<?>> cur = idx.querySqlFields(schemaName, newFieldsQry, null,
-                            true, true, mvccTracker(cctx, tx), cancel, false).get(0);
-
-                        it = plan.iteratorForTransaction(connMgr, cur);
-                    }
-
-                    IgniteInternalFuture<Long> fut = tx.updateAsync(cctx, it,
-                        fieldsQry.getPageSize(), timeout, sequential);
-
-                    UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
-                    if (commit)
-                        toCommit.commit();
-
-                    return res;
-                }
-
-                int[] ids = U.toIntArray(distributedPlan.getCacheIds());
-
-                int flags = 0;
-
-                if (fieldsQry.isEnforceJoinOrder())
-                    flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
-
-                if (distributedPlan.isReplicatedOnly())
-                    flags |= GridH2QueryRequest.FLAG_REPLICATED;
-
-                flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
-                    fieldsQry.isDataPageScanEnabled());
-
-                int[] parts = fieldsQry.getPartitions();
-
-                IgniteInternalFuture<Long> fut = tx.updateAsync(
-                    cctx,
-                    ids,
-                    parts,
-                    schemaName,
-                    fieldsQry.getSql(),
-                    fieldsQry.getArgs(),
-                    flags,
-                    fieldsQry.getPageSize(),
-                    timeout);
-
-                UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
-                if (commit)
-                    toCommit.commit();
-
-                return res;
-            }
-            catch (ClusterTopologyServerNotFoundException e) {
-                throw new CacheServerNotFoundException(e.getMessage(), e);
-            }
-            catch (IgniteCheckedException e) {
-                checkSqlException(e);
-
-                Exception ex = IgniteUtils.convertExceptionNoWrap(e);
-
-                if (ex instanceof IgniteException)
-                    throw (IgniteException)ex;
-
-                U.error(log, "Error during update [localNodeId=" + cctx.localNodeId() + "]", ex);
-
-                throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
-            }
-            finally {
-                if (commit)
-                    cctx.tm().resetContext();
-            }
-        }
-
-        UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs());
-
-        if (fastUpdateRes != null)
-            return fastUpdateRes;
-
-        if (plan.distributedPlan() != null) {
-            UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel);
-
-            // null is returned in case not all nodes support distributed DML.
-            if (result != null)
-                return result;
-        }
-
-        Iterable<List<?>> cur;
-
-        // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
-        // sub-query and not some dummy stuff like "select 1, 2, 3;"
-        if (!loc && !plan.isLocalSubquery()) {
-            assert !F.isEmpty(plan.selectQuery());
-
-            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
-                .setArgs(fieldsQry.getArgs())
-                .setDistributedJoins(fieldsQry.isDistributedJoins())
-                .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
-                .setLocal(fieldsQry.isLocal())
-                .setPageSize(fieldsQry.getPageSize())
-                .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS)
-                .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
-            cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true,
-                null, cancel, false).get(0);
-        }
-        else if (plan.hasRows())
-            cur = plan.createRows(fieldsQry.getArgs());
-        else {
-            final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(),
-                F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), false, fieldsQry.getTimeout(),
-                cancel, null);
-
-            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    try {
-                        return new GridQueryCacheObjectsIterator(res.iterator(), coCtx, true);
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
-                    }
-                }
-            }, cancel);
-        }
-
-        int pageSize = loc ? 0 : fieldsQry.getPageSize();
-
-        return processDmlSelectResult(plan, cur, pageSize);
-    }
-
-    /**
-     * @param e Exception.
-     */
-    private void checkSqlException(IgniteCheckedException e) {
-        IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
-
-        if(sqlEx != null)
-            throw sqlEx;
-    }
-
-    /**
-     * Performs the planned update.
-     * @param plan Update plan.
-     * @param rows Rows to update.
-     * @param pageSize Page size.
-     * @return {@link List} of update results.
-     * @throws IgniteCheckedException If failed.
-     */
-    private List<UpdateResult> processDmlSelectResultBatched(UpdatePlan plan, List<List<List<?>>> rows, int pageSize)
-        throws IgniteCheckedException {
-        switch (plan.mode()) {
-            case MERGE:
-                // TODO
-                throw new IgniteCheckedException("Unsupported, fix");
-
-            case INSERT:
-                return doInsertBatched(plan, rows, pageSize);
-
-            default:
-                throw new IgniteSQLException("Unexpected batched DML operation [mode=" + plan.mode() + ']',
-                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
-        }
-    }
-
-    /**
-     * @param plan Update plan.
-     * @param cursor Cursor over select results.
-     * @param pageSize Page size.
-     * @return Pair [number of successfully processed items; keys that have failed to be processed]
-     * @throws IgniteCheckedException if failed.
-     */
-    private UpdateResult processDmlSelectResult(UpdatePlan plan, Iterable<List<?>> cursor,
-        int pageSize) throws IgniteCheckedException {
-        switch (plan.mode()) {
-            case MERGE:
-                return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
-
-            case INSERT:
-                return new UpdateResult(doInsert(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
-
-            case UPDATE:
-                return doUpdate(plan, cursor, pageSize);
-
-            case DELETE:
-                return doDelete(plan.cacheContext(), cursor, pageSize);
-
-            default:
-                throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']',
-                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
-        }
-    }
-
-    /**
-     * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
-     * if available.
-     *
-     * @param schema Schema.
-     * @param conn Connection.
-     * @param p Prepared statement.
-     * @param fieldsQry Original fields query.
-     * @param loc Local query flag.
-     * @return Update plan.
-     */
-    UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry,
-        boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException {
-        isDmlOnSchemaSupported(schema);
-
-        H2CachedStatementKey planKey = H2CachedStatementKey.forDmlStatement(schema, p.getSQL(), fieldsQry, loc);
-
-        UpdatePlan res = (errKeysPos == null ? planCache.get(planKey) : null);
-
-        if (res != null)
-            return res;
-
-        res = UpdatePlanBuilder.planForStatement(p, loc, idx, conn, fieldsQry, errKeysPos, isDmlAllowedOverride);
-
-        // Don't cache re-runs
-        if (errKeysPos == null)
-            return U.firstNotNull(planCache.putIfAbsent(planKey, res), res);
-        else
-            return res;
-    }
-
-    /**
-     * @param schemaName Schema name.
-     * @param fieldsQry Initial query.
-     * @param plan Update plan.
-     * @param cancel Cancel state.
-     * @return Update result.
-     * @throws IgniteCheckedException if failed.
-     */
-    private UpdateResult doDistributedUpdate(String schemaName, SqlFieldsQuery fieldsQry, UpdatePlan plan,
-        GridQueryCancel cancel) throws IgniteCheckedException {
-        DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
-        assert distributedPlan != null;
-
-        if (cancel == null)
-            cancel = new GridQueryCancel();
-
-        return idx.runDistributedUpdate(schemaName, fieldsQry, distributedPlan.getCacheIds(),
-            distributedPlan.isReplicatedOnly(), cancel);
-    }
-
-    /**
-     * Perform DELETE operation on top of results of SELECT.
-     * @param cctx Cache context.
-     * @param cursor SELECT results.
-     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
-     * @return Results of DELETE (number of items affected AND keys that failed to be updated).
-     */
-    private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
-        throws IgniteCheckedException {
-        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
-
-        for (List<?> row : cursor) {
-            if (row.size() != 2) {
-                U.warn(log, "Invalid row size on DELETE - expected 2, got " + row.size());
-
-                continue;
-            }
-
-            sender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV),  0);
-        }
-
-        sender.flush();
-
-        SQLException resEx = sender.error();
-
-        if (resEx != null) {
-            if (!F.isEmpty(sender.failedKeys())) {
-                // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
-                // had been modified concurrently right away.
-                String msg = "Failed to DELETE some keys because they had been modified concurrently " +
-                    "[keys=" + sender.failedKeys() + ']';
-
-                SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
-
-                conEx.setNextException(resEx);
-
-                resEx = conEx;
-            }
-
-            throw new IgniteSQLException(resEx);
-        }
-
-        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
-    }
-
-    /**
-     * Perform UPDATE operation on top of results of SELECT.
-     * @param cursor SELECT results.
-     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
-     * @return Pair [cursor corresponding to results of UPDATE (contains number of items affected); keys whose values
-     *     had been modified concurrently (arguments for a re-run)].
-     */
-    private UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
-        throws IgniteCheckedException {
-        GridCacheContext cctx = plan.cacheContext();
-
-        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
-
-        for (List<?> row : cursor) {
-            T3<Object, Object, Object> row0 = plan.processRowForUpdate(row);
-
-            Object key = row0.get1();
-            Object oldVal = row0.get2();
-            Object newVal = row0.get3();
-
-            sender.add(key, new ModifyingEntryProcessor(oldVal, new EntryValueUpdater(newVal)), 0);
-        }
-
-        sender.flush();
-
-        SQLException resEx = sender.error();
-
-        if (resEx != null) {
-            if (!F.isEmpty(sender.failedKeys())) {
-                // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
-                // had been modified concurrently right away.
-                String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
-                    "[keys=" + sender.failedKeys() + ']';
-
-                SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
-
-                dupEx.setNextException(resEx);
-
-                resEx = dupEx;
-            }
-
-            throw new IgniteSQLException(resEx);
-        }
-
-        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
-    }
-
-    /**
-     * Execute MERGE statement plan.
-     * @param cursor Cursor to take inserted data from.
-     * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
-     * @return Number of items affected.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings("unchecked")
-    private long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
-        GridCacheContext cctx = plan.cacheContext();
-
-        // If we have just one item to put, just do so
-        if (plan.rowCount() == 1) {
-            IgniteBiTuple t = plan.processRow(cursor.iterator().next());
-
-            cctx.cache().put(t.getKey(), t.getValue());
-
-            return 1;
-        }
-        else {
-            int resCnt = 0;
-
-            Map<Object, Object> rows = new LinkedHashMap<>();
-
-            for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
-                List<?> row = it.next();
-
-                IgniteBiTuple t = plan.processRow(row);
-
-                rows.put(t.getKey(), t.getValue());
-
-                if ((pageSize > 0 && rows.size() == pageSize) || !it.hasNext()) {
-                    cctx.cache().putAll(rows);
-
-                    resCnt += rows.size();
-
-                    if (it.hasNext())
-                        rows.clear();
-                }
-            }
-
-            return resCnt;
-        }
-    }
-
-    /**
-     * Execute INSERT statement plan.
-     * @param cursor Cursor to take inserted data from.
-     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
-     * @return Number of items affected.
-     * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
-     */
-    @SuppressWarnings({"unchecked"})
-    private long doInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
-        GridCacheContext cctx = plan.cacheContext();
-
-        // If we have just one item to put, just do so
-        if (plan.rowCount() == 1) {
-            IgniteBiTuple t = plan.processRow(cursor.iterator().next());
-
-            if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
-                return 1;
-            else
-                throw new TransactionDuplicateKeyException("Duplicate key during INSERT [key=" + t.getKey() + ']');
-        }
-        else {
-            // Keys that failed to INSERT due to duplication.
-            DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
-
-            for (List<?> row : cursor) {
-                final IgniteBiTuple keyValPair = plan.processRow(row);
-
-                sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()),  0);
-            }
-
-            sender.flush();
-
-            SQLException resEx = sender.error();
-
-            if (!F.isEmpty(sender.failedKeys())) {
-                String msg = "Failed to INSERT some keys because they are already in cache " +
-                    "[keys=" + sender.failedKeys() + ']';
-
-                SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION);
-
-                if (resEx == null)
-                    resEx = dupEx;
-                else
-                    resEx.setNextException(dupEx);
-            }
-
-            if (resEx != null)
-                throw new IgniteSQLException(resEx);
-
-            return sender.updateCount();
-        }
-    }
-
-    /**
-     * Execute INSERT statement plan.
-     *
-     * @param plan Plan to execute.
-     * @param cursor Cursor to take inserted data from. I.e. list of batch arguments for each query.
-     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
-     * @return Number of items affected.
-     * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
-     */
-    private List<UpdateResult> doInsertBatched(UpdatePlan plan, List<List<List<?>>> cursor, int pageSize)
-        throws IgniteCheckedException {
-        GridCacheContext cctx = plan.cacheContext();
-
-        DmlBatchSender snd = new DmlBatchSender(cctx, pageSize, cursor.size());
-
-        int rowNum = 0;
-
-        SQLException resEx = null;
-
-        for (List<List<?>> qryRow : cursor) {
-            for (List<?> row : qryRow) {
-                try {
-                    final IgniteBiTuple keyValPair = plan.processRow(row);
-
-                    snd.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()), rowNum);
-                }
-                catch (Exception e) {
-                    String sqlState;
-
-                    int code;
-
-                    if (e instanceof IgniteSQLException) {
-                        sqlState = ((IgniteSQLException)e).sqlState();
-
-                        code = ((IgniteSQLException)e).statusCode();
-                    } else {
-                        sqlState = SqlStateCode.INTERNAL_ERROR;
-
-                        code = IgniteQueryErrorCode.UNKNOWN;
-                    }
-
-                    resEx = chainException(resEx, new SQLException(e.getMessage(), sqlState, code, e));
-
-                    snd.setFailed(rowNum);
-                }
-            }
-
-            rowNum++;
-        }
-
-        try {
-            snd.flush();
-        }
-        catch (Exception e) {
-            resEx = chainException(resEx, new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR,
-                IgniteQueryErrorCode.UNKNOWN, e));
-        }
-
-        resEx = chainException(resEx, snd.error());
-
-        if (!F.isEmpty(snd.failedKeys())) {
-            SQLException e = new SQLException("Failed to INSERT some keys because they are already in cache [keys=" +
-                snd.failedKeys() + ']', SqlStateCode.CONSTRAINT_VIOLATION, DUPLICATE_KEY);
-
-            resEx = chainException(resEx, e);
-        }
-
-        if (resEx != null) {
-            BatchUpdateException e = new BatchUpdateException(resEx.getMessage(), resEx.getSQLState(),
-                resEx.getErrorCode(), snd.perRowCounterAsArray(), resEx);
-
-            throw new IgniteCheckedException(e);
-        }
-
-        int[] cntPerRow = snd.perRowCounterAsArray();
-
-        List<UpdateResult> res = new ArrayList<>(cntPerRow.length);
-
-        for (int i = 0; i < cntPerRow.length; i++ ) {
-            int cnt = cntPerRow[i];
-
-            res.add(new UpdateResult(cnt , X.EMPTY_OBJECT_ARRAY));
-        }
-
-        return res;
-    }
-
-    /**
-     * Adds exception to the chain.
-     *
-     * @param main Exception to add another exception to.
-     * @param add Exception which should be added to chain.
-     * @return Chained exception.
-     */
-    private SQLException chainException(SQLException main, SQLException add) {
-        if (main == null) {
-            if (add != null) {
-                main = add;
-
-                return main;
-            }
-            else
-                return null;
-        }
-        else {
-            main.setNextException(add);
-
-            return main;
-        }
-    }
-
-    /**
-     *
-     * @param schemaName Schema name.
-     * @param stmt Prepared statement.
-     * @param fldsQry Query.
-     * @param filter Filter.
-     * @param cancel Cancel state.
-     * @param local Locality flag.
-     * @return Update result.
-     * @throws IgniteCheckedException if failed.
-     */
-    UpdateResult mapDistributedUpdate(String schemaName, PreparedStatement stmt, SqlFieldsQuery fldsQry,
-        IndexingQueryFilter filter, GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
-        Connection c;
-
-        try {
-            c = stmt.getConnection();
-        }
-        catch (SQLException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel);
-    }
-
-    /**
-     * @param schema Schema name.
-     * @param conn Connection.
-     * @param stmt Prepared statement.
-     * @param qry Sql fields query
-     * @param filter Backup filter.
-     * @param cancel Query cancel object.
-     * @param local {@code true} if should be executed locally.
-     * @param topVer Topology version.
-     * @param mvccSnapshot MVCC snapshot.
-     * @return Iterator upon updated values.
-     * @throws IgniteCheckedException If failed.
-     */
-    public UpdateSourceIterator<?> prepareDistributedUpdate(String schema, Connection conn,
-        PreparedStatement stmt, SqlFieldsQuery qry,
-        IndexingQueryFilter filter, GridQueryCancel cancel, boolean local,
-        AffinityTopologyVersion topVer, MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
-
-        Prepared prepared = GridSqlQueryParser.prepared(stmt);
-
-        UpdatePlan plan = getPlanForStatement(schema, conn, prepared, qry, local, null);
-
-        GridCacheContext cctx = plan.cacheContext();
-
-        CacheOperationContext opCtx = cctx.operationContextPerCall();
-
-        // Force keepBinary for operation context to avoid binary deserialization inside entry processor
-        if (cctx.binaryMarshaller()) {
-            CacheOperationContext newOpCtx = null;
-
-            if (opCtx == null)
-                newOpCtx = new CacheOperationContext().keepBinary();
-            else if (!opCtx.isKeepBinary())
-                newOpCtx = opCtx.keepBinary();
-
-            if (newOpCtx != null)
-                cctx.operationContextPerCall(newOpCtx);
-        }
-
-        QueryCursorImpl<List<?>> cur;
-
-        // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
-        // sub-query and not some dummy stuff like "select 1, 2, 3;"
-        if (!local && !plan.isLocalSubquery()) {
-            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qry.isCollocated())
-                .setArgs(qry.getArgs())
-                .setDistributedJoins(qry.isDistributedJoins())
-                .setEnforceJoinOrder(qry.isEnforceJoinOrder())
-                .setLocal(qry.isLocal())
-                .setPageSize(qry.getPageSize())
-                .setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS)
-                .setDataPageScanEnabled(qry.isDataPageScanEnabled());
-
-            cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schema, newFieldsQry, null, true, true,
-                new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel, false).get(0);
-        }
-        else {
-            final GridQueryFieldsResult res = idx.queryLocalSqlFields(schema, plan.selectQuery(),
-                F.asList(qry.getArgs()), filter, qry.isEnforceJoinOrder(), false, qry.getTimeout(), cancel,
-                new StaticMvccQueryTracker(cctx, mvccSnapshot), null);
-
-            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    try {
-                        return res.iterator();
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
-                    }
-                }
-            }, cancel);
-        }
-
-        return plan.iteratorForTransaction(connMgr, cur);
-    }
-
-    /**
-     * Runs a DML statement for which we have internal command executor.
-     *
-     * @param schemaName Schema name.
-     * @param sql The SQL command text to execute.
-     * @param cmd The command to execute.
-     * @return The cursor returned by the statement.
-     * @throws IgniteSQLException If failed.
-     */
-    public FieldsQueryCursor<List<?>> runNativeDmlStatement(String schemaName, String sql, SqlCommand cmd) {
-        Long qryId = idx.runningQueryManager().register(sql, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
-
-        try {
-            if (cmd instanceof SqlBulkLoadCommand)
-                return processBulkLoadCommand((SqlBulkLoadCommand)cmd, qryId);
-            else
-                throw new IgniteSQLException("Unsupported DML operation: " + sql,
-                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
-        }
-        catch (IgniteSQLException e) {
-            idx.runningQueryManager().unregister(qryId, true);
-
-            throw e;
-        }
-        catch (Exception e) {
-            idx.runningQueryManager().unregister(qryId, true);
-
-            throw new IgniteSQLException("Unexpected DML operation failure: " + e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Process bulk load COPY command.
-     *
-     * @param cmd The command.
-     * @param qryId Query id.
-     * @return The context (which is the result of the first request/response).
-     * @throws IgniteCheckedException If something failed.
-     */
-    public FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd,
-        Long qryId) throws IgniteCheckedException {
-        if (cmd.packetSize() == null)
-            cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
-
-        GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName());
-
-        if (tbl == null) {
-            throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
-                IgniteQueryErrorCode.TABLE_NOT_FOUND);
-        }
-
-        H2Utils.checkAndStartNotStartedCache(ctx, tbl);
-
-        UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
-
-        IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new BulkLoadDataConverter(plan);
-
-        IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName());
-
-        BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
-
-        BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
-
-        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter,
-            idx.runningQueryManager(), qryId);
-
-        BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
-
-        return new BulkLoadContextCursor(processor, params);
-    }
-
     /** */
-    private static final class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+    public static final class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
         /** Value to set. */
         private final Object val;
 
         /** */
-        private InsertEntryProcessor(Object val) {
+        public InsertEntryProcessor(Object val) {
             this.val = val;
         }
 
@@ -1306,7 +54,7 @@ public class DmlStatementsProcessor {
     /**
      * Entry processor invoked by UPDATE and DELETE operations.
      */
-    private static final class ModifyingEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+    public static final class ModifyingEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
         /** Value to expect. */
         private final Object val;
 
@@ -1314,7 +62,7 @@ public class DmlStatementsProcessor {
         private final IgniteInClosure<MutableEntry<Object, Object>> entryModifier;
 
         /** */
-        private ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object, Object>> entryModifier) {
+        public ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object, Object>> entryModifier) {
             assert val != null;
 
             this.val = val;
@@ -1342,174 +90,63 @@ public class DmlStatementsProcessor {
         }
     }
 
-    /** */
-    private static IgniteInClosure<MutableEntry<Object, Object>> RMV = new IgniteInClosure<MutableEntry<Object, Object>>() {
-        /** {@inheritDoc} */
-        @Override public void apply(MutableEntry<Object, Object> e) {
-            e.remove();
+    /** Dummy anonymous class to advance RMV to anonymous class position 5. */
+    private static final Runnable DUMMY_1 = new Runnable() {
+        @Override public void run() {
+            // No-op.
         }
     };
 
-    /**
-     *
-     */
-    private static final class EntryValueUpdater implements IgniteInClosure<MutableEntry<Object, Object>> {
-        /** Value to set. */
-        private final Object val;
-
-        /** */
-        private EntryValueUpdater(Object val) {
-            assert val != null;
-
-            this.val = val;
+    /** Dummy anonymous class to advance RMV to anonymous class position 5. */
+    private static final Runnable DUMMY_2 = new Runnable() {
+        @Override public void run() {
+            // No-op.
         }
+    };
 
-        /** {@inheritDoc} */
-        @Override public void apply(MutableEntry<Object, Object> e) {
-            e.setValue(val);
+    /** Dummy anonymous class to advance RMV to anonymous class position 5. */
+    private static final Runnable DUMMY_3 = new Runnable() {
+        @Override public void run() {
+            // No-op.
         }
-    }
-
-    /**
-     * Check whether statement is DML statement.
-     *
-     * @param stmt Statement.
-     * @return {@code True} if this is DML.
-     */
-    static boolean isDmlStatement(Prepared stmt) {
-        return stmt instanceof Merge || stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete;
-    }
-
-    /**
-     * Check if schema supports DDL statement.
-     *
-     * @param schemaName Schema name.
-     */
-    private static void isDmlOnSchemaSupported(String schemaName) {
-        if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
-            throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-    }
-
-    /**
-     * Check update result for erroneous keys and throws concurrent update exception if necessary.
-     *
-     * @param r Update result.
-     */
-    static void checkUpdateResult(UpdateResult r) {
-        if (!F.isEmpty(r.errorKeys())) {
-            String msg = "Failed to update some keys because they had been modified concurrently " +
-                "[keys=" + Arrays.toString(r.errorKeys()) + ']';
+    };
 
-            SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+    /** Dummy anonymous class to advance RMV to anonymous class position 5. */
+    private static final Runnable DUMMY_4 = new Runnable() {
+        @Override public void run() {
+            // No-op.
+        }
+    };
 
-            throw new IgniteSQLException(conEx);
+    /**
+     * Remove updater. Must not be moved: it has to stay at anonymous class position 5 so that
+     * its compiled class name does not change.
+     */
+    public static final IgniteInClosure<MutableEntry<Object, Object>> RMV =
+        new IgniteInClosure<MutableEntry<Object, Object>>() {
+        @Override public void apply(MutableEntry<Object, Object> e) {
+            e.remove();
         }
-    }
+    };
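
For context on the placeholders above: javac compiles anonymous classes to
positional binary names (Outer$1, Outer$2, ...), so removing an earlier
anonymous class silently renames every later one. A minimal standalone sketch
of that mechanic (illustrative only, not part of the patch):

    public class AnonPositionDemo {
        static final Runnable FIRST = new Runnable() {
            @Override public void run() { /* No-op. */ }
        };

        static final Runnable SECOND = new Runnable() {
            @Override public void run() { /* No-op. */ }
        };

        public static void main(String[] args) {
            // Prints "AnonPositionDemo$1" and "AnonPositionDemo$2". Deleting FIRST
            // would rename SECOND's compiled class to AnonPositionDemo$1 and break
            // anything that recorded the old name (e.g. serialized instances).
            System.out.println(FIRST.getClass().getName());
            System.out.println(SECOND.getClass().getName());
        }
    }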
 
     /**
-     * Converts a row of values to actual key+value using {@link UpdatePlan#processRow(List)}.
+     * Entry value updater.
      */
-    private static class BulkLoadDataConverter extends IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> {
-        /** Update plan to convert incoming rows. */
-        private final UpdatePlan plan;
-
-        /**
-         * Creates the converter with the given update plan.
-         *
-         * @param plan The update plan to use.
-         */
-        private BulkLoadDataConverter(UpdatePlan plan) {
-            this.plan = plan;
-        }
+    public static final class EntryValueUpdater implements IgniteInClosure<MutableEntry<Object, Object>> {
+        /** Value to set. */
+        private final Object val;
 
         /**
-         * Converts the record to a key+value.
+         * Constructor.
          *
-         * @param record The record to convert.
-         * @return The key+value.
-         * @throws IgniteCheckedException If conversion failed for some reason.
+         * @param val Value.
          */
-        @Override public IgniteBiTuple<?, ?> applyx(List<?> record) throws IgniteCheckedException {
-            return plan.processRow(record);
-        }
-    }
-
-    /** */
-    private static class DmlUpdateResultsIterator
-        implements UpdateSourceIterator<Object> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private EnlistOperation op;
-
-        /** */
-        private UpdatePlan plan;
-
-        /** */
-        private Iterator<List<?>> it;
-
-        /** */
-        DmlUpdateResultsIterator(EnlistOperation op, UpdatePlan plan, Iterable<List<?>> rows) {
-            this.op = op;
-            this.plan = plan;
-            this.it = rows.iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public EnlistOperation operation() {
-            return op;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNextX() {
-            return it.hasNext();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object nextX() throws IgniteCheckedException {
-            return plan.processRowForTx(it.next());
-        }
-    }
-
-    /** */
-    private static class DmlUpdateSingleEntryIterator<T> implements UpdateSourceIterator<T> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private EnlistOperation op;
-
-        /** */
-        private boolean first = true;
-
-        /** */
-        private T entry;
-
-        /** */
-        DmlUpdateSingleEntryIterator(EnlistOperation op, T entry) {
-            this.op = op;
-            this.entry = entry;
-        }
-
-        /** {@inheritDoc} */
-        @Override public EnlistOperation operation() {
-            return op;
-        }
+        public EntryValueUpdater(Object val) {
+            assert val != null;
 
-        /** {@inheritDoc} */
-        @Override public boolean hasNextX() {
-            return first;
+            this.val = val;
         }
 
         /** {@inheritDoc} */
-        @Override public T nextX() {
-            T res = first ? entry : null;
-
-            first = false;
-
-            return res;
+        @Override public void apply(MutableEntry<Object, Object> e) {
+            e.setValue(val);
         }
     }
 }
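
The processors above were made public because the DML execution code they serve
now lives in IgniteH2Indexing and DmlUtils rather than in this class. A hedged
sketch of the invoke-style update they implement (cache name, key and values
are assumptions, not taken from the patch):

    IgniteCache<Object, Object> cache = ignite.cache("person").withKeepBinary();

    // UPDATE: replace the value only if the entry still holds the value the
    // SELECT part observed, guarding against concurrent modification.
    cache.invoke(key, new DmlStatementsProcessor.ModifyingEntryProcessor(
        expectedVal, new DmlStatementsProcessor.EntryValueUpdater(newVal)));

    // DELETE: same guard, but the modifier is the shared RMV closure.
    cache.invoke(key, new DmlStatementsProcessor.ModifyingEntryProcessor(
        expectedVal, DmlStatementsProcessor.RMV));
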
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
index 7b43f52..300ed6c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
@@ -46,21 +46,6 @@ class H2CachedStatementKey {
     }
 
     /**
-     * Build key with details relevant to DML plans cache.
-     *
-     * @param schemaName Schema name.
-     * @param sql SQL.
-     * @param fieldsQry Query with flags.
-     * @param loc DML {@code SELECT} Locality flag.
-     * @return Statement key.
-     * @see UpdatePlanBuilder
-     * @see DmlStatementsProcessor#getPlanForStatement
-     */
-    static H2CachedStatementKey forDmlStatement(String schemaName, String sql, SqlFieldsQuery fieldsQry, boolean loc) {
-        return new H2CachedStatementKey(schemaName, sql, fieldsQry, loc);
-    }
-
-    /**
      * Full-fledged constructor.
      *
      * @param schemaName Schema name.
@@ -68,7 +53,7 @@ class H2CachedStatementKey {
      * @param fieldsQry Query with flags.
      * @param loc DML {@code SELECT} Locality flag.
      */
-    private H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery fieldsQry, boolean loc) {
+    public H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery fieldsQry, boolean loc) {
         this.schemaName = schemaName;
         this.sql = sql;
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 6d346c0..2b60286 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -17,19 +17,23 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import javax.cache.CacheException;
@@ -38,6 +42,8 @@ import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheServerNotFoundException;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -46,9 +52,16 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
@@ -61,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVe
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
@@ -74,6 +88,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -89,10 +104,21 @@ import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryHistoryMetrics;
 import org.apache.ignite.internal.processors.query.QueryHistoryMetricsKey;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.RunningQueryManager;
 import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
 import org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateResultsIterator;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIterator;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
+import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
+import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
 import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeClientIndex;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
@@ -107,8 +133,6 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery;
@@ -119,7 +143,6 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
 import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
-import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
@@ -139,15 +162,17 @@ import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlDropUserCommand;
 import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand;
 import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
-import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -178,6 +203,7 @@ import org.jetbrains.annotations.Nullable;
 import static java.lang.Boolean.FALSE;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.requestSnapshot;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
@@ -217,6 +243,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private static final int TWO_STEP_QRY_CACHE_SIZE = 1024;
 
+    /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
+    private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
+
+    /** Default size for update plan cache. */
+    private static final int UPDATE_PLAN_CACHE_SIZE = 1024;
+
+    /** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
+    private final boolean updateInTxAllowed =
+        Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
+
+    /** Update plans cache. */
+    private final ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache =
+        new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
+
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -249,9 +289,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final QueryContextRegistry qryCtxRegistry = new QueryContextRegistry();
 
     /** */
-    private DmlStatementsProcessor dmlProc;
-
-    /** */
     private DdlStatementsProcessor ddlProc;
 
     /** Partition reservation manager. */
@@ -467,35 +504,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param filter Cache name and key filter.
      * @param enforceJoinOrder Enforce join order of tables in the query.
      * @param startTx Start transaction flag.
-     * @param timeout Query timeout in milliseconds.
-     * @param cancel Query cancel.
-     * @param dataPageScanEnabled If data page scan is enabled.
-     * @return Query result.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridQueryFieldsResult queryLocalSqlFields(
-        String schemaName,
-        String qry,
-        @Nullable Collection<Object> params,
-        IndexingQueryFilter filter,
-        boolean enforceJoinOrder,
-        boolean startTx,
-        int timeout,
-        GridQueryCancel cancel,
-        Boolean dataPageScanEnabled
-    ) throws IgniteCheckedException {
-        return queryLocalSqlFields(schemaName, qry, params, filter, enforceJoinOrder, startTx, timeout, cancel, null, dataPageScanEnabled);
-    }
-
-    /**
-     * Queries individual fields (generally used by JDBC drivers).
-     *
-     * @param schemaName Schema name.
-     * @param qry Query.
-     * @param params Query parameters.
-     * @param filter Cache name and key filter.
-     * @param enforceJoinOrder Enforce join order of tables in the query.
-     * @param startTx Start transaction flag.
      * @param qryTimeout Query timeout in milliseconds.
      * @param cancel Query cancel.
      * @param mvccTracker Query tracker.
@@ -503,7 +511,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Query result.
      * @throws IgniteCheckedException If failed.
      */
-    GridQueryFieldsResult queryLocalSqlFields(
+    private GridQueryFieldsResult executeQueryLocal0(
         final String schemaName,
         String qry,
         @Nullable final Collection<Object> params,
@@ -533,7 +541,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             Prepared p = GridSqlQueryParser.prepared(stmt);
 
-            if (DmlStatementsProcessor.isDmlStatement(p)) {
+            if (GridSqlQueryParser.isDml(p)) {
                 SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
 
                 if (params != null)
@@ -543,7 +551,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 fldsQry.setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
                 fldsQry.setDataPageScanEnabled(dataPageScanEnabled);
 
-                return dmlProc.updateSqlFieldsLocal(schemaName, conn, p, fldsQry, filter, cancel);
+                UpdateResult updRes = executeUpdate(schemaName, conn, p, fldsQry, true, filter, cancel);
+
+                List<?> updResRow = Collections.singletonList(updRes.counter());
+
+                return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator<>(updResRow));
             }
             else if (DdlStatementsProcessor.isDdlStatement(p)) {
                 throw new IgniteSQLException("DDL statements are supported for the whole cluster only.",
@@ -761,7 +773,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             throw new IgniteSQLException(e);
         }
 
-        return dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, params);
+        return streamQuery0(qry, schemaName, streamer, stmt, params);
     }
 
     /** {@inheritDoc} */
@@ -786,7 +798,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         Prepared p = GridSqlQueryParser.prepared(stmt);
 
-        UpdatePlan plan = dmlProc.getPlanForStatement(schemaName, conn, p, null, true, null);
+        UpdatePlan plan = updatePlan(schemaName, conn, p, null, true);
 
         IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(plan.cacheContext().name());
 
@@ -795,12 +807,119 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         List<Long> res = new ArrayList<>(params.size());
 
         for (int i = 0; i < params.size(); i++)
-            res.add(dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, params.get(i)));
+            res.add(streamQuery0(qry, schemaName, streamer, stmt, params.get(i)));
 
         return res;
     }
 
     /**
+     * Perform given statement against given data streamer. Only row-based INSERT is supported.
+     *
+     * @param qry Query.
+     * @param schemaName Schema name.
+     * @param streamer Streamer to feed data to.
+     * @param stmt Statement.
+     * @param args Statement arguments.
+     * @return Number of rows in given INSERT statement.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings({"unchecked", "Anonymous2MethodRef"})
+    private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt,
+        final Object[] args) throws IgniteCheckedException {
+        Long qryId = runningQueryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+
+        boolean fail = false;
+
+        try {
+            checkStatementStreamable(stmt);
+
+            Prepared p = GridSqlQueryParser.prepared(stmt);
+
+            assert p != null;
+
+            final UpdatePlan plan = updatePlan(schemaName, null, p, null, true);
+
+            assert plan.isLocalSubquery();
+
+            final GridCacheContext cctx = plan.cacheContext();
+
+            QueryCursorImpl<List<?>> cur;
+
+            final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
+
+            QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
+
+                        Iterator<List<?>> it;
+
+                        if (!F.isEmpty(plan.selectQuery())) {
+                            GridQueryFieldsResult res = executeQueryLocal0(
+                                schema(cctx.name()),
+                                plan.selectQuery(),
+                                F.asList(params),
+                                null,
+                                false,
+                                false,
+                                0,
+                                null,
+                                null,
+                                null
+                            );
+
+                            it = res.iterator();
+                        }
+                        else
+                            it = plan.createRows(params).iterator();
+
+                        return new GridQueryCacheObjectsIterator(it, objectContext(), cctx.keepBinary());
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            }, null);
+
+            data.addAll(stepCur.getAll());
+
+            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    return data.iterator();
+                }
+            }, null);
+
+            if (plan.rowCount() == 1) {
+                IgniteBiTuple t = plan.processRow(cur.iterator().next());
+
+                streamer.addData(t.getKey(), t.getValue());
+
+                return 1;
+            }
+
+            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+
+            for (List<?> row : cur) {
+                final IgniteBiTuple t = plan.processRow(row);
+
+                rows.put(t.getKey(), t.getValue());
+            }
+
+            streamer.addData(rows);
+
+            return rows.size();
+        }
+        catch (IgniteCheckedException e) {
+            fail = true;
+
+            throw e;
+        }
+        finally {
+            runningQueryMgr.unregister(qryId, fail);
+        }
+    }
+
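streamQuery0 is the server-side half of JDBC streaming: while streaming is on,
each INSERT is converted to key-value pairs and fed to the cache's data
streamer instead of being executed as SQL. A hedged client-side sketch
(connection URL and table are assumptions):

    try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
         Statement cmd = conn.createStatement()) {
        cmd.execute("SET STREAMING ON");

        try (PreparedStatement ins = conn.prepareStatement("INSERT INTO person(id, name) VALUES (?, ?)")) {
            for (int i = 0; i < 10_000; i++) {
                ins.setInt(1, i);
                ins.setString(2, "name-" + i);
                ins.executeUpdate(); // Routed through streamQuery0 on the server.
            }
        }

        cmd.execute("SET STREAMING OFF"); // Flushes the underlying IgniteDataStreamer.
    }
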
+    /**
      * @param size Result size.
      * @return List of given size filled with 0Ls.
      */
@@ -1000,17 +1119,39 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
-        final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel,
-        Long qryId) throws IgniteCheckedException {
-        String sql = qry.getSql();
-        List<Object> params = F.asList(qry.getArgs());
-        boolean enforceJoinOrder = qry.isEnforceJoinOrder(), startTx = autoStartTx(qry);
-        int timeout = qry.getTimeout();
+    /**
+     * Queries individual fields (generally used by JDBC drivers).
+     *
+     * @param schemaName Schema name.
+     * @param qry Query.
+     * @param keepBinary Keep binary flag.
+     * @param filter Cache name and key filter.
+     * @param cancel Query cancel.
+     * @param qryId Running query id. {@code null} in case query is not registered.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    private FieldsQueryCursor<List<?>> executeQueryLocal(
+        String schemaName,
+        SqlFieldsQuery qry,
+        final boolean keepBinary,
+        IndexingQueryFilter filter,
+        GridQueryCancel cancel,
+        Long qryId
+    ) throws IgniteCheckedException {
+        boolean startTx = autoStartTx(qry);
 
-        final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, params, filter,
-            enforceJoinOrder, startTx, timeout, cancel, qry.isDataPageScanEnabled());
+        final GridQueryFieldsResult res = executeQueryLocal0(
+            schemaName,
+            qry.getSql(),
+            F.asList(qry.getArgs()),
+            filter,
+            qry.isEnforceJoinOrder(),
+            startTx,
+            qry.getTimeout(),
+            cancel,
+            null,
+            qry.isDataPageScanEnabled()
+        );
 
         Iterable<List<?>> iter = () -> {
             try {
@@ -1323,38 +1464,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry,
         SqlCommand cmd, @Nullable SqlClientContext cliCtx) {
         boolean fail = false;
+        boolean unregister = true;
 
-        // Execute.
-        if (cmd instanceof SqlBulkLoadCommand)
-            return Collections.singletonList(dmlProc.runNativeDmlStatement(schemaName, qry.getSql(), cmd));
-
-        //Always registry new running query for native commands except COPY. Currently such operations don't support cancellation.
         Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true);
 
         try {
-            if (cmd instanceof SqlCreateIndexCommand
-                || cmd instanceof SqlDropIndexCommand
-                || cmd instanceof SqlAlterTableCommand
-                || cmd instanceof SqlCreateUserCommand
-                || cmd instanceof SqlAlterUserCommand
-                || cmd instanceof SqlDropUserCommand)
-                return Collections.singletonList(ddlProc.runDdlStatement(qry.getSql(), cmd));
-            else if (cmd instanceof SqlSetStreamingCommand) {
-                if (cliCtx == null)
-                    throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver.");
-
-                SqlSetStreamingCommand setCmd = (SqlSetStreamingCommand)cmd;
-
-                if (setCmd.isTurnOn())
-                    cliCtx.enableStreaming(setCmd.allowOverwrite(), setCmd.flushFrequency(),
-                        setCmd.perNodeBufferSize(), setCmd.perNodeParallelOperations(), setCmd.isOrdered());
-                else
-                    cliCtx.disableStreaming();
+            FieldsQueryCursor<List<?>> cur;
+
+            if (DdlStatementsProcessor.isDdlCommand(cmd))
+                cur = ddlProc.runDdlStatement(qry.getSql(), cmd);
+            else if (cmd instanceof SqlBulkLoadCommand) {
+                // Query will be unregistered when cursor is closed.
+                unregister = false;
+
+                cur = processBulkLoadCommand((SqlBulkLoadCommand)cmd, qryId);
             }
+            else if (cmd instanceof SqlSetStreamingCommand)
+                cur = processSetStreamingCommand((SqlSetStreamingCommand)cmd, cliCtx);
             else
-                processTxCommand(cmd, qry);
+                cur = processTxCommand(cmd, qry);
 
-            return Collections.singletonList(H2Utils.zeroCursor());
+            return Collections.singletonList(cur);
         }
         catch (IgniteCheckedException e) {
             fail = true;
@@ -1363,17 +1493,44 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 ", err=" + e.getMessage() + ']', e);
         }
         finally {
-            runningQueryMgr.unregister(qryId, fail);
+            if (unregister || fail)
+                runningQueryMgr.unregister(qryId, fail);
         }
     }
 
     /**
+     * Process SET STREAMING command.
+     *
+     * @param cmd Command.
+     * @param cliCtx Client context.
+     * @return Cursor.
+     */
+    private FieldsQueryCursor<List<?>> processSetStreamingCommand(SqlSetStreamingCommand cmd,
+        @Nullable SqlClientContext cliCtx) {
+        if (cliCtx == null)
+            throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver.");
+
+        if (cmd.isTurnOn())
+            cliCtx.enableStreaming(
+                cmd.allowOverwrite(),
+                cmd.flushFrequency(),
+                cmd.perNodeBufferSize(),
+                cmd.perNodeParallelOperations(),
+                cmd.isOrdered()
+            );
+        else
+            cliCtx.disableStreaming();
+
+        return H2Utils.zeroCursor();
+    }
+
+    /**
      * Check expected statement type (when it is set by JDBC) and given statement type.
      *
      * @param qry Query.
      * @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}.
      */
-    private void checkQueryType(SqlFieldsQuery qry, boolean isQry) {
+    private static void checkQueryType(SqlFieldsQuery qry, boolean isQry) {
         Boolean qryFlag = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx) qry).isQuery() : null;
 
         if (qryFlag != null && qryFlag != isQry)
@@ -1387,7 +1544,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param qry Query.
      * @throws IgniteCheckedException if failed.
      */
-    private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry) throws IgniteCheckedException {
+    private FieldsQueryCursor<List<?>> processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
+        throws IgniteCheckedException {
         NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).getNestedTxMode() :
             NestedTxMode.DEFAULT;
 
@@ -1439,6 +1597,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (tx != null)
                 doRollback(tx);
         }
+
+        return H2Utils.zeroCursor();
     }
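
For reference, the statements that reach processTxCommand are the plain SQL
transaction commands; over JDBC against an MVCC-enabled cache the flow is
roughly as below (table name is an assumption):

    stmt.execute("BEGIN");

    stmt.executeUpdate("UPDATE person SET name = 'updated' WHERE id = 1");

    stmt.execute("COMMIT"); // Goes through doCommit(); ROLLBACK maps to doRollback().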
 
     /**
@@ -1448,9 +1608,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
         try {
-            // TODO: Why checking for rollback only?
-            //if (!tx.isRollbackOnly())
-                tx.commit();
+            tx.commit();
         }
         finally {
             closeTx(tx);
@@ -1485,6 +1643,46 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
+    /**
+     * Process bulk load COPY command.
+     *
+     * @param cmd The command.
+     * @param qryId Query id.
+     * @return The context (which is the result of the first request/response).
+     * @throws IgniteCheckedException If something failed.
+     */
+    private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
+        throws IgniteCheckedException {
+        if (cmd.packetSize() == null)
+            cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
+
+        GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName());
+
+        if (tbl == null) {
+            throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
+                IgniteQueryErrorCode.TABLE_NOT_FOUND);
+        }
+
+        H2Utils.checkAndStartNotStartedCache(ctx, tbl);
+
+        UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
+
+        IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new DmlBulkLoadDataConverter(plan);
+
+        IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName());
+
+        BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
+
+        BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
+
+        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter,
+            runningQueryMgr, qryId);
+
+        BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
+
+        return new BulkLoadContextCursor(processor, params);
+    }
+
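A hedged sketch of the COPY statement this method serves, as issued from a thin
JDBC client (file path and table are assumptions); the returned context cursor
drives the packet-by-packet file transfer:

    try (PreparedStatement copy = conn.prepareStatement(
        "COPY FROM '/path/to/people.csv' INTO person (id, name) FORMAT CSV")) {
        int rows = copy.executeUpdate(); // Rows fed to the cache's data streamer.
    }
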
     /** {@inheritDoc} */
     @SuppressWarnings({"StringEquality", "unchecked"})
     @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
@@ -1625,14 +1823,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param meta Metadata for {@code twoStepQry}.
      * @param keepBinary Whether binary objects must not be deserialized automatically.
      * @param startTx Start transaction flag.
-     * @param tracker MVCC tracker.
+     * @param mvccTracker MVCC tracker.
      * @param cancel Query cancel state holder.
      * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
      * @return Query result.
      */
     private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared,
         SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
-        boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
+        boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, boolean registerAsNewQry) {
         String sqlQry = qry.getSql();
 
         boolean loc = qry.isLocal();
@@ -1645,26 +1843,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             boolean fail = false;
 
             try {
-                if (DmlStatementsProcessor.isDmlStatement(prepared)) {
+                if (GridSqlQueryParser.isDml(prepared)) {
                     try {
                         Connection conn = connMgr.connectionForThread().connection(schemaName);
 
                         if (!loc)
-                            return dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel);
+                            return executeUpdateDistributed(schemaName, conn, prepared, qry, cancel);
                         else {
-                            final GridQueryFieldsResult updRes =
-                                dmlProc.updateSqlFieldsLocal(schemaName, conn, prepared, qry, filter, cancel);
+                            UpdateResult updRes = executeUpdate(schemaName, conn, prepared, qry, true, filter, cancel);
 
                             return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
                                 @SuppressWarnings("NullableProblems")
                                 @Override public Iterator<List<?>> iterator() {
-                                    try {
-                                        return new GridQueryCacheObjectsIterator(updRes.iterator(), objectContext(),
-                                            true);
-                                    }
-                                    catch (IgniteCheckedException e) {
-                                        throw new IgniteException(e);
-                                    }
+                                    return new IgniteSingletonIterator<>(Collections.singletonList(updRes.counter()));
                                 }
                             }, cancel));
                         }
@@ -1710,16 +1901,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (ctx.security().enabled())
                 checkSecurity(twoStepQry.cacheIds());
 
-            return Collections.singletonList(doRunDistributedQuery(schemaName, qry, twoStepQry, meta, keepBinary,
-                startTx, tracker, cancel, registerAsNewQry));
-
+            return Collections.singletonList(executeQuery(schemaName, qry, twoStepQry, meta, keepBinary,
+                startTx, mvccTracker, cancel, registerAsNewQry));
         }
 
         // We've encountered a local query, let's just run it.
         Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry);
 
         try {
-            return Collections.singletonList(queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel, qryId));
+            return Collections.singletonList(executeQueryLocal(schemaName, qry, keepBinary, filter, cancel, qryId));
         }
         catch (IgniteCheckedException e) {
             runningQueryMgr.unregister(qryId, true);
@@ -1963,17 +2153,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] ids,
-        int[] parts, String schema, String qry, Object[] params, int flags,
-        int pageSize, int timeout, AffinityTopologyVersion topVer,
-        MvccSnapshot mvccSnapshot, GridQueryCancel cancel) throws IgniteCheckedException {
-
+    @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
+        GridCacheContext<?, ?> cctx,
+        int[] ids,
+        int[] parts,
+        String schema,
+        String qry,
+        Object[] params,
+        int flags,
+        int pageSize,
+        int timeout,
+        AffinityTopologyVersion topVer,
+        MvccSnapshot mvccSnapshot,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
         SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
 
         if (params != null)
             fldsQry.setArgs(params);
 
-        fldsQry.setEnforceJoinOrder(isFlagSet(flags, GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
+        fldsQry.setEnforceJoinOrder(U.isFlagSet(flags, GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
         fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
         fldsQry.setPageSize(pageSize);
         fldsQry.setLocal(true);
@@ -1981,7 +2180,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         boolean loc = true;
 
-        final boolean replicated = isFlagSet(flags, GridH2QueryRequest.FLAG_REPLICATED);
+        final boolean replicated = U.isFlagSet(flags, GridH2QueryRequest.FLAG_REPLICATED);
 
         GridCacheContext<?, ?> cctx0;
 
@@ -2001,19 +2200,52 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         PreparedStatement stmt = preparedStatementWithParams(conn, fldsQry.getSql(),
             F.asList(fldsQry.getArgs()), true);
 
-        return dmlProc.prepareDistributedUpdate(schema, conn, stmt, fldsQry, backupFilter(topVer, parts), cancel, loc,
-            topVer, mvccSnapshot);
-    }
+        IndexingQueryFilter filter = backupFilter(topVer, parts);
 
-    /**
-     * Check if flag set.
-     *
-     * @param flags Flags.
-     * @param flag Flag.
-     * @return {@code True} if set.
-     */
-    private boolean isFlagSet(int flags, int flag) {
-        return (flags & flag) == flag;
+        Prepared prepared = GridSqlQueryParser.prepared(stmt);
+
+        UpdatePlan plan = updatePlan(schema, conn, prepared, fldsQry, loc);
+
+        GridCacheContext planCctx = plan.cacheContext();
+
+        // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+        DmlUtils.setKeepBinaryContext(planCctx);
+
+        QueryCursorImpl<List<?>> cur;
+
+        // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
+        // sub-query and not some dummy stuff like "select 1, 2, 3;"
+        if (!loc && !plan.isLocalSubquery()) {
+            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fldsQry.isCollocated())
+                .setArgs(fldsQry.getArgs())
+                .setDistributedJoins(fldsQry.isDistributedJoins())
+                .setEnforceJoinOrder(fldsQry.isEnforceJoinOrder())
+                .setLocal(fldsQry.isLocal())
+                .setPageSize(fldsQry.getPageSize())
+                .setTimeout(fldsQry.getTimeout(), TimeUnit.MILLISECONDS)
+                .setDataPageScanEnabled(fldsQry.isDataPageScanEnabled());
+
+            cur = (QueryCursorImpl<List<?>>)querySqlFields(schema, newFieldsQry, null, true, true,
+                new StaticMvccQueryTracker(planCctx, mvccSnapshot), cancel, false).get(0);
+        }
+        else {
+            final GridQueryFieldsResult res = executeQueryLocal0(schema, plan.selectQuery(),
+                F.asList(fldsQry.getArgs()), filter, fldsQry.isEnforceJoinOrder(), false, fldsQry.getTimeout(), cancel,
+                new StaticMvccQueryTracker(planCctx, mvccSnapshot), null);
+
+            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        return res.iterator();
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            }, cancel);
+        }
+
+        return plan.iteratorForTransaction(connMgr, cur);
     }
 
     /**
@@ -2029,7 +2261,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
      * @return Cursor representing distributed query result.
      */
-    private FieldsQueryCursor<List<?>> doRunDistributedQuery(String schemaName, SqlFieldsQuery qry,
+    private FieldsQueryCursor<List<?>> executeQuery(String schemaName, SqlFieldsQuery qry,
         GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
         boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, boolean registerAsNewQry) {
         if (log.isDebugEnabled())
@@ -2159,26 +2391,40 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Run DML request from other node.
+     * Executes DML request on map node. Happens only in "skip reducer" mode.
      *
      * @param schemaName Schema name.
-     * @param fldsQry Query.
+     * @param qry Query.
      * @param filter Filter.
      * @param cancel Cancel state.
-     * @param local Locality flag.
+     * @param loc Locality flag.
      * @return Update result.
      * @throws IgniteCheckedException if failed.
      */
-    public UpdateResult mapDistributedUpdate(String schemaName, SqlFieldsQuery fldsQry, IndexingQueryFilter filter,
-        GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
+    public UpdateResult executeUpdateOnDataNode(
+        String schemaName,
+        SqlFieldsQuery qry,
+        IndexingQueryFilter filter,
+        GridQueryCancel cancel,
+        boolean loc
+    ) throws IgniteCheckedException {
         Connection conn = connMgr.connectionForThread().connection(schemaName);
 
-        H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
+        H2Utils.setupConnection(conn, false, qry.isEnforceJoinOrder());
 
-        PreparedStatement stmt = preparedStatementWithParams(conn, fldsQry.getSql(),
-            Arrays.asList(fldsQry.getArgs()), true);
+        PreparedStatement stmt = preparedStatementWithParams(conn, qry.getSql(),
+            Arrays.asList(qry.getArgs()), true);
+
+        Connection c;
+
+        try {
+            c = stmt.getConnection();
+        }
+        catch (SQLException e) {
+            throw new IgniteCheckedException(e);
+        }
 
-        return dmlProc.mapDistributedUpdate(schemaName, stmt, fldsQry, filter, cancel, local);
+        return executeUpdate(schemaName, c, GridSqlQueryParser.prepared(stmt), qry, loc, filter, cancel);
     }
 
     /**
@@ -2438,7 +2684,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         mapQryExec.start(ctx, this);
         rdcQryExec.start(ctx, this);
 
-        dmlProc = new DmlStatementsProcessor(ctx, this);
         ddlProc = new DdlStatementsProcessor(ctx, schemaMgr);
 
         partExtractor = new PartitionExtractor(new H2PartitionResolver(this));
@@ -2630,7 +2875,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         String cacheName = cacheInfo.name();
 
         partReservationMgr.onCacheStop(cacheName);
-        dmlProc.onCacheStop(cacheName);
+
+        // Remove cached DML plans.
+        Iterator<Map.Entry<H2CachedStatementKey, UpdatePlan>> iter = updatePlanCache.entrySet().iterator();
+
+        while (iter.hasNext()) {
+            UpdatePlan plan = iter.next().getValue();
+
+            if (F.eq(cacheName, plan.cacheContext().name()))
+                iter.remove();
+        }
 
         // Drop schema (needs to be called after callback to DML processor because the latter depends on schema).
         schemaMgr.onCacheDestroyed(cacheName, rmvIdx);
@@ -2824,4 +3078,416 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return cacheIds;
         }
     }
+
+    /**
+     * @param schemaName Schema.
+     * @param conn Connection.
+     * @param prepared Prepared statement.
+     * @param fieldsQry Initial query.
+     * @param cancel Query cancel.
+     * @return Update result cursors (one per batch element for batched queries).
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
+        String schemaName,
+        Connection conn,
+        Prepared prepared,
+        SqlFieldsQuery fieldsQry,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
+        if (DmlUtils.isBatched(fieldsQry)) {
+            SqlFieldsQueryEx fieldsQry0 = (SqlFieldsQueryEx)fieldsQry;
+
+            Collection<UpdateResult> ress;
+
+            List<Object[]> argss = fieldsQry0.batchedArguments();
+
+            UpdatePlan plan = updatePlan(schemaName, conn, prepared, fieldsQry0, false);
+
+            GridCacheContext<?, ?> cctx = plan.cacheContext();
+
+            // For MVCC case, let's enlist batch elements one by one.
+            if (plan.hasRows() && plan.mode() == UpdateMode.INSERT && !cctx.mvccEnabled()) {
+                CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
+
+                try {
+                    List<List<List<?>>> cur = plan.createRows(argss);
+
+                    ress = DmlUtils.processSelectResultBatched(plan, cur, fieldsQry0.getPageSize());
+                }
+                finally {
+                    DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
+                }
+            }
+            else {
+                // Fallback to previous mode.
+                ress = new ArrayList<>(argss.size());
+
+                SQLException batchException = null;
+
+                int[] cntPerRow = new int[argss.size()];
+
+                int cntr = 0;
+
+                for (Object[] args : argss) {
+                    SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry0.copy();
+
+                    qry0.clearBatchedArgs();
+                    qry0.setArgs(args);
+
+                    UpdateResult res;
+
+                    try {
+                        res = executeUpdate(schemaName, conn, prepared, qry0, false, null, cancel);
+
+                        cntPerRow[cntr++] = (int)res.counter();
+
+                        ress.add(res);
+                    }
+                    catch (Exception e) {
+                        SQLException sqlEx = QueryUtils.toSqlException(e);
+
+                        batchException = DmlUtils.chainException(batchException, sqlEx);
+
+                        cntPerRow[cntr++] = Statement.EXECUTE_FAILED;
+                    }
+                }
+
+                if (batchException != null) {
+                    BatchUpdateException e = new BatchUpdateException(batchException.getMessage(),
+                        batchException.getSQLState(), batchException.getErrorCode(), cntPerRow, batchException);
+
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
+            ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size());
+
+            for (UpdateResult res : ress) {
+                res.throwIfError();
+
+                QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(
+                    Collections.singletonList(Collections.singletonList(res.counter())), cancel, false);
+
+                resCur.fieldsMeta(UPDATE_RESULT_META);
+
+                resCurs.add(resCur);
+            }
+
+            return resCurs;
+        }
+        else {
+            UpdateResult res = executeUpdate(schemaName, conn, prepared, fieldsQry, false, null, cancel);
+
+            res.throwIfError();
+
+            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(
+                Collections.singletonList(Collections.singletonList(res.counter())), cancel, false);
+
+            resCur.fieldsMeta(UPDATE_RESULT_META);
+
+            return Collections.singletonList(resCur);
+        }
+    }
+
+    /**
+     * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
+     *
+     * @param schemaName Schema.
+     * @param conn Connection.
+     * @param prepared Prepared statement.
+     * @param fieldsQry Original query.
+     * @param loc Query locality flag.
+     * @param filters Cache name and key filter.
+     * @param cancel Cancel.
+     * @return Update result (modified items count and failed keys).
+     * @throws IgniteCheckedException if failed.
+     */
+    private UpdateResult executeUpdate(String schemaName, Connection conn, Prepared prepared,
+        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
+        throws IgniteCheckedException {
+        Object[] errKeys = null;
+
+        long items = 0;
+
+        UpdatePlan plan = updatePlan(schemaName, conn, prepared, fieldsQry, loc);
+
+        GridCacheContext<?, ?> cctx = plan.cacheContext();
+
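+        // Re-attempt the update a bounded number of times: each pass reports the keys that failed
+        // due to concurrent modifications, and the loop stops as soon as no failed keys remain.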
+        for (int i = 0; i < DFLT_UPDATE_RERUN_ATTEMPTS; i++) {
+            CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
+
+            UpdateResult r;
+
+            try {
+                r = executeUpdate0(schemaName, plan, fieldsQry, loc, filters, cancel);
+            }
+            finally {
+                DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
+            }
+
+            items += r.counter();
+            errKeys = r.errorKeys();
+
+            if (F.isEmpty(errKeys))
+                break;
+        }
+
+        if (F.isEmpty(errKeys)) {
+            if (items == 1L)
+                return UpdateResult.ONE;
+            else if (items == 0L)
+                return UpdateResult.ZERO;
+        }
+
+        return new UpdateResult(items, errKeys);
+    }
+
+    /**
+     * Actually perform SQL DML operation locally.
+     *
+     * @param schemaName Schema name.
+     * @param plan Update plan.
+     * @param fieldsQry Fields query.
+     * @param loc Local query flag.
+     * @param filters Cache name and key filter.
+     * @param cancel Query cancel state holder.
+     * @return Pair [number of successfully processed items; keys that have failed to be processed]
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings({"ConstantConditions"})
+    private UpdateResult executeUpdate0(
+        String schemaName,
+        final UpdatePlan plan,
+        SqlFieldsQuery fieldsQry,
+        boolean loc,
+        IndexingQueryFilter filters,
+        GridQueryCancel cancel
+    ) throws IgniteCheckedException {
+        GridCacheContext cctx = plan.cacheContext();
+
+        if (cctx != null && cctx.mvccEnabled()) {
+            assert cctx.transactional();
+
+            DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
+
+            GridNearTxLocal tx = tx(cctx.kernalContext());
+
+            boolean implicit = (tx == null);
+
+            boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) ||
+                ((SqlFieldsQueryEx)fieldsQry).isAutoCommit());
+
+            if (implicit)
+                tx = txStart(cctx, fieldsQry.getTimeout());
+
+            requestSnapshot(cctx, tx);
+
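+            // Implicit transactions with auto-commit enabled are committed right after the update completes.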
+            try (GridNearTxLocal toCommit = commit ? tx : null) {
+                long timeout = implicit
+                    ? tx.remainingTime()
+                    : operationTimeout(fieldsQry.getTimeout(), tx);
+
+                if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
+                    || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
+
+                    boolean sequential = true;
+
+                    UpdateSourceIterator<?> it;
+
+                    if (plan.fastResult()) {
+                        IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs());
+
+                        EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
+
+                        it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
+                    }
+                    else if (plan.hasRows())
+                        it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan,
+                            plan.createRows(fieldsQry.getArgs()));
+                    else {
+                        // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false).
+                        SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
+                            .setArgs(fieldsQry.getArgs())
+                            .setDistributedJoins(fieldsQry.isDistributedJoins())
+                            .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
+                            .setLocal(fieldsQry.isLocal())
+                            .setPageSize(fieldsQry.getPageSize())
+                            .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
+                            .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
+
+                        FieldsQueryCursor<List<?>> cur = querySqlFields(schemaName, newFieldsQry, null,
+                            true, true, MvccUtils.mvccTracker(cctx, tx), cancel, false).get(0);
+
+                        it = plan.iteratorForTransaction(connMgr, cur);
+                    }
+
+                    IgniteInternalFuture<Long> fut = tx.updateAsync(cctx, it,
+                        fieldsQry.getPageSize(), timeout, sequential);
+
+                    UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+                    if (commit)
+                        toCommit.commit();
+
+                    return res;
+                }
+
+                int[] ids = U.toIntArray(distributedPlan.getCacheIds());
+
+                int flags = 0;
+
+                if (fieldsQry.isEnforceJoinOrder())
+                    flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
+
+                if (distributedPlan.isReplicatedOnly())
+                    flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
+                flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
+                    fieldsQry.isDataPageScanEnabled());
+
+                int[] parts = fieldsQry.getPartitions();
+
+                IgniteInternalFuture<Long> fut = tx.updateAsync(
+                    cctx,
+                    ids,
+                    parts,
+                    schemaName,
+                    fieldsQry.getSql(),
+                    fieldsQry.getArgs(),
+                    flags,
+                    fieldsQry.getPageSize(),
+                    timeout);
+
+                UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+                if (commit)
+                    toCommit.commit();
+
+                return res;
+            }
+            catch (ClusterTopologyServerNotFoundException e) {
+                throw new CacheServerNotFoundException(e.getMessage(), e);
+            }
+            catch (IgniteCheckedException e) {
+                IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
+
+                if (sqlEx != null)
+                    throw sqlEx;
+
+                Exception ex = IgniteUtils.convertExceptionNoWrap(e);
+
+                if (ex instanceof IgniteException)
+                    throw (IgniteException)ex;
+
+                U.error(log, "Error during update [localNodeId=" + cctx.localNodeId() + "]", ex);
+
+                throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
+            }
+            finally {
+                if (commit)
+                    cctx.tm().resetContext();
+            }
+        }
+
+        UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs());
+
+        if (fastUpdateRes != null)
+            return fastUpdateRes;
+
+        if (plan.distributedPlan() != null) {
+            DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
+
+            if (cancel == null)
+                cancel = new GridQueryCancel();
+
+            UpdateResult result = runDistributedUpdate(schemaName, fieldsQry, distributedPlan.getCacheIds(),
+                distributedPlan.isReplicatedOnly(), cancel);
+
+            // null is returned in case not all nodes support distributed DML.
+            if (result != null)
+                return result;
+        }
+
+        Iterable<List<?>> cur;
+
+        // Do a two-step query only if the locality flag is not set AND the plan's SELECT corresponds to an actual
+        // sub-query rather than a constant expression like "select 1, 2, 3;".
+        if (!loc && !plan.isLocalSubquery()) {
+            assert !F.isEmpty(plan.selectQuery());
+
+            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
+                .setArgs(fieldsQry.getArgs())
+                .setDistributedJoins(fieldsQry.isDistributedJoins())
+                .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
+                .setLocal(fieldsQry.isLocal())
+                .setPageSize(fieldsQry.getPageSize())
+                .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS)
+                .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
+
+            cur = querySqlFields(schemaName, newFieldsQry, null, true, true, null, cancel, false).get(0);
+        }
+        else if (plan.hasRows())
+            cur = plan.createRows(fieldsQry.getArgs());
+        else {
+            final GridQueryFieldsResult res = executeQueryLocal0(schemaName, plan.selectQuery(),
+                F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), false, fieldsQry.getTimeout(),
+                cancel, null, null);
+
+            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            }, cancel);
+        }
+
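+        // Page size <= 0 means single-page processing, so local execution materializes results in one pass.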
+        int pageSize = loc ? 0 : fieldsQry.getPageSize();
+
+        return DmlUtils.processSelectResult(plan, cur, pageSize);
+    }
+
+    /**
+     * Generate a SELECT statement to retrieve the data to be modified, and find fast UPDATE or DELETE
+     * arguments, if available.
+     *
+     * @param schema Schema.
+     * @param conn Connection.
+     * @param p Prepared statement.
+     * @param fieldsQry Original fields query.
+     * @param loc Local query flag.
+     * @return Update plan.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    private UpdatePlan updatePlan(
+        String schema,
+        Connection conn,
+        Prepared p,
+        SqlFieldsQuery fieldsQry,
+        boolean loc
+    ) throws IgniteCheckedException {
+        if (F.eq(QueryUtils.SCHEMA_SYS, schema))
+            throw new IgniteSQLException("DML statements are not supported on " + schema + " schema",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        H2CachedStatementKey planKey = new H2CachedStatementKey(schema, p.getSQL(), fieldsQry, loc);
+
+        UpdatePlan res = updatePlanCache.get(planKey);
+
+        if (res != null)
+            return res;
+
+        res = UpdatePlanBuilder.planForStatement(p, loc, this, conn, fieldsQry, updateInTxAllowed);
+
+        // If a concurrent thread has already cached a plan for this statement, prefer that instance.
+        UpdatePlan oldRes = updatePlanCache.putIfAbsent(planKey, res);
+
+        return oldRes != null ? oldRes : res;
+    }
 }
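
For reference, updatePlan() above uses a lock-free memoization idiom on top of a concurrent map: compute
the plan optimistically, then keep whichever instance won the putIfAbsent race. A minimal self-contained
sketch of the same idiom (PlanCache is a hypothetical stand-in, not an Ignite class):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    class PlanCache<K, V> {
        private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();

        /** Returns the cached value, or computes and caches it, preferring the instance that won the race. */
        V getOrCompute(K key, Function<K, V> computeFn) {
            V cached = cache.get(key);

            if (cached != null)
                return cached;

            V computed = computeFn.apply(key);

            V old = cache.putIfAbsent(key, computed);

            return old != null ? old : computed;
        }
    }
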
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
index 32d84e1..cac2d06 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
@@ -17,9 +17,17 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+
 /**
  * Update result - modifications count and keys to re-run query with, if needed.
  */
@@ -60,4 +68,18 @@ public final class UpdateResult {
     public Object[] errorKeys() {
         return errKeys;
     }
+
+    /**
+     * Checks the update result for erroneous keys and throws a concurrent update exception if necessary.
+     */
+    public void throwIfError() {
+        if (!F.isEmpty(errKeys)) {
+            String msg = "Failed to update some keys because they had been modified concurrently " +
+                "[keys=" + Arrays.toString(errKeys) + ']';
+
+            SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+            throw new IgniteSQLException(conEx);
+        }
+    }
 }
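
The UpdateResult contract: counter() reports the number of processed items, errorKeys() carries keys that
must be re-run, and throwIfError() converts a non-empty errorKeys() into an IgniteSQLException wrapping a
CONCURRENT_UPDATE SQLException. A short usage sketch, assuming the two-argument constructor shown earlier
in this patch:

    UpdateResult res = new UpdateResult(5, new Object[] {1, 7});

    // Five rows were processed, but keys 1 and 7 were modified concurrently.
    assert res.counter() == 5;

    try {
        res.throwIfError();
    }
    catch (IgniteSQLException e) {
        // A caller may re-run the statement for the failed keys here.
    }
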
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index f55abfa..84dfea2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -124,6 +124,19 @@ public class DdlStatementsProcessor {
     }
 
     /**
+     * @param cmd Command.
+     * @return {@code True} if this is supported DDL command.
+     */
+    public static boolean isDdlCommand(SqlCommand cmd) {
+        return cmd instanceof SqlCreateIndexCommand
+            || cmd instanceof SqlDropIndexCommand
+            || cmd instanceof SqlAlterTableCommand
+            || cmd instanceof SqlCreateUserCommand
+            || cmd instanceof SqlAlterUserCommand
+            || cmd instanceof SqlDropUserCommand;
+    }
+
+    /**
      * Run DDL statement.
      *
      * @param sql Original SQL.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
index 7e7f224..c90d149 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
@@ -138,10 +138,9 @@ public final class DmlAstUtils {
      * Generate SQL SELECT based on DELETE's WHERE, LIMIT, etc.
      *
      * @param del Delete statement.
-     * @param keysParamIdx Index for .
      * @return SELECT statement.
      */
-    public static GridSqlSelect selectForDelete(GridSqlDelete del, @Nullable Integer keysParamIdx) {
+    public static GridSqlSelect selectForDelete(GridSqlDelete del) {
         GridSqlSelect mapQry = new GridSqlSelect();
 
         mapQry.from(del.from());
@@ -172,8 +171,6 @@ public final class DmlAstUtils {
         mapQry.addColumn(valCol, true);
 
         GridSqlElement where = del.where();
-        if (keysParamIdx != null)
-            where = injectKeysFilterParam(where, keyCol, keysParamIdx);
 
         mapQry.where(where);
         mapQry.limit(del.limit());
@@ -231,7 +228,7 @@ public final class DmlAstUtils {
      */
     @SuppressWarnings("RedundantCast")
     private static IgnitePair<GridSqlElement> findKeyValueEqualityCondition(GridSqlElement where) {
-        if (where == null || !(where instanceof GridSqlOperation))
+        if (!(where instanceof GridSqlOperation))
             return null;
 
         GridSqlOperation whereOp = (GridSqlOperation) where;
@@ -319,10 +316,9 @@ public final class DmlAstUtils {
      * Generate SQL SELECT based on UPDATE's WHERE, LIMIT, etc.
      *
      * @param update Update statement.
-     * @param keysParamIdx Index of new param for the array of keys.
      * @return SELECT statement.
      */
-    public static GridSqlSelect selectForUpdate(GridSqlUpdate update, @Nullable Integer keysParamIdx) {
+    public static GridSqlSelect selectForUpdate(GridSqlUpdate update) {
         GridSqlSelect mapQry = new GridSqlSelect();
 
         mapQry.from(update.target());
@@ -362,8 +358,6 @@ public final class DmlAstUtils {
         }
 
         GridSqlElement where = update.where();
-        if (keysParamIdx != null)
-            where = injectKeysFilterParam(where, keyCol, keysParamIdx);
 
         mapQry.where(where);
         mapQry.limit(update.limit());
@@ -422,39 +416,6 @@ public final class DmlAstUtils {
     }
 
     /**
-     * Append additional condition to WHERE for it to select only specific keys.
-     *
-     * @param where Initial condition.
-     * @param keyCol Column to base the new condition on.
-     * @return New condition.
-     */
-    private static GridSqlElement injectKeysFilterParam(GridSqlElement where, GridSqlColumn keyCol, int paramIdx) {
-        // Yes, we need a subquery for "WHERE _key IN ?" to work with param being an array without dirty query rewriting.
-        GridSqlSelect sel = new GridSqlSelect();
-
-        GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE);
-
-        sel.from(from);
-
-        GridSqlColumn col = new GridSqlColumn(null, from, null, "TABLE", "_IGNITE_ERR_KEYS");
-
-        sel.addColumn(col, true);
-
-        GridSqlAlias alias = new GridSqlAlias("_IGNITE_ERR_KEYS", new GridSqlParameter(paramIdx));
-
-        alias.resultType(keyCol.resultType());
-
-        from.addChild(alias);
-
-        GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlSubquery(sel));
-
-        if (where == null)
-            return e;
-        else
-            return new GridSqlOperation(GridSqlOperationType.AND, where, e);
-    }
-
-    /**
      * @param qry Select.
      * @param params Parameters.
      * @param target Extracted parameters.
@@ -550,7 +511,7 @@ public final class DmlAstUtils {
      * @param c Closure each found table and subquery will be passed to. If returns {@code true} the we need to stop.
      * @return {@code true} If we have found.
      */
-    @SuppressWarnings("RedundantCast")
+    @SuppressWarnings({"RedundantCast", "RedundantIfStatement"})
     private static boolean findTablesInFrom(GridSqlElement from, IgnitePredicate<GridSqlElement> c) {
         if (from == null)
             return false;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBulkLoadDataConverter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBulkLoadDataConverter.java
new file mode 100644
index 0000000..e6084d4
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBulkLoadDataConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import java.util.List;
+
+/**
+ * Converts a row of values to actual key+value using {@link UpdatePlan#processRow(List)}.
+ */
+public class DmlBulkLoadDataConverter extends IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> {
+    /** Update plan to convert incoming rows. */
+    private final UpdatePlan plan;
+
+    /**
+     * Creates the converter with the given update plan.
+     *
+     * @param plan The update plan to use.
+     */
+    public DmlBulkLoadDataConverter(UpdatePlan plan) {
+        this.plan = plan;
+    }
+
+    /**
+     * Converts the record to a key+value.
+     *
+     * @param record The record to convert.
+     * @return The key+value.
+     * @throws IgniteCheckedException If conversion failed for some reason.
+     */
+    @Override public IgniteBiTuple<?, ?> applyx(List<?> record) throws IgniteCheckedException {
+        return plan.processRow(record);
+    }
+}
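
In bulk load (COPY) handling, each parsed record arrives as a List of column values and must be turned
into a cache entry. A usage fragment ('plan' is assumed to be an UpdatePlan available in the calling
context; the record values are illustrative):

    DmlBulkLoadDataConverter converter = new DmlBulkLoadDataConverter(plan);

    // applyx() delegates to UpdatePlan.processRow() and may throw IgniteCheckedException.
    IgniteBiTuple<?, ?> entry = converter.applyx(java.util.Arrays.asList(1L, "John", 25));
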
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateResultsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateResultsIterator.java
new file mode 100644
index 0000000..b850c8a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateResultsIterator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * DML update results iterator.
+ */
+public class DmlUpdateResultsIterator implements UpdateSourceIterator<Object> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final EnlistOperation op;
+
+    /** */
+    private final UpdatePlan plan;
+
+    /** */
+    private Iterator<List<?>> it;
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     * @param plan Plan.
+     * @param rows Rows.
+     */
+    public DmlUpdateResultsIterator(EnlistOperation op, UpdatePlan plan, Iterable<List<?>> rows) {
+        this.op = op;
+        this.plan = plan;
+
+        it = rows.iterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public EnlistOperation operation() {
+        return op;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNextX() {
+        return it.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object nextX() throws IgniteCheckedException {
+        return plan.processRowForTx(it.next());
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateSingleEntryIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateSingleEntryIterator.java
new file mode 100644
index 0000000..0266806
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateSingleEntryIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+
+/** */
+public class DmlUpdateSingleEntryIterator<T> implements UpdateSourceIterator<T> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final EnlistOperation op;
+
+    /** */
+    private final T entry;
+
+    /** */
+    private boolean first = true;
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     * @param entry Entry.
+     */
+    public DmlUpdateSingleEntryIterator(EnlistOperation op, T entry) {
+        this.op = op;
+        this.entry = entry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public EnlistOperation operation() {
+        return op;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNextX() {
+        return first;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T nextX() {
+        T res = first ? entry : null;
+
+        first = false;
+
+        return res;
+    }
+}
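
The single-entry iterator yields exactly one element and then reports exhaustion, matching the fast DML
path where the row to enlist is already known. A small fragment (EnlistOperation.DELETE is just an
illustrative operation):

    DmlUpdateSingleEntryIterator<String> it =
        new DmlUpdateSingleEntryIterator<>(EnlistOperation.DELETE, "key1");

    assert it.hasNextX();
    assert "key1".equals(it.nextX());
    assert !it.hasNextX();
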
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
index 7be823d..fab9869 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
@@ -18,17 +18,35 @@
 package org.apache.ignite.internal.processors.query.h2.dml;
 
 import java.lang.reflect.Array;
+import java.sql.BatchUpdateException;
+import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.transactions.TransactionDuplicateKeyException;
 import org.h2.util.DateTimeUtils;
 import org.h2.util.LocalDateTimeUtils;
 import org.h2.value.Value;
@@ -36,6 +54,9 @@ import org.h2.value.ValueDate;
 import org.h2.value.ValueTime;
 import org.h2.value.ValueTimestamp;
 
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+
 /**
  * DML utility methods.
  */
@@ -48,11 +69,9 @@ public class DmlUtils {
      * @param expCls Expected value class.
      * @param type Expected column type to convert to.
      * @return Converted object.
-     * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"ConstantConditions", "SuspiciousSystemArraycopy"})
-    public static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type)
-        throws IgniteCheckedException {
+    public static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type) {
         if (val == null)
             return null;
 
@@ -128,6 +147,401 @@ public class DmlUtils {
     }
 
     /**
+     * Perform an UPDATE, DELETE, INSERT or MERGE operation on top of SELECT results, according to the plan mode.
+     *
+     * @param plan Update plan.
+     * @param cursor Cursor over select results.
+     * @param pageSize Page size.
+     * @return Pair [number of successfully processed items; keys that have failed to be processed]
+     * @throws IgniteCheckedException if failed.
+     */
+    public static UpdateResult processSelectResult(UpdatePlan plan, Iterable<List<?>> cursor,
+        int pageSize) throws IgniteCheckedException {
+        switch (plan.mode()) {
+            case MERGE:
+                return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+            case INSERT:
+                return new UpdateResult(dmlDoInsert(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+            case UPDATE:
+                return doUpdate(plan, cursor, pageSize);
+
+            case DELETE:
+                return doDelete(plan.cacheContext(), cursor, pageSize);
+
+            default:
+                throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']',
+                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+        }
+    }
+
+    /**
+     * Execute INSERT statement plan.
+     *
+     * @param plan Update plan.
+     * @param cursor Cursor to take inserted data from.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+     * @return Number of items affected.
+     * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
+     */
+    @SuppressWarnings({"unchecked"})
+    private static long dmlDoInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+        GridCacheContext cctx = plan.cacheContext();
+
+        // If we have just one item to put, just do so
+        if (plan.rowCount() == 1) {
+            IgniteBiTuple t = plan.processRow(cursor.iterator().next());
+
+            if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
+                return 1;
+            else
+                throw new TransactionDuplicateKeyException("Duplicate key during INSERT [key=" + t.getKey() + ']');
+        }
+        else {
+            // Keys that failed to INSERT due to duplication.
+            DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
+
+            for (List<?> row : cursor) {
+                final IgniteBiTuple keyValPair = plan.processRow(row);
+
+                sender.add(keyValPair.getKey(), new DmlStatementsProcessor.InsertEntryProcessor(keyValPair.getValue()), 0);
+            }
+
+            sender.flush();
+
+            SQLException resEx = sender.error();
+
+            if (!F.isEmpty(sender.failedKeys())) {
+                String msg = "Failed to INSERT some keys because they are already in cache " +
+                    "[keys=" + sender.failedKeys() + ']';
+
+                SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION);
+
+                if (resEx == null)
+                    resEx = dupEx;
+                else
+                    resEx.setNextException(dupEx);
+            }
+
+            if (resEx != null)
+                throw new IgniteSQLException(resEx);
+
+            return sender.updateCount();
+        }
+    }
+
+    /**
+     * Perform UPDATE operation on top of results of SELECT.
+     *
+     * @param plan Update plan.
+     * @param cursor SELECT results.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+     * @return Pair [cursor corresponding to results of UPDATE (contains number of items affected); keys whose values
+     *     had been modified concurrently (arguments for a re-run)].
+     */
+    private static UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
+        throws IgniteCheckedException {
+        GridCacheContext cctx = plan.cacheContext();
+
+        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
+
+        for (List<?> row : cursor) {
+            T3<Object, Object, Object> row0 = plan.processRowForUpdate(row);
+
+            Object key = row0.get1();
+            Object oldVal = row0.get2();
+            Object newVal = row0.get3();
+
+            sender.add(key, new DmlStatementsProcessor.ModifyingEntryProcessor(
+                oldVal,
+                new DmlStatementsProcessor.EntryValueUpdater(newVal)),
+                0
+            );
+        }
+
+        sender.flush();
+
+        SQLException resEx = sender.error();
+
+        if (resEx != null) {
+            if (!F.isEmpty(sender.failedKeys())) {
+                // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+                // had been modified concurrently right away.
+                String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
+                    "[keys=" + sender.failedKeys() + ']';
+
+                SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+                dupEx.setNextException(resEx);
+
+                resEx = dupEx;
+            }
+
+            throw new IgniteSQLException(resEx);
+        }
+
+        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
+    }
+
+    /**
+     * Execute MERGE statement plan.
+     *
+     * @param plan Update plan.
+     * @param cursor Cursor to take inserted data from.
+     * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
+     * @return Number of items affected.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    private static long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+        GridCacheContext cctx = plan.cacheContext();
+
+        // If we have just one item to put, just do so
+        if (plan.rowCount() == 1) {
+            IgniteBiTuple t = plan.processRow(cursor.iterator().next());
+
+            cctx.cache().put(t.getKey(), t.getValue());
+
+            return 1;
+        }
+        else {
+            int resCnt = 0;
+
+            Map<Object, Object> rows = new LinkedHashMap<>();
+
+            for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
+                List<?> row = it.next();
+
+                IgniteBiTuple t = plan.processRow(row);
+
+                rows.put(t.getKey(), t.getValue());
+
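+                // Flush a full page, or flush whatever remains once the last row has been consumed.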
+                if ((pageSize > 0 && rows.size() == pageSize) || !it.hasNext()) {
+                    cctx.cache().putAll(rows);
+
+                    resCnt += rows.size();
+
+                    if (it.hasNext())
+                        rows.clear();
+                }
+            }
+
+            return resCnt;
+        }
+    }
+
+    /**
+     * Perform DELETE operation on top of results of SELECT.
+     *
+     * @param cctx Cache context.
+     * @param cursor SELECT results.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+     * @return Results of DELETE (number of items affected AND keys that failed to be updated).
+     */
+    private static UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
+        throws IgniteCheckedException {
+        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
+
+        for (List<?> row : cursor) {
+            if (row.size() != 2)
+                continue;
+
+            sender.add(
+                row.get(0),
+                new DmlStatementsProcessor.ModifyingEntryProcessor(row.get(1), DmlStatementsProcessor.RMV),
+                0
+            );
+        }
+
+        sender.flush();
+
+        SQLException resEx = sender.error();
+
+        if (resEx != null) {
+            if (!F.isEmpty(sender.failedKeys())) {
+                // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+                // had been modified concurrently right away.
+                String msg = "Failed to DELETE some keys because they had been modified concurrently " +
+                    "[keys=" + sender.failedKeys() + ']';
+
+                SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+                conEx.setNextException(resEx);
+
+                resEx = conEx;
+            }
+
+            throw new IgniteSQLException(resEx);
+        }
+
+        return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
+    }
+
+    /**
+     * Performs the planned update.
+     * @param plan Update plan.
+     * @param rows Rows to update.
+     * @param pageSize Page size.
+     * @return {@link List} of update results.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static List<UpdateResult> processSelectResultBatched(UpdatePlan plan, List<List<List<?>>> rows, int pageSize)
+        throws IgniteCheckedException {
+        switch (plan.mode()) {
+            case MERGE:
+                // TODO
+                throw new IgniteCheckedException("Unsupported, fix");
+
+            case INSERT:
+                return doInsertBatched(plan, rows, pageSize);
+
+            default:
+                throw new IgniteSQLException("Unexpected batched DML operation [mode=" + plan.mode() + ']',
+                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+        }
+    }
+
+    /**
+     * Execute INSERT statement plan.
+     *
+     * @param plan Plan to execute.
+     * @param cursor Cursor to take inserted data from. I.e. list of batch arguments for each query.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+     * @return Number of items affected.
+     * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
+     */
+    private static List<UpdateResult> doInsertBatched(UpdatePlan plan, List<List<List<?>>> cursor, int pageSize)
+        throws IgniteCheckedException {
+        GridCacheContext cctx = plan.cacheContext();
+
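+        // One update counter slot is kept per batched statement (one per element of 'cursor').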
+        DmlBatchSender snd = new DmlBatchSender(cctx, pageSize, cursor.size());
+
+        int rowNum = 0;
+
+        SQLException resEx = null;
+
+        for (List<List<?>> qryRow : cursor) {
+            for (List<?> row : qryRow) {
+                try {
+                    final IgniteBiTuple keyValPair = plan.processRow(row);
+
+                    snd.add(keyValPair.getKey(), new DmlStatementsProcessor.InsertEntryProcessor(keyValPair.getValue()), rowNum);
+                }
+                catch (Exception e) {
+                    String sqlState;
+
+                    int code;
+
+                    if (e instanceof IgniteSQLException) {
+                        sqlState = ((IgniteSQLException)e).sqlState();
+
+                        code = ((IgniteSQLException)e).statusCode();
+                    }
+                    else {
+                        sqlState = SqlStateCode.INTERNAL_ERROR;
+
+                        code = IgniteQueryErrorCode.UNKNOWN;
+                    }
+
+                    resEx = chainException(resEx, new SQLException(e.getMessage(), sqlState, code, e));
+
+                    snd.setFailed(rowNum);
+                }
+            }
+
+            rowNum++;
+        }
+
+        try {
+            snd.flush();
+        }
+        catch (Exception e) {
+            resEx = chainException(resEx, new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR,
+                IgniteQueryErrorCode.UNKNOWN, e));
+        }
+
+        resEx = chainException(resEx, snd.error());
+
+        if (!F.isEmpty(snd.failedKeys())) {
+            SQLException e = new SQLException("Failed to INSERT some keys because they are already in cache [keys=" +
+                snd.failedKeys() + ']', SqlStateCode.CONSTRAINT_VIOLATION, DUPLICATE_KEY);
+
+            resEx = chainException(resEx, e);
+        }
+
+        if (resEx != null) {
+            BatchUpdateException e = new BatchUpdateException(resEx.getMessage(), resEx.getSQLState(),
+                resEx.getErrorCode(), snd.perRowCounterAsArray(), resEx);
+
+            throw new IgniteCheckedException(e);
+        }
+
+        int[] cntPerRow = snd.perRowCounterAsArray();
+
+        List<UpdateResult> res = new ArrayList<>(cntPerRow.length);
+
+        for (int i = 0; i < cntPerRow.length; i++) {
+            int cnt = cntPerRow[i];
+
+            res.add(new UpdateResult(cnt, X.EMPTY_OBJECT_ARRAY));
+        }
+
+        return res;
+    }
+
+    /**
+     * Adds exception to the chain.
+     *
+     * @param main Exception to add another exception to.
+     * @param add Exception which should be added to chain.
+     * @return Chained exception.
+     */
+    public static SQLException chainException(SQLException main, SQLException add) {
+        if (main == null)
+            return add;
+
+        main.setNextException(add);
+
+        return main;
+    }
+
+    /**
+     * Switches the current cache operation context to keepBinary mode.
+     *
+     * @param cctx Cache context.
+     * @return Old operation context.
+     */
+    public static CacheOperationContext setKeepBinaryContext(GridCacheContext<?, ?> cctx) {
+        CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+        // Force keepBinary for operation context to avoid binary deserialization inside entry processor.
+        if (cctx.binaryMarshaller()) {
+            CacheOperationContext newOpCtx = null;
+
+            if (opCtx == null)
+                // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+                newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false, true);
+            else if (!opCtx.isKeepBinary())
+                newOpCtx = opCtx.keepBinary();
+
+            if (newOpCtx != null)
+                cctx.operationContextPerCall(newOpCtx);
+        }
+
+        return opCtx;
+    }
+
+    /**
+     * Restore previous binary context.
+     *
+     * @param cctx Cache context.
+     * @param oldOpCtx Old operation context.
+     */
+    public static void restoreKeepBinaryContext(GridCacheContext<?, ?> cctx, CacheOperationContext oldOpCtx) {
+        cctx.operationContextPerCall(oldOpCtx);
+    }
+
+    /**
      * Private constructor.
      */
     private DmlUtils() {
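
chainException() above relies on the standard JDBC chaining contract: SQLException.setNextException()
appends to the chain, and getNextException() walks it. A minimal standalone sketch of consuming such a
chain:

    import java.sql.SQLException;

    final class SqlChainDemo {
        /** Prints every exception in a JDBC chain built with setNextException(). */
        static void printChain(SQLException head) {
            for (SQLException e = head; e != null; e = e.getNextException())
                System.out.println(e.getSQLState() + ": " + e.getMessage());
        }
    }
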
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 068e3b7..d87e7da 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -100,9 +100,14 @@ public final class UpdatePlanBuilder {
      * @return Update plan.
      */
     @SuppressWarnings("ConstantConditions")
-    public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx,
-        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQry, @Nullable Integer errKeysPos,
-        boolean dmlInsideTxAllowed)
+    public static UpdatePlan planForStatement(
+        Prepared prepared,
+        boolean loc,
+        IgniteH2Indexing idx,
+        @Nullable Connection conn,
+        @Nullable SqlFieldsQuery fieldsQry,
+        boolean dmlInsideTxAllowed
+    )
         throws IgniteCheckedException {
         assert !prepared.isQuery();
 
@@ -140,7 +145,7 @@ public final class UpdatePlanBuilder {
         if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
             return planForInsert(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
         else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete)
-            return planForUpdate(stmt, loc, idx, mvccEnabled, conn, fieldsQry, errKeysPos);
+            return planForUpdate(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
         else
             throw new IgniteSQLException("Unsupported operation: " + prepared.getSQL(),
                 IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
@@ -402,13 +407,17 @@ public final class UpdatePlanBuilder {
      * @param mvccEnabled Mvcc flag.
      * @param conn Connection.
      * @param fieldsQuery Original query.
-     * @param errKeysPos index to inject param for re-run keys at. Null if it's not a re-run plan.
      * @return Update plan.
      * @throws IgniteCheckedException if failed.
      */
-    private static UpdatePlan planForUpdate(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx,
-        boolean mvccEnabled, @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery,
-        @Nullable Integer errKeysPos) throws IgniteCheckedException {
+    private static UpdatePlan planForUpdate(
+        GridSqlStatement stmt,
+        boolean loc,
+        IgniteH2Indexing idx,
+        boolean mvccEnabled,
+        @Nullable Connection conn,
+        @Nullable SqlFieldsQuery fieldsQuery
+    ) throws IgniteCheckedException {
         GridSqlElement target;
 
         FastUpdate fastUpdate;
@@ -493,7 +502,7 @@ public final class UpdatePlanBuilder {
                 KeyValueSupplier valSupplier = createSupplier(desc.context(), desc.type(), newValColIdx, hasProps,
                     false, true);
 
-                sel = DmlAstUtils.selectForUpdate((GridSqlUpdate) stmt, errKeysPos);
+                sel = DmlAstUtils.selectForUpdate((GridSqlUpdate)stmt);
 
                 String selectSql = sel.getSQL();
 
@@ -519,7 +528,7 @@ public final class UpdatePlanBuilder {
                 );
             }
             else {
-                sel = DmlAstUtils.selectForDelete((GridSqlDelete) stmt, errKeysPos);
+                sel = DmlAstUtils.selectForDelete((GridSqlDelete)stmt);
 
                 String selectSql = sel.getSQL();
 
@@ -641,7 +650,7 @@ public final class UpdatePlanBuilder {
      * @return Closure returning key or value.
      * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings({"ConstantConditions", "unchecked"})
+    @SuppressWarnings({"ConstantConditions", "unchecked", "IfMayBeConditional"})
     private static KeyValueSupplier createSupplier(final GridCacheContext<?, ?> cctx, GridQueryTypeDescriptor desc,
         final int colIdx, boolean hasProps, final boolean key, boolean forUpdate) throws IgniteCheckedException {
         final String typeName = key ? desc.keyTypeName() : desc.valueTypeName();
@@ -666,7 +675,7 @@ public final class UpdatePlanBuilder {
                 // If we have key or value explicitly present in query, create new builder upon them...
                 return new KeyValueSupplier() {
                     /** {@inheritDoc} */
-                    @Override public Object apply(List<?> arg) throws IgniteCheckedException {
+                    @Override public Object apply(List<?> arg) {
                         Object obj = arg.get(colIdx);
 
                         if (obj == null)
@@ -686,7 +695,7 @@ public final class UpdatePlanBuilder {
                 // ...and if we don't, just create a new builder.
                 return new KeyValueSupplier() {
                     /** {@inheritDoc} */
-                    @Override public Object apply(List<?> arg) throws IgniteCheckedException {
+                    @Override public Object apply(List<?> arg) {
                         BinaryObjectBuilder builder = cctx.grid().binary().builder(typeName);
 
                         cctx.prepareAffinityField(builder);
@@ -776,7 +785,7 @@ public final class UpdatePlanBuilder {
      * @param statement Statement.
      */
     private static void verifyUpdateColumns(GridSqlStatement statement) {
-        if (statement == null || !(statement instanceof GridSqlUpdate))
+        if (!(statement instanceof GridSqlUpdate))
             return;
 
         GridSqlUpdate update = (GridSqlUpdate) statement;
@@ -974,7 +983,7 @@ public final class UpdatePlanBuilder {
         }
 
         /** {@inheritDoc} */
-        @Override public Object apply(List<?> arg) throws IgniteCheckedException {
+        @Override public Object apply(List<?> arg) {
             return arg.get(colIdx);
         }
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index f66a289..5b1609d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -1688,6 +1688,16 @@ public class GridSqlQueryParser {
     }
 
     /**
+     * Check whether statement is DML statement.
+     *
+     * @param stmt Statement.
+     * @return {@code True} if this is DML.
+     */
+    public static boolean isDml(Prepared stmt) {
+        return stmt instanceof Merge || stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete;
+    }
+
+    /**
      * @param stmt Prepared.
      * @return Target table.
      */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 9b35f04..7a104b0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -913,7 +913,7 @@ public class GridMapQueryExecutor {
                 local = false;
             }
 
-            UpdateResult updRes = h2.mapDistributedUpdate(req.schemaName(), fldsQry, filter, cancel, local);
+            UpdateResult updRes = h2.executeUpdateOnDataNode(req.schemaName(), fldsQry, filter, cancel, local);
 
             GridCacheContext<?, ?> mainCctx =
                 !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingInMemSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingInMemSelfTest.java
deleted file mode 100644
index abf1f11..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingInMemSelfTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-/**
- * Tests for H2 indexing SPI.
- */
-public class GridH2IndexingInMemSelfTest extends GridIndexingSpiAbstractSelfTest {
-    // No-op.
-}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingOffheapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingOffheapSelfTest.java
deleted file mode 100644
index 4722d76..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingOffheapSelfTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-/**
- * Tests for H2 indexing SPI.
- */
-public class GridH2IndexingOffheapSelfTest extends GridIndexingSpiAbstractSelfTest {
-    /** {@inheritDoc} */
-    @Override protected boolean offheap() {
-        return true;
-    }
-}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
deleted file mode 100644
index 7367245..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.QueryEntity;
-import org.apache.ignite.cache.QueryIndex;
-import org.apache.ignite.cache.QueryIndexType;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridStringLogger;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.Test;
-
-/**
- * Tests for all SQL based indexing SPI implementations.
- */
-public abstract class GridIndexingSpiAbstractSelfTest extends AbstractIndexingCommonTest {
-    /** */
-    private static final LinkedHashMap<String, String> fieldsAA = new LinkedHashMap<>();
-
-    /** */
-    private static final LinkedHashMap<String, String> fieldsAB = new LinkedHashMap<>();
-
-    /** */
-    private IgniteEx ignite0;
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("deprecation")
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setMarshaller(new BinaryMarshaller());
-
-        return cfg;
-    }
-
-    /*
-     * Fields initialization.
-     */
-    static {
-        fieldsAA.put("id", Long.class.getName());
-        fieldsAA.put("name", String.class.getName());
-        fieldsAA.put("age", Integer.class.getName());
-
-        fieldsAB.putAll(fieldsAA);
-        fieldsAB.put("txt", String.class.getName());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        ignite0 = startGrid(0);
-    }
-
-    /**
-     */
-    private CacheConfiguration cacheACfg() {
-        CacheConfiguration<?,?> cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
-
-        cfg.setName("A");
-
-        QueryEntity eA = new QueryEntity(Integer.class.getName(), "A");
-        eA.setFields(fieldsAA);
-
-        QueryEntity eB = new QueryEntity(Integer.class.getName(), "B");
-        eB.setFields(fieldsAB);
-
-        List<QueryEntity> list = new ArrayList<>(2);
-
-        list.add(eA);
-        list.add(eB);
-
-        QueryIndex idx = new QueryIndex("txt");
-        idx.setIndexType(QueryIndexType.FULLTEXT);
-        eB.setIndexes(Collections.singleton(idx));
-
-        cfg.setQueryEntities(list);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     * @return Indexing.
-     */
-    private IgniteH2Indexing getIndexing() {
-        return U.field(ignite0.context().query(), "idx");
-    }
-
-    /**
-     * @return {@code true} if OFF-HEAP mode should be tested.
-     */
-    protected boolean offheap() {
-        return false;
-    }
-
-    /**
-     * Test long queries write explain warnings into log.
-     *
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings({"unchecked", "deprecation"})
-    @Test
-    public void testLongQueries() throws Exception {
-        IgniteH2Indexing spi = getIndexing();
-
-        ignite0.createCache(cacheACfg());
-
-        long longQryExecTime = IgniteConfiguration.DFLT_LONG_QRY_WARN_TIMEOUT;
-
-        GridStringLogger log = new GridStringLogger(false, this.log);
-
-        IgniteLogger oldLog = GridTestUtils.getFieldValue(spi, "log");
-
-        try {
-            GridTestUtils.setFieldValue(spi, "log", log);
-
-            String sql = "select sum(x) FROM SYSTEM_RANGE(?, ?)";
-
-            long now = U.currentTimeMillis();
-            long time = now;
-
-            long range = 1000000L;
-
-            while (now - time <= longQryExecTime * 3 / 2) {
-                time = now;
-                range *= 3;
-
-                GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.<Object>asList(1,
-                    range), null, false, false, 0, null, null);
-
-                assert res.iterator().hasNext();
-
-                now = U.currentTimeMillis();
-            }
-
-            String res = log.toString();
-
-            assertTrue(res.contains("/* PUBLIC.RANGE_INDEX */"));
-        }
-        finally {
-            GridTestUtils.setFieldValue(spi, "log", oldLog);
-        }
-    }
-}
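
Note on removed coverage: the deleted GridIndexingSpiAbstractSelfTest verified that long-running queries write their EXPLAIN plan to the log, by swapping the private "log" field of IgniteH2Indexing via reflection and inspecting the captured output. Should that check need reinstating elsewhere, the core of the technique, condensed from the deleted test above (here "spi" is the IgniteH2Indexing instance), is:

    GridStringLogger log = new GridStringLogger(false, this.log);
    IgniteLogger oldLog = GridTestUtils.getFieldValue(spi, "log");

    try {
        // Redirect the indexing module's logging into the string buffer.
        GridTestUtils.setFieldValue(spi, "log", log);

        // ... run a query that exceeds DFLT_LONG_QRY_WARN_TIMEOUT ...

        // Long-running queries are expected to log their execution plan.
        assertTrue(log.toString().contains("/* PUBLIC.RANGE_INDEX */"));
    }
    finally {
        GridTestUtils.setFieldValue(spi, "log", oldLog);
    }
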
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index e337fcc..095518c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -203,8 +203,6 @@ import org.apache.ignite.internal.processors.query.SqlQueryHistoryFromClientSelf
 import org.apache.ignite.internal.processors.query.SqlQueryHistorySelfTest;
 import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
 import org.apache.ignite.internal.processors.query.SqlSystemViewsSelfTest;
-import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
-import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildSelfTest;
 import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest;
 import org.apache.ignite.internal.processors.query.h2.H2StatementCacheSelfTest;
@@ -292,11 +290,6 @@ import org.junit.runners.Suite;
     DynamicIndexServerNodeFIlterBasicSelfTest.class,
     DynamicIndexClientBasicSelfTest.class,
 
-    // H2 tests.
-
-    GridH2IndexingInMemSelfTest.class,
-    GridH2IndexingOffheapSelfTest.class,
-
     // Parsing
     GridQueryParsingTest.class,
     IgniteCacheSqlQueryErrorSelfTest.class,