You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/03 10:46:09 UTC

[ignite] branch master updated: IGNITE-11169: SQL: encapsulated distributed join information into separate context object to simplify (and possibly remove in future) GridH2QueryContext object. This closes #6005.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c71e7df  IGNITE-11169: SQL: encapsulated distributed join information into separate context object to simplify (and possibly remove in future) GridH2QueryContext object. This closes #6005.
c71e7df is described below

commit c71e7dfed112b57861620270eb31c11226ebf7b9
Author: devozerov <pp...@gmail.com>
AuthorDate: Sun Feb 3 13:45:58 2019 +0300

    IGNITE-11169: SQL: encapsulated distributed join information into separate context object to simplify (and possibly remove in future) GridH2QueryContext object. This closes #6005.
---
 .../processors/query/h2/IgniteH2Indexing.java      |  48 ++--
 .../processors/query/h2/opt/GridH2IndexBase.java   |  52 ++--
 .../query/h2/opt/GridH2QueryContext.java           | 290 ++-------------------
 .../processors/query/h2/opt/GridH2QueryType.java   |   7 +-
 .../query/h2/opt/join/CollocationModel.java        |  70 +++--
 .../query/h2/opt/join/DistributedJoinContext.java  | 263 +++++++++++++++++++
 .../query/h2/opt/join/DistributedJoinMode.java     |  51 ----
 .../query/h2/opt/join/DistributedLookupBatch.java  |  64 ++---
 .../processors/query/h2/opt/join/RangeStream.java  |  13 +-
 .../query/h2/sql/GridSqlQuerySplitter.java         |  43 +++
 .../processors/query/h2/sql/SplitterContext.java   |  85 ++++++
 .../query/h2/twostep/GridMapQueryExecutor.java     | 118 ++++-----
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  13 +-
 13 files changed, 610 insertions(+), 507 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1579ec6..5b83088 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -182,9 +182,6 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
 import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
 import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
 
 /**
@@ -545,8 +542,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
             }
 
-            final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
-                .filter(filter).distributedJoinMode(OFF);
+            final GridH2QueryContext ctx = new GridH2QueryContext(
+                nodeId,
+                nodeId,
+                0,
+                0,
+                LOCAL,
+                filter
+            );
 
             boolean forUpdate = GridSqlQueryParser.isForUpdateQuery(p);
 
@@ -691,8 +694,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                             }
                         }
 
-                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null,
-                            detachedConn);
+                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null, detachedConn);
                     }
                     catch (IgniteCheckedException | RuntimeException | Error e) {
                         detachedConn.recycle();
@@ -1843,8 +1845,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (!hasTwoStep)
             return new ParsingResult(prepared, newQry, remainingSql, null, null, null);
 
-        final UUID locNodeId = ctx.localNodeId();
-
         // Now we're sure to have a distributed query. Let's try to get a two-step plan from the cache, or perform the
         // split if needed.
         H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, qry.getSql(),
@@ -1863,28 +1863,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         try {
-            GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
-                .distributedJoinMode(distributedJoinMode(qry.isLocal(), qry.isDistributedJoins())));
-
-            try {
-                GridCacheTwoStepQuery twoStepQry = split(prepared, newQry);
+            GridCacheTwoStepQuery twoStepQry = split(prepared, newQry);
 
-                return new ParsingResult(prepared, newQry, remainingSql, twoStepQry,
-                    cachedQryKey, H2Utils.meta(stmt.getMetaData()));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
-                    Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
-            }
-            catch (SQLException e) {
-                throw new IgniteSQLException(e);
-            }
-            finally {
-                U.close(stmt, log);
-            }
+            return new ParsingResult(prepared, newQry, remainingSql, twoStepQry,
+                cachedQryKey, H2Utils.meta(stmt.getMetaData()));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
+                Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException(e);
         }
         finally {
-            GridH2QueryContext.clearThreadLocal();
+            U.close(stmt, log);
         }
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index ab9929d..c53d0ec 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -31,11 +31,13 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
 import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
 import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
 import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
 import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
 import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
+import org.apache.ignite.internal.processors.query.h2.sql.SplitterContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -55,7 +57,6 @@ import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
 import org.h2.index.IndexCondition;
 import org.h2.index.IndexLookupBatch;
-import org.h2.index.ViewIndex;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
@@ -73,10 +74,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static java.util.Collections.singletonList;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel.buildCollocationModel;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
@@ -206,41 +204,23 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
     /**
      * @param ses Session.
-     */
-    private static void clearViewIndexCache(Session ses) {
-        Map<Object,ViewIndex> viewIdxCache = ses.getViewIndexCache(true);
-
-        if (!viewIdxCache.isEmpty())
-            viewIdxCache.clear();
-    }
-
-    /**
-     * @param ses Session.
      * @param filters All joined table filters.
      * @param filter Current filter.
      * @return Multiplier.
      */
     public final int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
-        GridH2QueryContext qctx = GridH2QueryContext.get();
-
         // We do optimizations with respect to distributed joins only on PREPARE stage only.
         // Notice that we check for isJoinBatchEnabled, because we can do multiple different
         // optimization passes on PREPARE stage.
         // Query expressions can not be distributed as well.
-        if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF ||
-            !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
-            return CollocationModel.MULTIPLIER_COLLOCATED;
+        SplitterContext ctx = SplitterContext.get();
 
-        // We have to clear this cache because normally sub-query plan cost does not depend on anything
-        // other than index condition masks and sort order, but in our case it can depend on order
-        // of previous table filters.
-        clearViewIndexCache(ses);
+        if (!ctx.distributedJoins() || !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
+            return CollocationModel.MULTIPLIER_COLLOCATED;
 
         assert filters != null;
 
-        CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false);
-
-        return c.calculateMultiplier();
+        return CollocationModel.distributedMultiplier(ctx, ses, filters, filter);
     }
 
     /** {@inheritDoc} */
