You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/09 08:20:04 UTC

[ignite] branch master updated: IGNITE-11223: SQL: Make two-step plan immutable. Note that GridCacheSqlQuery is still mutable, what will be fixed separately. This closes #6070.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 208c53f  IGNITE-11223: SQL: Make two-step plan immutable. Note that GridCacheSqlQuery is still mutable, what will be fixed separately. This closes #6070.
208c53f is described below

commit 208c53f467797606cfd0b116bda3101cf3451d5b
Author: devozerov <pp...@gmail.com>
AuthorDate: Sat Feb 9 11:19:55 2019 +0300

    IGNITE-11223: SQL: Make two-step plan immutable. Note that GridCacheSqlQuery is still mutable, what will be fixed separately. This closes #6070.
---
 .../cache/query/GridCacheTwoStepQuery.java         | 183 ++++-----------
 .../internal/processors/query/h2/H2Utils.java      | 141 ++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java      | 250 +++++----------------
 .../processors/query/h2/dml/UpdatePlanBuilder.java |  21 +-
 .../query/h2/sql/GridSqlQuerySplitter.java         |  59 ++---
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  34 ++-
 .../query/h2/twostep/ReduceQueryRun.java           |   5 +-
 7 files changed, 317 insertions(+), 376 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index cc5dd03..aad9cdd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -30,48 +30,42 @@ import org.apache.ignite.internal.util.typedef.internal.S;
  */
 public class GridCacheTwoStepQuery {
     /** */
-    public static final int DFLT_PAGE_SIZE = 1000;
-
-    /** */
     @GridToStringInclude
-    private List<GridCacheSqlQuery> mapQrys = new ArrayList<>();
+    private final List<GridCacheSqlQuery> mapQrys;
 
     /** */
     @GridToStringInclude
-    private GridCacheSqlQuery rdc;
-
-    /** */
-    private int pageSize = DFLT_PAGE_SIZE;
+    private final GridCacheSqlQuery rdc;
 
     /** */
-    private boolean explain;
+    private final boolean explain;
 
     /** */
-    private String originalSql;
+    private final String originalSql;
 
     /** */
-    private Set<QueryTable> tbls;
+    private final Set<QueryTable> tbls;
 
     /** */
-    private boolean distributedJoins;
+    private final boolean distributedJoins;
 
     /** */
-    private boolean skipMergeTbl;
+    private final boolean skipMergeTbl;
 
     /** */
-    private List<Integer> cacheIds;
+    private final List<Integer> cacheIds;
 
     /** */
-    private boolean local;
+    private final boolean local;
 
     /** */
-    private PartitionResult derivedPartitions;
+    private final PartitionResult derivedPartitions;
 
     /** */
-    private boolean mvccEnabled;
+    private final boolean mvccEnabled;
 
     /** {@code FOR UPDATE} flag. */
-    private boolean forUpdate;
+    private final boolean forUpdate;
 
     /** Number of positional arguments in the sql. */
     private final int paramsCnt;
@@ -80,19 +74,34 @@ public class GridCacheTwoStepQuery {
      * @param originalSql Original query SQL.
      * @param tbls Tables in query.
      */
-    public GridCacheTwoStepQuery(String originalSql, int paramsCnt, Set<QueryTable> tbls) {
+    public GridCacheTwoStepQuery(
+        String originalSql,
+        int paramsCnt,
+        Set<QueryTable> tbls,
+        GridCacheSqlQuery rdc,
+        List<GridCacheSqlQuery> mapQrys,
+        boolean skipMergeTbl,
+        boolean explain,
+        boolean distributedJoins,
+        boolean forUpdate,
+        PartitionResult derivedPartitions,
+        List<Integer> cacheIds,
+        boolean mvccEnabled,
+        boolean local
+    ) {
         this.originalSql = originalSql;
         this.paramsCnt = paramsCnt;
         this.tbls = tbls;
-    }
-
-    /**
-     * Specify if distributed joins are enabled for this query.
-     *
-     * @param distributedJoins Distributed joins enabled.
-     */
-    public void distributedJoins(boolean distributedJoins) {
+        this.rdc = rdc;
+        this.mapQrys = F.isEmpty(mapQrys) ? Collections.emptyList() : mapQrys;
+        this.skipMergeTbl = skipMergeTbl;
+        this.explain = explain;
         this.distributedJoins = distributedJoins;
+        this.forUpdate = forUpdate;
+        this.derivedPartitions = derivedPartitions;
+        this.cacheIds = cacheIds;
+        this.mvccEnabled = mvccEnabled;
+        this.local = local;
     }
 
     /**
@@ -104,7 +113,6 @@ public class GridCacheTwoStepQuery {
         return distributedJoins;
     }
 
-
     /**
      * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
      */
@@ -113,13 +121,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param skipMergeTbl Skip merge table.
-     */
-    public void skipMergeTable(boolean skipMergeTbl) {
-        this.skipMergeTbl = skipMergeTbl;
-    }
-
-    /**
      * @return If this is explain query.
      */
     public boolean explain() {
@@ -127,34 +128,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param explain If this is explain query.
-     */
-    public void explain(boolean explain) {
-        this.explain = explain;
-    }
-
-    /**
-     * @param pageSize Page size.
-     */
-    public void pageSize(int pageSize) {
-        this.pageSize = pageSize;
-    }
-
-    /**
-     * @return Page size.
-     */
-    public int pageSize() {
-        return pageSize;
-    }
-
-    /**
-     * @param qry SQL Query.
-     */
-    public void addMapQuery(GridCacheSqlQuery qry) {
-        mapQrys.add(qry);
-    }
-
-    /**
      * @return {@code true} If all the map queries contain only replicated tables.
      */
     public boolean isReplicatedOnly() {
@@ -176,13 +149,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param rdc Reduce query.
-     */
-    public void reduceQuery(GridCacheSqlQuery rdc) {
-        this.rdc = rdc;
-    }
-
-    /**
      * @return Map queries.
      */
     public List<GridCacheSqlQuery> mapQueries() {
@@ -197,13 +163,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param cacheIds Cache IDs.
-     */
-    public void cacheIds(List<Integer> cacheIds) {
-        this.cacheIds = cacheIds;
-    }
-
-    /**
      * @return Original query SQL.
      */
     public String originalSql() {
@@ -218,13 +177,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param local Local query flag.
-     */
-    public void local(boolean local) {
-        this.local = local;
-    }
-
-    /**
      * @return Query derived partitions info.
      */
     public PartitionResult derivedPartitions() {
@@ -232,37 +184,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param derivedPartitions Query derived partitions info.
-     */
-    public void derivedPartitions(PartitionResult derivedPartitions) {
-        this.derivedPartitions = derivedPartitions;
-    }
-
-    /**
-     * @return Copy.
-     */
-    public GridCacheTwoStepQuery copy() {
-        assert !explain;
-
-        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, paramsCnt, tbls);
-
-        cp.cacheIds = cacheIds;
-        cp.rdc = rdc.copy();
-        cp.skipMergeTbl = skipMergeTbl;
-        cp.pageSize = pageSize;
-        cp.distributedJoins = distributedJoins;
-        cp.derivedPartitions = derivedPartitions;
-        cp.local = local;
-        cp.mvccEnabled = mvccEnabled;
-        cp.forUpdate = forUpdate;
-
-        for (int i = 0; i < mapQrys.size(); i++)
-            cp.mapQrys.add(mapQrys.get(i).copy());
-
-        return cp;
-    }
-
-    /**
      * @return Nuumber of tables.
      */
     public int tablesCount() {
@@ -284,13 +205,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param mvccEnabled Mvcc flag.
-     */
-    public void mvccEnabled(boolean mvccEnabled) {
-        this.mvccEnabled = mvccEnabled;
-    }
-
-    /**
      * @return {@code FOR UPDATE} flag.
      */
     public boolean forUpdate() {
@@ -298,13 +212,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param forUpdate {@code FOR UPDATE} flag.
-     */
-    public void forUpdate(boolean forUpdate) {
-        this.forUpdate = forUpdate;
-    }
-
-    /**
      * @return Number of parameters
      */
     public int parametersCount() {
@@ -315,18 +222,4 @@ public class GridCacheTwoStepQuery {
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);
     }
-
-    /**
-     * @return {@code True} is system views exist.
-     */
-    public boolean hasSystemViews() {
-        if (tablesCount() > 0) {
-            for (QueryTable tbl : tables()) {
-                if (QueryUtils.SCHEMA_SYS.equals(tbl.schema()))
-                    return true;
-            }
-        }
-
-        return false;
-    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 684ecf7..50b8def 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -43,10 +44,17 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -743,4 +751,137 @@ public class H2Utils {
             U.swap(arr, i, i + 1);
         }
     }
+
+    /**
+     * Collect cache identifiers from two-step query.
+     *
+     * @param mainCacheId Id of main cache.
+     * @return Result.
+     */
+    @Nullable public static List<Integer> collectCacheIds(
+        IgniteH2Indexing idx,
+        @Nullable Integer mainCacheId,
+        Collection<QueryTable> tbls
+    ) {
+        LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
+
+        if (mainCacheId != null)
+            caches0.add(mainCacheId);
+
+        if (!F.isEmpty(tbls)) {
+            for (QueryTable tblKey : tbls) {
+                GridH2Table tbl = idx.schemaManager().dataTable(tblKey.schema(), tblKey.table());
+
+                if (tbl != null) {
+                    checkAndStartNotStartedCache(idx.kernalContext(), tbl);
+
+                    int cacheId = tbl.cacheId();
+
+                    caches0.add(cacheId);
+                }
+            }
+        }
+
+        return caches0.isEmpty() ? Collections.emptyList() : new ArrayList<>(caches0);
+    }
+
+    /**
+     * Collect MVCC enabled flag.
+     *
+     * @param idx Indexing.
+     * @param cacheIds Cache IDs.
+     * @return {@code True} if indexing is enabled.
+     */
+    public static boolean collectMvccEnabled(IgniteH2Indexing idx, List<Integer> cacheIds) {
+        if (cacheIds.isEmpty())
+            return false;
+
+        GridCacheSharedContext sharedCtx = idx.kernalContext().cache().context();
+
+        GridCacheContext cctx0 = null;
+
+        boolean mvccEnabled = false;
+
+        for (int i = 0; i < cacheIds.size(); i++) {
+            Integer cacheId = cacheIds.get(i);
+
+            GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
+
+            assert cctx != null;
+
+            if (i == 0) {
+                mvccEnabled = cctx.mvccEnabled();
+                cctx0 = cctx;
+            }
+            else if (cctx.mvccEnabled() != mvccEnabled)
+                MvccUtils.throwAtomicityModesMismatchException(cctx0, cctx);
+        }
+
+        return mvccEnabled;
+    }
+
+    /**
+     * Check if query is valid.
+     *
+     * @param idx Indexing.
+     * @param cacheIds Cache IDs.
+     * @param mvccEnabled MVCC enabled flag.
+     * @param forUpdate For update flag.
+     * @param tbls Tables.
+     */
+    public static void checkQuery(
+        IgniteH2Indexing idx,
+        List<Integer> cacheIds,
+        boolean mvccEnabled,
+        boolean forUpdate,
+        Collection<QueryTable> tbls
+    ) {
+        GridCacheSharedContext sharedCtx = idx.kernalContext().cache().context();
+
+        // Check query parallelism.
+        int expectedParallelism = 0;
+
+        for (int i = 0; i < cacheIds.size(); i++) {
+            Integer cacheId = cacheIds.get(i);
+
+            GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
+
+            assert cctx != null;
+
+            if (!cctx.isPartitioned())
+                continue;
+
+            if (expectedParallelism == 0)
+                expectedParallelism = cctx.config().getQueryParallelism();
+            else if (cctx.config().getQueryParallelism() != expectedParallelism) {
+                throw new IllegalStateException("Using indexes with different parallelism levels in same query is " +
+                    "forbidden.");
+            }
+        }
+
+        // Check FOR UPDATE invariants: only one table, MVCC is there.
+        if (forUpdate) {
+            if (cacheIds.size() != 1)
+                throw new IgniteSQLException("SELECT FOR UPDATE is supported only for queries " +
+                    "that involve single transactional cache.");
+
+            if (!mvccEnabled)
+                throw new IgniteSQLException("SELECT FOR UPDATE query requires transactional cache " +
+                    "with MVCC enabled.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+        }
+
+        // Check for joins between system views and normal tables.
+        if (!F.isEmpty(tbls)) {
+            for (QueryTable tbl : tbls) {
+                if (QueryUtils.SCHEMA_SYS.equals(tbl.schema())) {
+                    if (!F.isEmpty(cacheIds)) {
+                        throw new IgniteSQLException("Normal tables and system views cannot be used in the same query.",
+                            IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+                    }
+                    else
+                        return;
+                }
+            }
+        }
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2b60286..afdd9e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -29,7 +29,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -65,7 +64,6 @@ import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -81,7 +79,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.cache.query.RegisteredQueryCursor;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
@@ -1277,6 +1274,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param lazy Lazy query execution flag.
      * @param mvccTracker Query tracker.
      * @param dataPageScanEnabled If data page scan is enabled.
+     * @param pageSize Page size.
      * @return Iterable result.
      */
     private Iterable<List<?>> runQueryTwoStep(
@@ -1291,7 +1289,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         final int[] parts,
         final boolean lazy,
         MvccQueryTracker mvccTracker,
-        Boolean dataPageScanEnabled
+        Boolean dataPageScanEnabled,
+        int pageSize
     ) {
         assert !qry.mvccEnabled() || !F.isEmpty(qry.cacheIds());
 
@@ -1301,11 +1300,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             GridNearTxLocal tx = tracker != null ? tx(ctx) : null;
 
-            if (qry.forUpdate()) {
-                // Locking has no meaning if SELECT FOR UPDATE is not executed in explicit transaction.
-                // So, we can can reset forUpdate flag if there is no explicit transaction.
-                qry.forUpdate(checkActive(tx) != null);
-            }
+            // Locking has no meaning if SELECT FOR UPDATE is not executed in explicit transaction.
+            // So, we can can reset forUpdate flag if there is no explicit transaction.
+            boolean forUpdate = qry.forUpdate() && checkActive(tx) != null;
 
             int opTimeout = operationTimeout(qryTimeout, tx);
 
@@ -1313,8 +1310,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 @SuppressWarnings("NullableProblems")
                 @Override public Iterator<List<?>> iterator() {
                     try {
-                        return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, opTimeout,
-                            cancel, params, parts, lazy, tracker, dataPageScanEnabled);
+                        return rdcQryExec.query(
+                            schemaName,
+                            qry,
+                            keepCacheObj,
+                            enforceJoinOrder,
+                            opTimeout,
+                            cancel,
+                            params,
+                            parts,
+                            lazy,
+                            tracker,
+                            dataPageScanEnabled,
+                            forUpdate,
+                            pageSize
+                        );
                     }
                     catch (Throwable e) {
                         if (tracker != null)
@@ -1743,12 +1753,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     res.addAll(qryRes);
 
                     firstArg += parseRes.parametersCount();
-
-                    H2TwoStepCachedQueryKey twoStepKey = parseRes.twoStepQueryKey();
-
-                    // We cannot cache two-step query for multiple statements query except the last statement
-                    if (twoStepQry != null && remainingSql == null && !twoStepQry.explain() && twoStepKey != null)
-                        twoStepCache.putIfAbsent(twoStepKey, new H2TwoStepCachedQuery(meta, twoStepQry.copy()));
                 }
             }
 
@@ -1788,29 +1792,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             qry.isEnforceJoinOrder(),
             qry.isLocal());
 
-        H2TwoStepCachedQuery cachedQry;
-
-        if ((cachedQry = twoStepCache.get(cachedQryKey)) != null) {
-            checkQueryType(qry, true);
-
-            GridCacheTwoStepQuery twoStepQry = cachedQry.query().copy();
-
-            List<GridQueryFieldMetadata> meta = cachedQry.meta();
-
-            ParsingResult parseRes = new ParsingResult(null, qry, null, twoStepQry, cachedQryKey, meta);
+        H2TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
-            if (!twoStepQry.explain())
-                twoStepCache.putIfAbsent(cachedQryKey, new H2TwoStepCachedQuery(meta, twoStepQry.copy()));
-
-            return parseRes;
-        }
+        if (cachedQry != null)
+            return new ParsingResult(null, qry, null, cachedQry.query(), cachedQryKey, cachedQry.meta());
 
+        // Try parting as native command.
         ParsingResult parseRes = parseNativeCommand(schemaName, qry);
 
         if (parseRes != null)
             return parseRes;
 
-        // parse with h2 parser:
+        // Parse with H2.
         return parseAndSplit(schemaName, qry, firstArg);
     }
 
@@ -2006,7 +1999,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt);
         }
 
-       if (prepared.isQuery()) {
+        if (prepared.isQuery()) {
             try {
                 H2Utils.bindParameters(stmt, F.asList(args));
             }
@@ -2063,34 +2056,43 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, qry.getSql(),
             qry.isCollocated(), qry.isDistributedJoins(), qry.isEnforceJoinOrder(), qry.isLocal());
 
-        H2TwoStepCachedQuery cachedQry;
+        H2TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
-        if ((cachedQry = twoStepCache.get(cachedQryKey)) != null) {
-            checkQueryType(qry, true);
+        if (cachedQry == null) {
+            try {
+                GridCacheTwoStepQuery twoStepQry = GridSqlQuerySplitter.split(
+                    connMgr.connectionForThread().connection(newQry.getSchema()),
+                    prepared,
+                    newQry.getArgs(),
+                    newQry.isCollocated(),
+                    newQry.isDistributedJoins(),
+                    newQry.isEnforceJoinOrder(),
+                    newQry.isLocal(),
+                    this
+                );
 
-            GridCacheTwoStepQuery twoStepQry = cachedQry.query().copy();
+                List<GridQueryFieldMetadata> meta = H2Utils.meta(stmt.getMetaData());
 
-            List<GridQueryFieldMetadata> meta = cachedQry.meta();
+                cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry);
 
-            return new ParsingResult(prepared, newQry, remainingSql, twoStepQry, cachedQryKey, meta);
+                if (remainingSql == null && !twoStepQry.explain())
+                    twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
+                    Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
+            }
+            catch (SQLException e) {
+                throw new IgniteSQLException(e);
+            }
+            finally {
+                U.close(stmt, log);
+            }
         }
 
-        try {
-            GridCacheTwoStepQuery twoStepQry = split(prepared, newQry);
+        checkQueryType(qry, true);
 
-            return new ParsingResult(prepared, newQry, remainingSql, twoStepQry,
-                cachedQryKey, H2Utils.meta(stmt.getMetaData()));
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
-                Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
-        }
-        catch (SQLException e) {
-            throw new IgniteSQLException(e);
-        }
-        finally {
-            U.close(stmt, log);
-        }
+        return new ParsingResult(prepared, newQry, remainingSql, cachedQry.query(), cachedQryKey, cachedQry.meta());
     }
 
     /**
@@ -2103,44 +2105,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Split query into two-step query.
-     * @param prepared JDBC prepared statement.
-     * @param qry Original fields query.
-     * @return Two-step query.
-     * @throws IgniteCheckedException in case of error inside {@link GridSqlQuerySplitter}.
-     * @throws SQLException in case of error inside {@link GridSqlQuerySplitter}.
-     */
-    private GridCacheTwoStepQuery split(Prepared prepared, SqlFieldsQuery qry) throws IgniteCheckedException,
-        SQLException {
-        GridCacheTwoStepQuery res = GridSqlQuerySplitter.split(
-            connMgr.connectionForThread().connection(qry.getSchema()),
-            prepared,
-            qry.getArgs(),
-            qry.isCollocated(),
-            qry.isDistributedJoins(),
-            qry.isEnforceJoinOrder(),
-            partExtractor);
-
-        List<Integer> cacheIds = collectCacheIds(null, res);
-
-        if (!F.isEmpty(cacheIds) && res.hasSystemViews()) {
-            throw new IgniteSQLException("Normal tables and system views cannot be used in the same query.",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-        }
-
-        if (F.isEmpty(cacheIds))
-            res.local(true);
-        else {
-            res.cacheIds(cacheIds);
-            res.local(qry.isLocal());
-        }
-
-        res.pageSize(qry.getPageSize());
-
-        return res;
-    }
-
-    /**
      * @param qry Sql fields query.autoStartTx(qry)
      * @return {@code True} if need to start transaction.
      */
@@ -2267,8 +2231,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Parsed query: `" + qry.getSql() + "` into two step query: " + twoStepQry);
 
-        twoStepQry.pageSize(qry.getPageSize());
-
         if (cancel == null)
             cancel = new GridQueryCancel();
 
@@ -2316,7 +2278,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 parts,
                 qry.isLazy(),
                 mvccTracker,
-                qry.isDataPageScanEnabled()
+                qry.isDataPageScanEnabled(),
+                qry.getPageSize()
             );
 
             QueryCursorImpl<List<?>> cursor = registerAsNewQry
@@ -2428,60 +2391,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param cacheIds Cache IDs.
-     * @param twoStepQry Query.
-     * @throws IllegalStateException if segmented indices used with non-segmented indices.
-     */
-    private void processCaches(List<Integer> cacheIds, GridCacheTwoStepQuery twoStepQry) {
-        if (cacheIds.isEmpty())
-            return; // Nothing to check
-
-        GridCacheSharedContext sharedCtx = ctx.cache().context();
-
-        int expectedParallelism = 0;
-        GridCacheContext cctx0 = null;
-
-        boolean mvccEnabled = false;
-
-        for (int i = 0; i < cacheIds.size(); i++) {
-            Integer cacheId = cacheIds.get(i);
-
-            GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
-
-            assert cctx != null;
-
-            if (i == 0) {
-                mvccEnabled = cctx.mvccEnabled();
-                cctx0 = cctx;
-            }
-            else if (cctx.mvccEnabled() != mvccEnabled)
-                MvccUtils.throwAtomicityModesMismatchException(cctx0, cctx);
-
-            if (!cctx.isPartitioned())
-                continue;
-
-            if (expectedParallelism == 0)
-                expectedParallelism = cctx.config().getQueryParallelism();
-            else if (cctx.config().getQueryParallelism() != expectedParallelism) {
-                throw new IllegalStateException("Using indexes with different parallelism levels in same query is " +
-                    "forbidden.");
-            }
-        }
-
-        twoStepQry.mvccEnabled(mvccEnabled);
-
-        if (twoStepQry.forUpdate()) {
-            if (cacheIds.size() != 1)
-                throw new IgniteSQLException("SELECT FOR UPDATE is supported only for queries " +
-                    "that involve single transactional cache.");
-
-            if (!mvccEnabled)
-                throw new IgniteSQLException("SELECT FOR UPDATE query requires transactional cache " +
-                    "with MVCC enabled.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-        }
-    }
-
-    /**
      * Registers new class description.
      *
      * This implementation doesn't support type reregistration.
@@ -3039,47 +2948,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Collect cache identifiers from two-step query.
-     *
-     * @param mainCacheId Id of main cache.
-     * @param twoStepQry Two-step query.
-     * @return Result.
-     */
-    @Nullable public List<Integer> collectCacheIds(@Nullable Integer mainCacheId, GridCacheTwoStepQuery twoStepQry) {
-        LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
-
-        int tblCnt = twoStepQry.tablesCount();
-
-        if (mainCacheId != null)
-            caches0.add(mainCacheId);
-
-        if (tblCnt > 0) {
-            for (QueryTable tblKey : twoStepQry.tables()) {
-                GridH2Table tbl = schemaMgr.dataTable(tblKey.schema(), tblKey.table());
-
-                if (tbl != null) {
-                    H2Utils.checkAndStartNotStartedCache(ctx, tbl);
-
-                    int cacheId = tbl.cacheId();
-
-                    caches0.add(cacheId);
-                }
-            }
-        }
-
-        if (caches0.isEmpty())
-            return null;
-        else {
-            //Prohibit usage indices with different numbers of segments in same query.
-            List<Integer> cacheIds = new ArrayList<>(caches0);
-
-            processCaches(cacheIds, twoStepQry);
-
-            return cacheIds;
-        }
-    }
-
-    /**
      * @param schemaName Schema.
      * @param conn Connection.
      * @param prepared Prepared statement.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index d87e7da..484f5b5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -935,19 +935,30 @@ public final class UpdatePlanBuilder {
             try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
                 H2Utils.bindParameters(stmt, F.asList(fieldsQry.getArgs()));
 
-                GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(conn,
+                GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(
+                    conn,
                     GridSqlQueryParser.prepared(stmt),
                     fieldsQry.getArgs(),
                     fieldsQry.isCollocated(),
                     fieldsQry.isDistributedJoins(),
                     fieldsQry.isEnforceJoinOrder(),
-                    idx.partitionExtractor());
+                    false,
+                    idx
+                );
 
-                boolean distributed = qry.skipMergeTable() &&  qry.mapQueries().size() == 1 &&
+                boolean distributed = !qry.isLocal() && qry.skipMergeTable() &&  qry.mapQueries().size() == 1 &&
                     !qry.mapQueries().get(0).hasSubQueries();
 
-                return distributed ? new DmlDistributedPlanInfo(qry.isReplicatedOnly(),
-                    idx.collectCacheIds(CU.cacheId(cacheName), qry)): null;
+                if (distributed) {
+                    List<Integer> cacheIds = H2Utils.collectCacheIds(idx, CU.cacheId(cacheName), qry.tables());
+
+                    H2Utils.collectMvccEnabled(idx, cacheIds);
+                    H2Utils.checkQuery(idx, cacheIds, qry.mvccEnabled(), qry.forUpdate(), qry.tables());
+
+                    return new DmlDistributedPlanInfo(qry.isReplicatedOnly(), cacheIds);
+                }
+                else
+                    return null;
             }
         }
         catch (SQLException e) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index c6e75dd..406c323 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -170,7 +171,7 @@ public class GridSqlQuerySplitter {
      * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
      * @param distributedJoins If distributed joins enabled.
      * @param enforceJoinOrder Enforce join order.
-     * @param partExtractor Partition extractor.
+     * @param idx Indexing.
      * @return Two step query.
      * @throws SQLException If failed.
      * @throws IgniteCheckedException If failed.
@@ -182,7 +183,8 @@ public class GridSqlQuerySplitter {
         boolean collocatedGrpBy,
         boolean distributedJoins,
         boolean enforceJoinOrder,
-        PartitionExtractor partExtractor
+        boolean local,
+        IgniteH2Indexing idx
     ) throws SQLException, IgniteCheckedException {
         SplitterContext.set(distributedJoins);
 
@@ -194,7 +196,8 @@ public class GridSqlQuerySplitter {
                 collocatedGrpBy,
                 distributedJoins,
                 enforceJoinOrder,
-                partExtractor
+                local,
+                idx
             );
         }
         finally {
@@ -209,7 +212,7 @@ public class GridSqlQuerySplitter {
      * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
      * @param distributedJoins If distributed joins enabled.
      * @param enforceJoinOrder Enforce join order.
-     * @param partExtractor Partition extractor.
+     * @param idx Indexing.
      * @return Two step query.
      * @throws SQLException If failed.
      * @throws IgniteCheckedException If failed.
@@ -221,7 +224,8 @@ public class GridSqlQuerySplitter {
         boolean collocatedGrpBy,
         boolean distributedJoins,
         boolean enforceJoinOrder,
-        PartitionExtractor partExtractor
+        boolean local,
+        IgniteH2Indexing idx
     ) throws SQLException, IgniteCheckedException {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
@@ -236,8 +240,12 @@ public class GridSqlQuerySplitter {
 
         qry.explain(false);
 
-        GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, distributedJoins,
-            partExtractor);
+        GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(
+            params,
+            collocatedGrpBy,
+            distributedJoins,
+            idx.partitionExtractor()
+        );
 
         // Normalization will generate unique aliases for all the table filters in FROM.
         // Also it will collect all tables and schemas from the query.
@@ -279,26 +287,27 @@ public class GridSqlQuerySplitter {
                 distributedJoins = false;
         }
 
-        // Setup resulting two step query and return it.
-        int paramsCnt = prepared.getParameters().size();
-
-        GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, paramsCnt, splitter.tbls);
-
-        twoStepQry.reduceQuery(splitter.rdcSqlQry);
-
-        for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys)
-            twoStepQry.addMapQuery(mapSqlQry);
+        List<Integer> cacheIds = H2Utils.collectCacheIds(idx, null, splitter.tbls);
+        boolean mvccEnabled = H2Utils.collectMvccEnabled(idx, cacheIds);
 
-        twoStepQry.skipMergeTable(splitter.skipMergeTbl);
-        twoStepQry.explain(explain);
-        twoStepQry.distributedJoins(distributedJoins);
+        H2Utils.checkQuery(idx, cacheIds, mvccEnabled, forUpdate, splitter.tbls);
 
-        // all map queries must have non-empty derivedPartitions to use this feature.
-        twoStepQry.derivedPartitions(splitter.extractor.mergeMapQueries(twoStepQry.mapQueries()));
-
-        twoStepQry.forUpdate(forUpdate);
-
-        return twoStepQry;
+        // Setup resulting two step query and return it.
+        return new GridCacheTwoStepQuery(
+            originalSql,
+            prepared.getParameters().size(),
+            splitter.tbls,
+            splitter.rdcSqlQry,
+            splitter.mapSqlQrys,
+            splitter.skipMergeTbl,
+            explain,
+            distributedJoins,
+            forUpdate,
+            splitter.extractor.mergeMapQueries(splitter.mapSqlQrys),
+            cacheIds,
+            mvccEnabled,
+            local || F.isEmpty(cacheIds)
+        );
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3ff6dfe..046840e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -386,6 +386,8 @@ public class GridReduceQueryExecutor {
      * @param lazy Lazy execution flag.
      * @param mvccTracker Query tracker.
      * @param dataPageScanEnabled If data page scan is enabled.
+     * @param forUpdate For update flag.
+     * @param pageSize Page size.
      * @return Rows iterator.
      */
     @SuppressWarnings({"BusyWait", "IfMayBeConditional"})
@@ -400,7 +402,9 @@ public class GridReduceQueryExecutor {
         int[] parts,
         boolean lazy,
         MvccQueryTracker mvccTracker,
-        Boolean dataPageScanEnabled
+        Boolean dataPageScanEnabled,
+        boolean forUpdate,
+        int pageSize
     ) {
         if (qry.isLocal() && parts != null)
             parts = null;
@@ -412,9 +416,17 @@ public class GridReduceQueryExecutor {
         if (F.isEmpty(params))
             params = EMPTY_PARAMS;
 
-        final List<GridCacheSqlQuery> mapQueries = singlePartMode ?
-            prepareMapQueryForSinglePartition(qry, params) :
-            qry.mapQueries();
+        List<GridCacheSqlQuery> mapQueries;
+
+        if (singlePartMode)
+            mapQueries = prepareMapQueryForSinglePartition(qry, params);
+        else {
+            mapQueries = new ArrayList<>(qry.mapQueries().size());
+
+            // Copy queries here because node ID will be changed below.
+            for (GridCacheSqlQuery mapQry : qry.mapQueries())
+                mapQueries.add(mapQry.copy());
+        }
 
         final boolean isReplicatedOnly = qry.isReplicatedOnly();
 
@@ -458,7 +470,7 @@ public class GridReduceQueryExecutor {
 
             AffinityTopologyVersion topVer;
 
-            if (qry.forUpdate()) {
+            if (forUpdate) {
                 // Indexing should have started TX at this point for FOR UPDATE query.
                 assert mvccEnabled && curTx != null;
 
@@ -491,8 +503,13 @@ public class GridReduceQueryExecutor {
 
             long qryReqId = qryIdGen.incrementAndGet();
 
-            final ReduceQueryRun r = new ReduceQueryRun(h2.connections().connectionForThread().connection(schemaName),
-                mapQueries.size(), qry.pageSize(), sfuFut, dataPageScanEnabled);
+            final ReduceQueryRun r = new ReduceQueryRun(
+                h2.connections().connectionForThread().connection(schemaName),
+                mapQueries.size(),
+                pageSize,
+                sfuFut,
+                dataPageScanEnabled
+            );
 
             Collection<ClusterNode> nodes;
 
@@ -683,7 +700,7 @@ public class GridReduceQueryExecutor {
 
                 final C2<ClusterNode, Message, Message> spec;
 
-                if (qry.forUpdate()) {
+                if (forUpdate) {
                     final AtomicInteger cnt = new AtomicInteger();
 
                     spec = new C2<ClusterNode, Message, Message>() {
@@ -1344,6 +1361,7 @@ public class GridReduceQueryExecutor {
         for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
             if (mapQry.hasSubQueries()) {
                 hasSubQries = true;
+
                 break;
             }
         }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index 91ab3e5..ccd14c3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -24,9 +24,10 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
+
+import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.util.typedef.F;
 import org.h2.jdbc.JdbcConnection;
 import org.jetbrains.annotations.Nullable;
@@ -75,7 +76,7 @@ public class ReduceQueryRun {
 
         idxs = new ArrayList<>(idxsCnt);
 
-        this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+        this.pageSize = pageSize > 0 ? pageSize : Query.DFLT_PAGE_SIZE;
         this.selectForUpdateFut = selectForUpdateFut;
         this.dataPageScanEnabled  = dataPageScanEnabled;
     }