@@ -287,7 +267,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
     @Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
         GridH2QueryContext qctx = GridH2QueryContext.get();
 
-        if (qctx == null || qctx.distributedJoinMode() == OFF || !getTable().isPartitioned())
+        if (qctx == null || qctx.distributedJoinContext() == null || !getTable().isPartitioned())
             return null;
 
         IndexColumn affCol = getTable().getAffinityKeyColumn();
@@ -387,6 +367,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
         if (qctx == null)
             res.status(STATUS_NOT_FOUND);
         else {
+            DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+            assert joinCtx != null;
+
             try {
                 RangeSource src;
 
@@ -398,14 +382,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 }
                 else {
                     // This is request to fetch next portion of data.
-                    src = qctx.getSource(node.id(), msg.segment(), msg.batchLookupId());
+                    src = joinCtx.getSource(node.id(), msg.segment(), msg.batchLookupId());
 
                     assert src != null;
                 }
 
                 List<GridH2RowRange> ranges = new ArrayList<>();
 
-                int maxRows = qctx.pageSize();
+                int maxRows = joinCtx.pageSize();
 
                 assert maxRows > 0 : maxRows;
 
@@ -426,11 +410,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 if (src.hasMoreRows()) {
                     // Save source for future fetches.
                     if (msg.bounds() != null)
-                        qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
+                        joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
                 }
                 else if (msg.bounds() == null) {
                     // Drop saved source.
-                    qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
+                    joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
                 }
 
                 res.ranges(ranges);
@@ -466,7 +450,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
         if (qctx == null)
             return;
 
-        Map<SegmentKey, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+        DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+        assert joinCtx != null;
+
+        Map<SegmentKey, RangeStream> streams = joinCtx.getStreams(msg.batchLookupId());
 
         if (streams == null)
             return;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index babced3..40e7edb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -17,26 +17,19 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
-import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
-import org.apache.ignite.internal.processors.query.h2.opt.join.SourceKey;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
@@ -53,40 +46,13 @@ public class GridH2QueryContext {
     private final QueryContextKey key;
 
     /** */
-    private volatile boolean cleared;
-
-    /** */
     private List<GridReservable> reservations;
 
-    /** Range streams for indexes. */
-    private Map<Integer, Object> streams;
-
-    /** Range sources for indexes. */
-    private Map<SourceKey, Object> sources;
-
-    /** */
-    private int batchLookupIdGen;
-
-    /** */
-    private IndexingQueryFilter filter;
-
-    /** */
-    private AffinityTopologyVersion topVer;
-
     /** */
-    private Map<UUID, int[]> partsMap;
+    private final IndexingQueryFilter filter;
 
-    /** */
-    private UUID[] partsNodes;
-
-    /** */
-    private DistributedJoinMode distributedJoinMode;
-
-    /** */
-    private int pageSize;
-
-    /** */
-    private CollocationModel qryCollocationMdl;
+    /** Distributed join context. */
+    private DistributedJoinContext distributedJoinCtx;
 
     /** */
     private MvccSnapshot mvccSnapshot;
@@ -98,29 +64,22 @@ public class GridH2QueryContext {
      * @param locNodeId Local node ID.
      * @param nodeId The node who initiated the query.
      * @param qryId The query ID.
-     * @param type Query type.
-     */
-    public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
-        assert type != MAP;
-
-        key = new QueryContextKey(locNodeId, nodeId, qryId, 0, type);
-    }
-
-    /**
-     * @param locNodeId Local node ID.
-     * @param nodeId The node who initiated the query.
-     * @param qryId The query ID.
      * @param segmentId Index segment ID.
      * @param type Query type.
      */
-    public GridH2QueryContext(UUID locNodeId,
+    public GridH2QueryContext(
+        UUID locNodeId,
         UUID nodeId,
         long qryId,
         int segmentId,
-        GridH2QueryType type) {
+        GridH2QueryType type,
+        IndexingQueryFilter filter
+    ) {
         assert segmentId == 0 || type == MAP;
 
         key = new QueryContextKey(locNodeId, nodeId, qryId, segmentId, type);
+
+        this.filter = filter;
     }
 
     /**
@@ -141,61 +100,27 @@ public class GridH2QueryContext {
     }
 
     /**
-     * @return Type.
-     */
-    public GridH2QueryType type() {
-        return key.type();
-    }
-
-    /**
-     * @return Origin node ID.
-     */
-    public UUID originNodeId() {
-        return key.nodeId();
-    }
-
-    /**
-     * @return Query request ID.
-     */
-    public long queryId() {
-        return key.queryId();
-    }
-
-    /**
-     * @return Query collocation model.
-     */
-    public CollocationModel queryCollocationModel() {
-        return qryCollocationMdl;
-    }
-
-    /**
-     * @param qryCollocationMdl Query collocation model.
+     * @param distributedJoinCtx Distributed join context.
+     * @return This instance for chaining.
      */
-    public void queryCollocationModel(CollocationModel qryCollocationMdl) {
-        this.qryCollocationMdl = qryCollocationMdl;
-    }
-
-    /**
-     * @param distributedJoinMode Distributed join mode.
-     * @return {@code this}.
-     */
-    public GridH2QueryContext distributedJoinMode(DistributedJoinMode distributedJoinMode) {
-        this.distributedJoinMode = distributedJoinMode;
+    public GridH2QueryContext distributedJoinContext(@Nullable DistributedJoinContext distributedJoinCtx) {
+        this.distributedJoinCtx = distributedJoinCtx;
 
         return this;
     }
 
     /**
-     * @return Distributed join mode.
+     * @return Distributed join context.
      */
-    public DistributedJoinMode distributedJoinMode() {
-        return distributedJoinMode;
+    @Nullable public DistributedJoinContext distributedJoinContext() {
+        return distributedJoinCtx;
     }
 
     /**
      * @param reservations Reserved partitions or group reservations.
      * @return {@code this}.
      */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public GridH2QueryContext reservations(List<GridReservable> reservations) {
         this.reservations = reservations;
 
@@ -203,148 +128,13 @@ public class GridH2QueryContext {
     }
 
     /**
-     * @param topVer Topology version.
-     * @return {@code this}.
+     * @return index segment ID.
      */
-    public GridH2QueryContext topologyVersion(AffinityTopologyVersion topVer) {
-        this.topVer = topVer;
-
-        return this;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @param partsMap Partitions map.
-     * @return {@code this}.
-     */
-    public GridH2QueryContext partitionsMap(Map<UUID,int[]> partsMap) {
-        this.partsMap = partsMap;
-
-        return this;
-    }
-
-    /**
-     * @return Partitions map.
-     */
-    public Map<UUID,int[]> partitionsMap() {
-        return partsMap;
-    }
-
-    /**
-     * @param p Partition.
-     * @param cctx Cache context.
-     * @return Owning node ID.
-     */
-    public UUID nodeForPartition(int p, GridCacheContext<?, ?> cctx) {
-        UUID[] nodeIds = partsNodes;
-
-        if (nodeIds == null) {
-            assert partsMap != null;
-
-            nodeIds = new UUID[cctx.affinity().partitions()];
-
-            for (Map.Entry<UUID, int[]> e : partsMap.entrySet()) {
-                UUID nodeId = e.getKey();
-                int[] nodeParts = e.getValue();
-
-                assert nodeId != null;
-                assert !F.isEmpty(nodeParts);
-
-                for (int part : nodeParts) {
-                    assert nodeIds[part] == null;
-
-                    nodeIds[part] = nodeId;
-                }
-            }
-
-            partsNodes = nodeIds;
-        }
-
-        return nodeIds[p];
-    }
-
-    /** @return index segment ID. */
     public int segment() {
         return key.segmentId();
     }
 
     /**
-     * @param batchLookupId Batch lookup ID.
-     * @param streams Range streams.
-     */
-    public synchronized void putStreams(int batchLookupId, Object streams) {
-        if (this.streams == null) {
-            if (streams == null)
-                return;
-
-            this.streams = new HashMap<>();
-        }
-
-        if (streams == null)
-            this.streams.remove(batchLookupId);
-        else
-            this.streams.put(batchLookupId, streams);
-    }
-
-    /**
-     * @param batchLookupId Batch lookup ID.
-     * @return Range streams.
-     */
-    @SuppressWarnings("unchecked")
-    public synchronized <T> T getStreams(int batchLookupId) {
-        if (streams == null)
-            return null;
-
-        return (T)streams.get(batchLookupId);
-    }
-
-    /**
-     * @param ownerId Owner node ID.
-     * @param segmentId Index segment ID.
-     * @param batchLookupId Batch lookup ID.
-     * @param src Range source.
-     */
-    public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
-        SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
-
-        if (src != null) {
-            if (sources == null)
-                sources = new HashMap<>();
-
-            sources.put(srcKey, src);
-        }
-        else if (sources != null)
-            sources.remove(srcKey);
-    }
-
-    /**
-     * @param ownerId Owner node ID.
-     * @param segmentId Index segment ID.
-     * @param batchLookupId Batch lookup ID.
-     * @return Range source.
-     */
-    @SuppressWarnings("unchecked")
-    public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
-        if (sources == null)
-            return null;
-
-        return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
-    }
-
-    /**
-     * @return Next batch ID.
-     */
-    public int nextBatchLookupId() {
-        return ++batchLookupIdGen;
-    }
-
-    /**
      * Sets current thread local context. This method must be called when all the non-volatile properties are
      * already set to ensure visibility for other threads.
      *
@@ -354,7 +144,7 @@ public class GridH2QueryContext {
          assert qctx.get() == null;
 
          // We need MAP query context to be available to other threads to run distributed joins.
-         if (x.key.type() == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
+         if (x.key.type() == MAP && x.distributedJoinContext() != null && qctxs.putIfAbsent(x.key, x) != null)
              throw new IllegalStateException("Query context is already set.");
 
          qctx.set(x);
@@ -421,7 +211,8 @@ public class GridH2QueryContext {
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public void clearContext(boolean nodeStop) {
-        cleared = true;
+        if (distributedJoinCtx != null)
+            distributedJoinCtx.cancel();
 
         List<GridReservable> r = reservations;
 
@@ -432,13 +223,6 @@ public class GridH2QueryContext {
     }
 
     /**
-     * @return {@code true} If the context is cleared.
-     */
-    public boolean isCleared() {
-        return cleared;
-    }
-
-    /**
      * @param locNodeId Local node ID.
      * @param nodeId Dead node ID.
      */
@@ -496,33 +280,6 @@ public class GridH2QueryContext {
     }
 
     /**
-     * @param filter Filter.
-     * @return {@code this}.
-     */
-    public GridH2QueryContext filter(IndexingQueryFilter filter) {
-        this.filter = filter;
-
-        return this;
-    }
-
-    /**
-     * @return Page size.
-     */
-    public int pageSize() {
-        return pageSize;
-    }
-
-    /**
-     * @param pageSize Page size.
-     * @return {@code this}.
-     */
-    public GridH2QueryContext pageSize(int pageSize) {
-        this.pageSize = pageSize;
-
-        return this;
-    }
-
-    /**
      * @return Lazy worker, if any, or {@code null} if none.
      */
     public MapQueryLazyWorker lazyWorker() {
@@ -543,5 +300,4 @@ public class GridH2QueryContext {
     @Override public String toString() {
         return S.toString(GridH2QueryContext.class, this);
     }
-
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
index f6d0408..ae0b053 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
@@ -40,10 +40,5 @@ public enum GridH2QueryType {
      * Replicated query over a network. Such a query can be sent from a client node or node which
      * did not load all the partitions yet.
      */
-    REPLICATED,
-
-    /**
-     * Parsing and optimization stage.
-     */
-    PREPARE,
+    REPLICATED;
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
index 03653c7..e5eea1d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
@@ -20,16 +20,18 @@ package org.apache.ignite.internal.processors.query.h2.opt.join;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import javax.cache.CacheException;
 
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.processors.query.h2.sql.SplitterContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.h2.command.dml.Query;
 import org.h2.command.dml.Select;
 import org.h2.command.dml.SelectUnion;
+import org.h2.engine.Session;
 import org.h2.expression.Comparison;
 import org.h2.expression.Expression;
 import org.h2.expression.ExpressionColumn;
@@ -513,14 +515,6 @@ public final class CollocationModel {
     }
 
     /**
-     * @return Multiplier.
-     */
-    public int calculateMultiplier() {
-        // We don't need multiplier for union here because it will be summarized in H2.
-        return multiplier(false);
-    }
-
-    /**
      * @param withUnion With respect to union.
      * @return Multiplier.
      */
@@ -629,29 +623,56 @@ public final class CollocationModel {
     }
 
     /**
-     * @param qctx Query context.
+     * Get distributed multiplier for the given sequence of tables.
+     *
+     * @param ctx Splitter context.
+     * @param ses Session.
+     * @param filters Filters.
+     * @param filter Filter index.
+     * @return Multiplier.
+     */
+    public static int distributedMultiplier(
+        SplitterContext ctx,
+        Session ses,
+        TableFilter[] filters,
+        int filter
+    ) {
+        clearViewIndexCache(ses);
+
+        CollocationModel model = buildCollocationModel(ctx, ses.getSubQueryInfo(), filters, filter, false);
+
+        return model.multiplier(false);
+    }
+
+    /**
+     * @param ctx Splitter context.
      * @param info Sub-query info.
      * @param filters Filters.
      * @param filter Filter.
      * @param validate Query validation flag.
      * @return Collocation.
      */
-    public static CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
-        TableFilter[] filters, int filter, boolean validate) {
+    private static CollocationModel buildCollocationModel(
+        SplitterContext ctx,
+        SubQueryInfo info,
+        TableFilter[] filters,
+        int filter,
+        boolean validate
+    ) {
         CollocationModel cm;
 
         if (info != null) {
             // Go up until we reach the root query.
-            cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate);
+            cm = buildCollocationModel(ctx, info.getUpper(), info.getFilters(), info.getFilter(), validate);
         }
         else {
             // We are at the root query.
-            cm = qctx.queryCollocationModel();
+            cm = ctx.collocationModel();
 
             if (cm == null) {
                 cm = createChildModel(null, -1, null, true, validate);
 
-                qctx.queryCollocationModel(cm);
+                ctx.collocationModel(cm);
             }
         }
 
@@ -713,11 +734,13 @@ public final class CollocationModel {
      * @param validate Query validation flag.
      * @return Built model.
      */
-    private static CollocationModel buildCollocationModel(CollocationModel upper,
+    private static CollocationModel buildCollocationModel(
+        CollocationModel upper,
         int filter,
         Query qry,
         List<CollocationModel> unions,
-        boolean validate) {
+        boolean validate
+    ) {
         if (qry.isUnion()) {
             if (unions == null)
                 unions = new ArrayList<>();
@@ -769,6 +792,19 @@ public final class CollocationModel {
     }
 
     /**
+     * @param ses Session.
+     */
+    private static void clearViewIndexCache(Session ses) {
+        // We have to clear this cache because normally sub-query plan cost does not depend on anything
+        // other than index condition masks and sort order, but in our case it can depend on order
+        // of previous table filters.
+        Map<Object,ViewIndex> viewIdxCache = ses.getViewIndexCache(true);
+
+        if (!viewIdxCache.isEmpty())
+            viewIdxCache.clear();
+    }
+
+    /**
      * Collocation type.
      */
     private enum Type {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
new file mode 100644
index 0000000..3960602
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinContext.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Context for distributed joins.
+ */
+public class DistributedJoinContext {
+    /** Local flag. */
+    private final boolean loc;
+
+    /** Affinity topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Partitions map. */
+    private final Map<UUID, int[]> partsMap;
+
+    /** ID of the node that started the query. */
+    private final UUID originNodeId;
+
+    /** Query ID. */
+    private final long qryId;
+
+    /** Index segment ID. */
+    private final int segment;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** Range streams for indexes. */
+    private Map<Integer, Object> streams;
+
+    /** Range sources for indexes. */
+    private Map<SourceKey, Object> sources;
+
+    /** Batch lookup ID generator. */
+    private int batchLookupIdGen;
+
+    /** Owning node ID for each partition, built lazily from the partitions map. */
+    private UUID[] partsNodes;
+
+    /** Cancelled flag. */
+    private boolean cancelled;
+
+    /**
+     * Constructor.
+     *
+     * @param loc Local flag.
+     * @param topVer Topology version.
+     * @param partsMap Partitions map.
+     * @param originNodeId ID of the node that started the query.
+     * @param qryId Query ID.
+     * @param segment Segment.
+     * @param pageSize Page size.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public DistributedJoinContext(boolean loc, AffinityTopologyVersion topVer, Map<UUID, int[]> partsMap,
+        UUID originNodeId, long qryId, int segment, int pageSize) {
+        this.loc = loc;
+        this.topVer = topVer;
+        this.partsMap = partsMap;
+        this.originNodeId = originNodeId;
+        this.qryId = qryId;
+        this.segment = segment;
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Local flag.
+     */
+    public boolean local() {
+        return loc;
+    }
+
+    /**
+     * @return Affinity topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Partitions map.
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    public Map<UUID,int[]> partitionsMap() {
+        return partsMap;
+    }
+
+    /**
+     * @return Origin node ID.
+     */
+    public UUID originNodeId() {
+        return originNodeId;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * @return Index segment ID.
+     */
+    public int segment() {
+        return segment;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @param p Partition.
+     * @param cctx Cache context.
+     * @return Owning node ID.
+     */
+    public UUID nodeForPartition(int p, GridCacheContext<?, ?> cctx) {
+        UUID[] nodeIds = partsNodes;
+
+        if (nodeIds == null) {
+            assert partsMap != null;
+
+            nodeIds = new UUID[cctx.affinity().partitions()];
+
+            for (Map.Entry<UUID, int[]> e : partsMap.entrySet()) {
+                UUID nodeId = e.getKey();
+                int[] nodeParts = e.getValue();
+
+                assert nodeId != null;
+                assert !F.isEmpty(nodeParts);
+
+                for (int part : nodeParts) {
+                    assert nodeIds[part] == null;
+
+                    nodeIds[part] = nodeId;
+                }
+            }
+
+            partsNodes = nodeIds;
+        }
+
+        return nodeIds[p];
+    }
+
+    /**
+     * @param batchLookupId Batch lookup ID.
+     * @param streams Range streams.
+     */
+    public synchronized void putStreams(int batchLookupId, Object streams) {
+        if (this.streams == null) {
+            if (streams == null)
+                return;
+
+            this.streams = new HashMap<>();
+        }
+
+        if (streams == null)
+            this.streams.remove(batchLookupId);
+        else
+            this.streams.put(batchLookupId, streams);
+    }
+
+    /**
+     * @param batchLookupId Batch lookup ID.
+     * @return Range streams.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized <T> T getStreams(int batchLookupId) {
+        if (streams == null)
+            return null;
+
+        return (T)streams.get(batchLookupId);
+    }
+
+    /**
+     * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
+     * @param batchLookupId Batch lookup ID.
+     * @param src Range source.
+     */
+    public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
+        SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
+
+        if (src != null) {
+            if (sources == null)
+                sources = new HashMap<>();
+
+            sources.put(srcKey, src);
+        }
+        else if (sources != null)
+            sources.remove(srcKey);
+    }
+
+    /**
+     * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
+     * @param batchLookupId Batch lookup ID.
+     * @return Range source.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
+        if (sources == null)
+            return null;
+
+        return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
+    }
+
+    /**
+     * @return Next batch ID.
+     */
+    public int nextBatchLookupId() {
+        return ++batchLookupIdGen;
+    }
+
+    /**
+     * @return Cancelled flag.
+     */
+    public boolean isCancelled() {
+        return cancelled;
+    }
+
+    /**
+     * Mark as cancelled.
+     */
+    public void cancel() {
+        cancelled = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DistributedJoinContext.class, this);
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
deleted file mode 100644
index 7c958da..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt.join;
-
-/**
- * Defines set of distributed join modes.
- */
-public enum DistributedJoinMode {
-    /**
-     * Distributed joins is disabled. Local joins will be performed instead.
-     */
-    OFF,
-
-    /**
-     * Distributed joins is enabled within local node only.
-     *
-     * NOTE: This mode is used with segmented indices for local sql queries.
-     * As in this case we need to make distributed join across local index segments
-     * and prevent range-queries to other nodes.
-     */
-    LOCAL_ONLY,
-
-    /**
-     * Distributed joins is enabled.
-     */
-    ON;
-
-    /**
-     * @param isLocal Query local flag.
-     * @param distributedJoins Query distributed joins flag.
-     * @return DistributedJoinMode for the query.
-     */
-    public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) {
-        return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF;
-    }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index 5411a4a..26206c9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -44,7 +44,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.LOCAL_ONLY;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
 
@@ -64,8 +63,8 @@ public class DistributedLookupBatch implements IndexLookupBatch {
     /** */
     private final int affColId;
 
-    /** */
-    private GridH2QueryContext qctx;
+    /** Join context. */
+    private DistributedJoinContext joinCtx;
 
     /** */
     private int batchLookupId;
@@ -143,27 +142,29 @@ public class DistributedLookupBatch implements IndexLookupBatch {
     /** {@inheritDoc} */
     @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
     @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) {
-        if (qctx == null || findCalled) {
-            if (qctx == null) {
+        if (joinCtx == null || findCalled) {
+            if (joinCtx == null) {
                 // It is the first call after query begin (may be after reuse),
                 // reinitialize query context and result.
-                qctx = GridH2QueryContext.get();
+                GridH2QueryContext qctx = GridH2QueryContext.get();
                 res = new ArrayList<>();
 
                 assert qctx != null;
                 assert !findCalled;
+
+                joinCtx = qctx.distributedJoinContext();
             }
             else {
                 // Cleanup after the previous lookup phase.
                 assert batchLookupId != 0;
 
                 findCalled = false;
-                qctx.putStreams(batchLookupId, null);
+                joinCtx.putStreams(batchLookupId, null);
                 res.clear();
             }
 
             // Reinitialize for the next lookup phase.
-            batchLookupId = qctx.nextBatchLookupId();
+            batchLookupId = joinCtx.nextBatchLookupId();
             rangeStreams = new HashMap<>();
         }
 
@@ -212,9 +213,9 @@ public class DistributedLookupBatch implements IndexLookupBatch {
             List<GridH2RowRangeBounds> bounds;
 
             if (stream == null) {
-                stream = new RangeStream(cctx.kernalContext(), idx, qctx, segmentKey.node());
+                stream = new RangeStream(cctx.kernalContext(), idx, joinCtx, segmentKey.node());
 
-                stream.request(createRequest(qctx, batchLookupId, segmentKey.segmentId()));
+                stream.request(createRequest(joinCtx, batchLookupId, segmentKey.segmentId()));
                 stream.request().bounds(bounds = new ArrayList<>());
 
                 rangeStreams.put(segmentKey, stream);
@@ -225,7 +226,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
             bounds.add(rangeBounds);
 
             // If at least one node will have a full batch then we are ok.
-            if (bounds.size() >= qctx.pageSize())
+            if (bounds.size() >= joinCtx.pageSize())
                 batchFull = true;
         }
 
@@ -260,9 +261,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
      * @return {@code True} if local query execution is enforced.
      */
     private boolean localQuery() {
-        assert qctx != null : "Missing query context: " + this;
-
-        return qctx.distributedJoinMode() == LOCAL_ONLY;
+        return joinCtx != null && joinCtx.local();
     }
 
     /**
@@ -275,7 +274,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
             return;
         }
 
-        qctx.putStreams(batchLookupId, rangeStreams);
+        joinCtx.putStreams(batchLookupId, rangeStreams);
 
         // Start streaming.
         for (RangeStream stream : rangeStreams.values()) {
@@ -296,14 +295,16 @@ public class DistributedLookupBatch implements IndexLookupBatch {
 
     /** {@inheritDoc} */
     @Override public void reset(boolean beforeQry) {
-        if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called.
+        if (beforeQry || joinCtx == null) // Join context can be null if addSearchRows was never called.
             return;
 
         assert batchLookupId != 0;
 
         // Do cleanup after the query run.
-        qctx.putStreams(batchLookupId, null);
-        qctx = null; // The same query can be reused multiple times for different query contexts.
+        joinCtx.putStreams(batchLookupId, null);
+
+        joinCtx = null; // The same query can be reused multiple times for different query contexts.
+
         batchLookupId = 0;
 
         rangeStreams = Collections.emptyMap();
@@ -331,28 +332,28 @@ public class DistributedLookupBatch implements IndexLookupBatch {
         int partition = cctx.affinity().partition(affKeyObj);
 
         if (isLocalQry) {
-            if (qctx.partitionsMap() != null) {
+            if (joinCtx.partitionsMap() != null) {
                 // If we have explicit partitions map, we have to use it to calculate affinity node.
-                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+                UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
 
                 if(!cctx.localNodeId().equals(nodeId))
                     return null; // Prevent remote index call for local queries.
             }
 
-            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
+            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, joinCtx.topologyVersion()))
                 return null;
 
             node = cctx.localNode();
         }
         else{
-            if (qctx.partitionsMap() != null) {
+            if (joinCtx.partitionsMap() != null) {
                 // If we have explicit partitions map, we have to use it to calculate affinity node.
-                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+                UUID nodeId = joinCtx.nodeForPartition(partition, cctx);
 
                 node = cctx.discovery().node(nodeId);
             }
             else // Get primary node for current topology version.
-                node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
+                node = cctx.affinity().primaryByKey(affKeyObj, joinCtx.topologyVersion());
 
             if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
                 throw H2Utils.retryException("Failed to get primary node by key for range segment.");
@@ -366,7 +367,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
      * @return Collection of nodes for broadcasting.
      */
     public List<SegmentKey> broadcastSegments(boolean isLocalQry) {
-        Map<UUID, int[]> partMap = qctx.partitionsMap();
+        Map<UUID, int[]> partMap = joinCtx.partitionsMap();
 
         List<ClusterNode> nodes;
 
@@ -378,7 +379,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
         }
         else {
             if (partMap == null)
-                nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+                nodes = new ArrayList<>(CU.affinityNodes(cctx, joinCtx.topologyVersion()));
             else {
                 nodes = new ArrayList<>(partMap.size());
 
@@ -411,17 +412,18 @@ public class DistributedLookupBatch implements IndexLookupBatch {
     }
 
     /**
-     * @param qctx Query context.
+     * @param joinCtx Join context.
      * @param batchLookupId Batch lookup ID.
      * @param segmentId Segment ID.
      * @return Index range request.
      */
-    public static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
+    public static GridH2IndexRangeRequest createRequest(DistributedJoinContext joinCtx, int batchLookupId,
+        int segmentId) {
         GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
 
-        req.originNodeId(qctx.originNodeId());
-        req.queryId(qctx.queryId());
-        req.originSegmentId(qctx.segment());
+        req.originNodeId(joinCtx.originNodeId());
+        req.queryId(joinCtx.queryId());
+        req.originSegmentId(joinCtx.segment());
         req.segment(segmentId);
         req.batchLookupId(batchLookupId);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
index 0684089..b80cd61 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -62,7 +61,7 @@ public class RangeStream {
     private final GridH2IndexBase idx;
 
     /** */
-    private final GridH2QueryContext qctx;
+    private final DistributedJoinContext joinCtx;
 
     /** */
     private final ClusterNode node;
@@ -86,14 +85,14 @@ public class RangeStream {
     private int cursorRangeId = -1;
 
     /**
-     * @param qctx Query context.
+     * @param joinCtx Join context.
      * @param node Node.
      */
-    public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, GridH2QueryContext qctx, ClusterNode node) {
+    public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, DistributedJoinContext joinCtx, ClusterNode node) {
         this.ctx = ctx;
         this.idx = idx;
         this.node = node;
-        this.qctx = qctx;
+        this.joinCtx = joinCtx;
     }
 
     /**
@@ -137,7 +136,7 @@ public class RangeStream {
         final long start = U.currentTimeMillis();
 
         for (int attempt = 0;; attempt++) {
-            if (qctx.isCleared())
+            if (joinCtx.isCancelled())
                 throw H2Utils.retryException("Query is cancelled.");
 
             if (ctx.isStopping())
@@ -164,7 +163,7 @@ public class RangeStream {
 
                         if (remainingRanges > 0) {
                             if (req.bounds() != null)
-                                req = DistributedLookupBatch.createRequest(qctx, req.batchLookupId(), req.segment());
+                                req = DistributedLookupBatch.createRequest(joinCtx, req.batchLookupId(), req.segment());
 
                             // Prefetch next page.
                             idx.send(singletonList(node), req);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 741f859..28166d5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -129,6 +129,7 @@ public class GridSqlQuerySplitter {
      * @param distributedJoins Distributed joins flag.
      * @param extractor Partition extractor.
      */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, boolean distributedJoins,
         PartitionExtractor extractor) {
         this.params = params;
@@ -183,6 +184,45 @@ public class GridSqlQuerySplitter {
         boolean enforceJoinOrder,
         PartitionExtractor partExtractor
     ) throws SQLException, IgniteCheckedException {
+        SplitterContext.set(distributedJoins);
+
+        try {
+            return split0(
+                conn,
+                prepared,
+                params,
+                collocatedGrpBy,
+                distributedJoins,
+                enforceJoinOrder,
+                partExtractor
+            );
+        }
+        finally {
+            SplitterContext.set(false);
+        }
+    }
+
+    /**
+     * @param conn Connection.
+     * @param prepared Prepared.
+     * @param params Parameters.
+     * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
+     * @param distributedJoins If distributed joins enabled.
+     * @param enforceJoinOrder Enforce join order.
+     * @param partExtractor Partition extractor.
+     * @return Two step query.
+     * @throws SQLException If failed.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static GridCacheTwoStepQuery split0(
+        Connection conn,
+        Prepared prepared,
+        Object[] params,
+        boolean collocatedGrpBy,
+        boolean distributedJoins,
+        boolean enforceJoinOrder,
+        PartitionExtractor partExtractor
+    ) throws SQLException, IgniteCheckedException {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
 
@@ -836,6 +876,7 @@ public class GridSqlQuerySplitter {
      * @param wrapAlias Alias of the wrap query.
      * @param select The original select.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private void pushDownSelectColumns(
         Set<GridSqlAlias> tblAliases,
         Map<String,GridSqlAlias> cols,
@@ -1237,6 +1278,7 @@ public class GridSqlQuerySplitter {
      * @param cols Columns from SELECT clause.
      * @return Map of columns with types.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private LinkedHashMap<String,?> collectColumns(List<GridSqlAst> cols) {
         LinkedHashMap<String, GridSqlType> res = new LinkedHashMap<>(cols.size(), 1f, false);
 
@@ -1523,6 +1565,7 @@ public class GridSqlQuerySplitter {
      * @param hasDistinctAggregate If query has distinct aggregate expression.
      * @param first If this is the first aggregate found in this expression.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private void splitAggregate(
         GridSqlAst parentExpr,
         int aggIdx,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterContext.java
new file mode 100644
index 0000000..013cd0e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.sql;
+
+import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
+
+/**
+ * Splitter context which delegates optimization information to H2 internals.
+ */
+public class SplitterContext {
+    /** Empty context. */
+    private static final SplitterContext EMPTY = new SplitterContext(false);
+
+    /** Splitter context. */
+    private static final ThreadLocal<SplitterContext> CTX = ThreadLocal.withInitial(() -> EMPTY);
+
+    /** Whether distributed joins are enabled. */
+    private final boolean distributedJoins;
+
+    /** Query collocation model. */
+    private CollocationModel collocationModel;
+
+    /**
+     * @return Current context.
+     */
+    public static SplitterContext get() {
+        return CTX.get();
+    }
+
+    /**
+     * Set new context.
+     *
+     * @param distributedJoins Whether distributed joins are enabled.
+     */
+    public static void set(boolean distributedJoins) {
+        SplitterContext ctx = distributedJoins ? new SplitterContext(true) : EMPTY;
+
+        CTX.set(ctx);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param distributedJoins Whether distributed joins are enabled.
+     */
+    public SplitterContext(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+    }
+
+    /**
+     * @return Whether distributed joins are enabled.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @return Query collocation model.
+     */
+    public CollocationModel collocationModel() {
+        return collocationModel;
+    }
+
+    /**
+     * @param collocationModel Query collocation model.
+     */
+    public void collocationModel(CollocationModel collocationModel) {
+        this.collocationModel = collocationModel;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e67294d..399bf25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -21,12 +21,10 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.AbstractCollection;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -76,8 +74,9 @@ import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -95,7 +94,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.thread.IgniteThread;
 import org.h2.command.Prepared;
@@ -113,8 +111,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
 
@@ -334,7 +330,18 @@ public class GridMapQueryExecutor {
         if (F.isEmpty(cacheIds))
             return null;
 
-        Collection<Integer> partIds = wrap(explicitParts);
+        Collection<Integer> partIds;
+
+        if (explicitParts == null)
+            partIds = null;
+        else if (explicitParts.length == 0)
+            partIds = Collections.emptyList();
+        else {
+            partIds = new ArrayList<>(explicitParts.length);
+
+            for (int explicitPart : explicitParts)
+                partIds.add(explicitPart);
+        }
 
         for (int i = 0; i < cacheIds.size(); i++) {
             GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i));
@@ -515,44 +522,6 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @param ints Integers.
-     * @return Collection wrapper.
-     */
-    private static Collection<Integer> wrap(final int[] ints) {
-        if (ints == null)
-            return null;
-
-        if (ints.length == 0)
-            return Collections.emptySet();
-
-        return new AbstractCollection<Integer>() {
-            @SuppressWarnings("NullableProblems")
-            @Override public Iterator<Integer> iterator() {
-                return new Iterator<Integer>() {
-                    /** */
-                    private int i = 0;
-
-                    @Override public boolean hasNext() {
-                        return i < ints.length;
-                    }
-
-                    @Override public Integer next() {
-                        return ints[i++];
-                    }
-
-                    @Override public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-
-            @Override public int size() {
-                return ints.length;
-            }
-        };
-    }
-
-    /**
      * @param node Node.
      * @param req Query request.
      */
@@ -563,9 +532,8 @@ public class GridMapQueryExecutor {
 
         final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts;
 
-        final DistributedJoinMode joinMode = distributedJoinMode(
-            req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
-            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+        boolean distributedJoins = req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS);
+        boolean local = req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL);
 
         final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
         final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
@@ -660,7 +628,8 @@ public class GridMapQueryExecutor {
                     partsMap,
                     parts,
                     req.pageSize(),
-                    joinMode,
+                    distributedJoins,
+                    local,
                     enforceJoinOrder,
                     false, // Replicated is always false here (see condition above).
                     req.timeout(),
@@ -687,7 +656,8 @@ public class GridMapQueryExecutor {
                                 partsMap,
                                 parts,
                                 req.pageSize(),
-                                joinMode,
+                                distributedJoins,
+                                local,
                                 enforceJoinOrder,
                                 false,
                                 req.timeout(),
@@ -717,7 +687,8 @@ public class GridMapQueryExecutor {
             partsMap,
             parts,
             req.pageSize(),
-            joinMode,
+            distributedJoins,
+            local,
             enforceJoinOrder,
             replicated,
             req.timeout(),
@@ -742,7 +713,8 @@ public class GridMapQueryExecutor {
      * @param partsMap Partitions map for unstable topology.
      * @param parts Explicit partitions for current node.
      * @param pageSize Page size.
-     * @param distributedJoinMode Query distributed join mode.
+     * @param distributeJoins Distributed joins flag.
+     * @param local Local flag.
      * @param lazy Streaming flag.
      * @param mvccSnapshot MVCC snapshot.
      * @param tx Transaction.
@@ -762,7 +734,8 @@ public class GridMapQueryExecutor {
         final Map<UUID, int[]> partsMap,
         final int[] parts,
         final int pageSize,
-        final DistributedJoinMode distributedJoinMode,
+        final boolean distributeJoins,
+        final boolean local,
         final boolean enforceJoinOrder,
         final boolean replicated,
         final int timeout,
@@ -800,7 +773,8 @@ public class GridMapQueryExecutor {
                         partsMap,
                         parts,
                         pageSize,
-                        distributedJoinMode,
+                        distributeJoins,
+                        local,
                         enforceJoinOrder,
                         replicated,
                         timeout,
@@ -872,23 +846,37 @@ public class GridMapQueryExecutor {
                 throw new IllegalStateException();
 
             // Prepare query context.
+            DistributedJoinContext distirbutedJoinCtx = null;
+
+            if (distributeJoins) {
+                distirbutedJoinCtx = new DistributedJoinContext(
+                    local,
+                    topVer,
+                    partsMap,
+                    node.id(),
+                    reqId,
+                    segmentId,
+                    pageSize
+                );
+            }
+
+            GridH2QueryType qryTyp = replicated ? REPLICATED : MAP;
+
             GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
                 node.id(),
                 reqId,
                 segmentId,
-                replicated ? REPLICATED : MAP)
-                .filter(h2.backupFilter(topVer, parts))
-                .partitionsMap(partsMap)
-                .distributedJoinMode(distributedJoinMode)
-                .pageSize(pageSize)
-                .topologyVersion(topVer)
+                qryTyp,
+                h2.backupFilter(topVer, parts)
+            )
+                .distributedJoinContext(distirbutedJoinCtx)
                 .reservations(reserved)
                 .mvccSnapshot(mvccSnapshot)
                 .lazyWorker(worker);
 
             Connection conn = h2.connections().connectionForThread().connection(schemaName);
 
-            H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
+            H2Utils.setupConnection(conn, distributeJoins, enforceJoinOrder);
 
             GridH2QueryContext.set(qctx);
 
@@ -897,7 +885,7 @@ public class GridMapQueryExecutor {
 
             try {
                 if (nodeRess.cancelled(reqId)) {
-                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
+                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qryTyp);
 
                     nodeRess.cancelRequest(reqId);
 
@@ -1091,7 +1079,7 @@ public class GridMapQueryExecutor {
         if (qctx != null) { // No-op if already released.
             GridH2QueryContext.clearThreadLocal();
 
-            if (qctx.distributedJoinMode() == OFF)
+            if (qctx.distributedJoinContext() == null)
                 qctx.clearContext(false);
         }
     }
@@ -1100,7 +1088,7 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param req DML request.
      */
-    private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException {
+    private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) {
         int[] parts = req.queryPartitions();
 
         List<Integer> cacheIds = req.caches();
@@ -1388,7 +1376,7 @@ public class GridMapQueryExecutor {
 
             GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId,
             /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
-                loc ? null : Collections.<Message>emptyList(),
+                loc ? null : Collections.emptyList(),
                 loc ? Collections.<Value[]>emptyList() : null,
                 false);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3170eae..8097800 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -108,7 +108,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEna
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
-import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.setDataPageScanEnabled;
 
@@ -760,8 +759,16 @@ public class GridReduceQueryExecutor {
 
                         H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
 
-                        GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
-                            .pageSize(r.pageSize()).distributedJoinMode(OFF));
+                        GridH2QueryContext qctx = new GridH2QueryContext(
+                            locNodeId,
+                            locNodeId,
+                            qryReqId,
+                            0,
+                            REDUCE,
+                            null
+                        );
+
+                        GridH2QueryContext.set(qctx);
 
                         try {
                             if (qry.explain())