You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/04 13:34:46 UTC

[ignite] branch master updated: IGNITE-11185: SQL: moved index distributed join logic to H2TreeIndex. Minor changes to collocation model. This closes #6012.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b61c32  IGNITE-11185: SQL: moved index distributed join logic to H2TreeIndex. Minor changes to collocation model. This closes #6012.
7b61c32 is described below

commit 7b61c32a1c5ab443b315d5a87fec43d19b39e6fc
Author: devozerov <vo...@gridgain.com>
AuthorDate: Mon Feb 4 16:32:52 2019 +0300

    IGNITE-11185: SQL: moved index distributed join logic to H2TreeIndex. Minor changes to collocation model. This closes #6012.
---
 .../query/h2/opt/GridH2SpatialIndex.java           |   8 +-
 .../internal/processors/query/h2/H2Utils.java      |  15 +
 .../query/h2/database/H2PkHashIndex.java           |   3 +-
 .../query/h2/database/H2TreeClientIndex.java       |  49 +--
 .../processors/query/h2/database/H2TreeIndex.java  | 423 ++++++++++++++++++--
 .../processors/query/h2/opt/GridH2IndexBase.java   | 443 +--------------------
 .../query/h2/opt/join/BroadcastCursor.java         |   4 +-
 .../query/h2/opt/join/CollocationModel.java        | 219 ++++------
 .../h2/opt/join/CollocationModelAffinity.java      |  32 ++
 .../h2/opt/join/CollocationModelMultiplier.java    |  54 +++
 .../query/h2/opt/join/CollocationModelType.java    |  75 ++++
 .../query/h2/opt/join/DistributedLookupBatch.java  |  17 +-
 .../processors/query/h2/opt/join/RangeSource.java  |   6 +-
 .../processors/query/h2/opt/join/RangeStream.java  |   6 +-
 .../query/h2/twostep/GridMergeIndexSorted.java     |   4 +-
 .../query/h2/twostep/GridReduceQueryExecutor.java  |   3 +-
 16 files changed, 709 insertions(+), 652 deletions(-)

diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 4af31fb..f2f0b01 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
@@ -60,6 +61,9 @@ import org.locationtech.jts.geom.Geometry;
  */
 @SuppressWarnings("unused"/*reflection*/)
 public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex {
+    /** Cache context. */
+    private final GridCacheContext ctx;
+
     /** */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -173,7 +177,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
             assert key != null;
 
-            final int seg = segmentForRow(row);
+            final int seg = segmentForRow(ctx, row);
 
             Long rowId = keyToId.get(key);
 
@@ -249,7 +253,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
             assert oldRow != null;
 
-            final int seg = segmentForRow(row);
+            final int seg = segmentForRow(ctx, row);
 
             if (!segments[seg].remove(getEnvelope(row, rowId), rowId))
                 throw DbException.throwInternalError("row not found");
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 7078b43..684ecf7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -32,6 +32,7 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -728,4 +729,18 @@ public class H2Utils {
             return prepared(s);
         }
     }
+
+    /**
+     * @param arr Array reordered in place: the element at {@code off} is moved towards the end by adjacent swaps.
+     * @param off Index of the element to move; swapping stops at the first successor that is not smaller.
+     * @param cmp Comparator used to compare each pair of adjacent elements.
+     */
+    public static <Z> void bubbleUp(Z[] arr, int off, Comparator<Z> cmp) {
+        for (int i = off, last = arr.length - 1; i < last; i++) {
+            if (cmp.compare(arr[i], arr[i + 1]) <= 0)
+                break;
+
+            U.swap(arr, i, i + 1);
+        }
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index 3216b94..7fa877e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -70,6 +70,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
      * @param colsList Index columns.
      * @param segments Segments.
      */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
     public H2PkHashIndex(
         GridCacheContext<?, ?> cctx,
         GridH2Table tbl,
@@ -81,7 +82,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
 
         this.segments = segments;
 
-        IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
+        IndexColumn[] cols = colsList.toArray(new IndexColumn[0]);
 
         IndexColumn.mapColumns(cols, tbl);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
index 1c1104b..726ba10 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.query.h2.database;
 
 import java.util.List;
+
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -31,25 +33,15 @@ import org.h2.table.IndexColumn;
  * We need indexes on an not affinity nodes. The index shouldn't contains any data.
  */
 public class H2TreeClientIndex extends H2TreeIndexBase {
-
-    /**
-     *
-     */
-    public static final IgniteSQLException SHOULDNT_BE_INVOKED_EXCEPTION = new IgniteSQLException("Shouldn't be invoked, due to it's not affinity node");
-
     /**
      * @param tbl Table.
      * @param name Index name.
      * @param pk Primary key.
      * @param colsList Index columns.
      */
-    public H2TreeClientIndex(
-        GridH2Table tbl,
-        String name,
-        boolean pk,
-        List<IndexColumn> colsList
-    ) {
-        IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    public H2TreeClientIndex(GridH2Table tbl, String name, boolean pk, List<IndexColumn> colsList) {
+        IndexColumn[] cols = colsList.toArray(new IndexColumn[0]);
 
         IndexColumn.mapColumns(cols, tbl);
 
@@ -59,51 +51,48 @@ public class H2TreeClientIndex extends H2TreeIndexBase {
 
     /** {@inheritDoc} */
     @Override public void refreshColumnIds() {
-        //do nothing.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void destroy(boolean rmvIndex) {
-        //do nothing.
+        // Do nothing.
     }
 
     /** {@inheritDoc} */
     @Override public int segmentsCount() {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+        throw unsupported();
     }
 
     /** {@inheritDoc} */
     @Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+        throw unsupported();
     }
 
     /** {@inheritDoc} */
     @Override public H2CacheRow put(H2CacheRow row) {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+        throw unsupported();
     }
 
     /** {@inheritDoc} */
     @Override public boolean putx(H2CacheRow row) {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+        throw unsupported();
     }
 
     /** {@inheritDoc} */
     @Override public boolean removex(SearchRow row) {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+        throw unsupported();
     }
 
     /** {@inheritDoc} */
     @Override public long getRowCount(Session ses) {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+        throw unsupported();
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor findFirstOrLast(Session session, boolean b) {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+    @Override public Cursor findFirstOrLast(Session session, boolean first) {
+        throw unsupported();
     }
 
-    /** {@inheritDoc} */
-    @Override protected H2Tree treeForRead(int segment) {
-        throw SHOULDNT_BE_INVOKED_EXCEPTION;
+    /**
+     * @return New exception to be thrown by operations that must not be invoked on a non-affinity node.
+     */
+    private static IgniteException unsupported() {
+        return new IgniteSQLException("Shouldn't be invoked on non-affinity node.");
     }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index d52bec4..2744923 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -18,12 +18,21 @@
 package org.apache.ignite.internal.processors.query.h2.database;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
@@ -33,26 +42,60 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
 import org.apache.ignite.internal.processors.query.h2.H2RowCache;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
 import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
+import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
 import org.apache.ignite.internal.stat.IoStatisticsHolder;
 import org.apache.ignite.internal.stat.IoStatisticsType;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
+import org.h2.index.IndexCondition;
+import org.h2.index.IndexLookupBatch;
 import org.h2.index.IndexType;
 import org.h2.index.SingleRowCursor;
 import org.h2.message.DbException;
 import org.h2.result.SearchRow;
 import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
+import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.CacheException;
+
+import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
+import static org.h2.result.Row.MEMORY_CALCULATE;
+
 /**
  * H2 Index over {@link BPlusTree}.
  */
@@ -67,6 +110,9 @@ public class H2TreeIndex extends H2TreeIndexBase {
     /** */
     private final List<InlineIndexHelper> inlineIdxs;
 
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
     /** Cache context. */
     private final GridCacheContext<?, ?> cctx;
 
@@ -88,10 +134,23 @@ public class H2TreeIndex extends H2TreeIndexBase {
     /** */
     private final IgniteLogger log;
 
+    /** Topic on which this index registers its message listener and sends its messages. */
+    private final Object msgTopic;
+
+    /** Listener that passes received messages to {@link #onMessage0(UUID, Object)}. */
+    private final GridMessageListener msgLsnr;
+
+    /** Handler handed over on send; forwards the message to {@code onMessage0} with the given node's ID. */
+    private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
+        @Override public void applyx(ClusterNode locNode, Message msg) {
+            onMessage0(locNode.id(), msg);
+        }
+    };
+
     /**
      * @param cctx Cache context.
      * @param rowCache Row cache.
-     * @param tbl Table.
+     * @param table Table.
      * @param idxName Index name.
      * @param pk Primary key.
      * @param affinityKey {@code true} for affinity key.
@@ -101,10 +160,11 @@ public class H2TreeIndex extends H2TreeIndexBase {
      * @param segmentsCnt Count of tree segments.
      * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings("MapReplaceableByEnumMap")
     public H2TreeIndex(
         GridCacheContext<?, ?> cctx,
         @Nullable H2RowCache rowCache,
-        GridH2Table tbl,
+        GridH2Table table,
         String idxName,
         boolean pk,
         boolean affinityKey,
@@ -115,23 +175,25 @@ public class H2TreeIndex extends H2TreeIndexBase {
     ) throws IgniteCheckedException {
         assert segmentsCnt > 0 : segmentsCnt;
 
-        this.cctx = cctx;
+        ctx = cctx.kernalContext();
+        log = ctx.log(getClass());
 
-        this.log = cctx.logger(getClass().getName());
+        this.cctx = cctx;
 
         this.pk = pk;
         this.affinityKey = affinityKey;
 
-        this.tblName = tbl.getName();
+        tblName = table.getName();
+
         this.idxName = idxName;
 
-        this.table = tbl;
+        this.table = table;
 
-        GridQueryTypeDescriptor typeDesc = tbl.rowDescriptor().type();
+        GridQueryTypeDescriptor typeDesc = table.rowDescriptor().type();
 
         int typeId = cctx.binaryMarshaller() ? typeDesc.typeId() : typeDesc.valueClass().hashCode();
 
-        treeName = BPlusTree.treeName((tbl.rowDescriptor() == null ? "" : typeId + "_") + idxName, "H2Tree");
+        treeName = BPlusTree.treeName((table.rowDescriptor() == null ? "" : typeId + "_") + idxName, "H2Tree");
 
         IndexColumnsInfo unwrappedColsInfo = new IndexColumnsInfo(unwrappedColsList, inlineSize);
 
@@ -161,11 +223,11 @@ public class H2TreeIndex extends H2TreeIndexBase {
 
                 segments[i] = new H2Tree(
                     cctx,
-                    tbl,
+                    table,
                     treeName,
                     idxName,
                     tblName,
-                    tbl.cacheName(),
+                    table.cacheName(),
                     cctx.offheap().reuseListForIndex(treeName),
                     cctx.groupId(),
                     cctx.dataRegion().pageMemory(),
@@ -198,12 +260,31 @@ public class H2TreeIndex extends H2TreeIndexBase {
 
         inlineIdxs = colsInfo.inlineIdx();
 
-        IndexColumn.mapColumns(cols, tbl);
+        IndexColumn.mapColumns(cols, table);
 
-        initBaseIndex(tbl, 0, idxName, cols,
+        initBaseIndex(table, 0, idxName, cols,
             pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
 
-        initDistributedJoinMessaging(tbl);
+        // Initialize distributed joins.
+        msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, table.identifierString() + '.' + getName());
+
+        msgLsnr = new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+                GridSpinBusyLock l = table.rowDescriptor().indexing().busyLock();
+
+                if (!l.enterBusy())
+                    return;
+
+                try {
+                    onMessage0(nodeId, msg);
+                }
+                finally {
+                    l.leaveBusy();
+                }
+            }
+        };
+
+        ctx.io().addMessageListener(msgTopic, msgLsnr);
     }
 
     /**
@@ -303,7 +384,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
 
-            int seg = segmentForRow(row);
+            int seg = segmentForRow(cctx, row);
 
             H2Tree tree = treeForRead(seg);
 
@@ -324,7 +405,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
 
-            int seg = segmentForRow(row);
+            int seg = segmentForRow(cctx, row);
 
             H2Tree tree = treeForRead(seg);
 
@@ -347,7 +428,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
 
-            int seg = segmentForRow(row);
+            int seg = segmentForRow(cctx, row);
 
             H2Tree tree = treeForRead(seg);
 
@@ -411,17 +492,24 @@ public class H2TreeIndex extends H2TreeIndexBase {
             throw new IgniteException(e);
         }
         finally {
-            super.destroy(rmvIdx);
+            if (msgLsnr != null)
+                ctx.io().removeMessageListener(msgTopic, msgLsnr);
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected H2Tree treeForRead(int segment) {
+    /**
+     * @param segment Segment index.
+     * @return Tree stored for the requested segment.
+     */
+    private H2Tree treeForRead(int segment) {
         return segments[segment];
     }
 
-    /** {@inheritDoc} */
-    @Override protected BPlusTree.TreeRowClosure<H2Row, H2Row> filter(GridH2QueryContext qctx) {
+    /**
+     * @param qctx Query context.
+     * @return Row filter.
+     */
+    private BPlusTree.TreeRowClosure<H2Row, H2Row> filter(GridH2QueryContext qctx) {
         if (qctx == null) {
             assert !cctx.mvccEnabled();
 
@@ -510,14 +598,299 @@ public class H2TreeIndex extends H2TreeIndexBase {
             inlineIdxs.set(pos, inlineHelpers.get(pos));
     }
 
+    /** {@inheritDoc} */
+    @Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        if (qctx == null || qctx.distributedJoinContext() == null || !getTable().isPartitioned())
+            return null;
+
+        IndexColumn affCol = getTable().getAffinityKeyColumn();
+        GridH2RowDescriptor desc = getTable().rowDescriptor();
+
+        int affColId = -1;
+        boolean ucast = false;
+
+        if (affCol != null) {
+            affColId = affCol.column.getColumnId();
+            int[] masks = filters[filter].getMasks();
+
+            if (masks != null) {
+                ucast = (masks[affColId] & IndexCondition.EQUALITY) != 0 ||
+                    desc.checkKeyIndexCondition(masks, IndexCondition.EQUALITY);
+            }
+        }
+
+        GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
+
+        return new DistributedLookupBatch(this, cctx, ucast, affColId);
+    }
+
+    /**
+     * @param nodes Nodes to send the message to.
+     * @param msg Message to send on this index's topic; an exception is thrown if the send reports failure.
+     */
+    public void send(Collection<ClusterNode> nodes, Message msg) {
+        boolean res = getTable().rowDescriptor().indexing().send(msgTopic,
+            -1,
+            nodes,
+            msg,
+            null,
+            locNodeHnd,
+            GridIoPolicy.IDX_POOL,
+            false
+        );
+
+        if (!res)
+            throw H2Utils.retryException("Failed to send message to nodes: " + nodes);
+    }
+
+    /**
+     * @param nodeId Source node ID; the message is ignored if discovery returns no node for it.
+     * @param msg Index range request or response (other types are ignored); failures are logged, errors rethrown.
+     */
+    private void onMessage0(UUID nodeId, Object msg) {
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null)
+            return;
+
+        try {
+            if (msg instanceof GridH2IndexRangeRequest)
+                onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
+            else if (msg instanceof GridH2IndexRangeResponse)
+                onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
+        }
+        catch (Throwable th) {
+            U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
+
+            if (th instanceof Error)
+                throw th;
+        }
+    }
+
+    /**
+     * @param node Requesting node; the response built here is sent back to it.
+     * @param msg Request message; carries bounds on the first request and none when fetching the next portion.
+     */
+    private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
+        GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
+
+        res.originNodeId(msg.originNodeId());
+        res.queryId(msg.queryId());
+        res.originSegmentId(msg.originSegmentId());
+        res.segment(msg.segment());
+        res.batchLookupId(msg.batchLookupId());
+
+        GridH2QueryContext qctx = GridH2QueryContext.get(
+            ctx.localNodeId(),
+            msg.originNodeId(),
+            msg.queryId(),
+            msg.originSegmentId(),
+            MAP
+        );
+
+        if (qctx == null)
+            res.status(STATUS_NOT_FOUND);
+        else {
+            DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+            assert joinCtx != null;
+
+            try {
+                RangeSource src;
+
+                if (msg.bounds() != null) {
+                    // This is the first request containing all the search rows.
+                    assert !msg.bounds().isEmpty() : "empty bounds";
+
+                    src = new RangeSource(this, msg.bounds(), msg.segment(), filter(qctx));
+                }
+                else {
+                    // This is a request to fetch the next portion of data from a previously saved source.
+                    src = joinCtx.getSource(node.id(), msg.segment(), msg.batchLookupId());
+
+                    assert src != null;
+                }
+
+                List<GridH2RowRange> ranges = new ArrayList<>();
+
+                int maxRows = joinCtx.pageSize();
+
+                assert maxRows > 0 : maxRows;
+
+                while (maxRows > 0) {
+                    GridH2RowRange range = src.next(maxRows);
+
+                    if (range == null)
+                        break;
+
+                    ranges.add(range);
+
+                    if (range.rows() != null)
+                        maxRows -= range.rows().size();
+                }
+
+                assert !ranges.isEmpty();
+
+                if (src.hasMoreRows()) {
+                    // Save source for future fetches (only on the first request; later ones reuse the saved source).
+                    if (msg.bounds() != null)
+                        joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
+                }
+                else if (msg.bounds() == null) {
+                    // Source has no more rows: drop the saved one.
+                    joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
+                }
+
+                res.ranges(ranges);
+                res.status(STATUS_OK);
+            }
+            catch (Throwable th) {
+                U.error(log, "Failed to process request: " + msg, th);
+
+                res.error(th.getClass() + ": " + th.getMessage());
+                res.status(STATUS_ERROR);
+            }
+        }
+
+        send(singletonList(node), res);
+    }
+
     /**
+     * @param node Node that sent the response.
+     * @param msg Response message; ignored if no query context or no streams are found for it.
+     */
+    private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
+        GridH2QueryContext qctx = GridH2QueryContext.get(
+            ctx.localNodeId(),
+            msg.originNodeId(),
+            msg.queryId(),
+            msg.originSegmentId(),
+            MAP
+        );
+
+        if (qctx == null)
+            return;
+
+        DistributedJoinContext joinCtx = qctx.distributedJoinContext();
+
+        assert joinCtx != null;
+
+        Map<SegmentKey, RangeStream> streams = joinCtx.getStreams(msg.batchLookupId());
+
+        if (streams == null)
+            return;
+
+        RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
+
+        assert stream != null;
+
+        stream.onResponse(msg);
+    }
+
+    /**
+     * Find rows for the segments (distributed joins).
      *
+     * @param bounds Bounds holding the first and last row messages of the range.
+     * @param segment Index of the tree segment to search.
+     * @param filter Row filter passed to the tree lookup.
+     * @return Iterator over the found rows; an empty cursor is used if the tree returns {@code null}.
      */
+    @SuppressWarnings("unchecked")
+    public Iterator<H2Row> findForSegment(GridH2RowRangeBounds bounds, int segment,
+        BPlusTree.TreeRowClosure<H2Row, H2Row> filter) {
+        SearchRow first = toSearchRow(bounds.first());
+        SearchRow last = toSearchRow(bounds.last());
+
+        IgniteTree t = treeForRead(segment);
+
+        try {
+            GridCursor<H2Row> range = ((BPlusTree)t).find(first, last, filter, null);
+
+            if (range == null)
+                range = H2Utils.EMPTY_CURSOR;
+
+            H2Cursor cur = new H2Cursor(range);
+
+            return new CursorIteratorWrapper(cur);
+        }
+        catch (IgniteCheckedException e) {
+            throw DbException.convert(e);
+        }
+    }
+
+    /**
+     * @param msg Row message with values in index column order, may be {@code null}.
+     * @return Search row with index column values set, or {@code null} if {@code msg} is {@code null}.
+     */
+    private SearchRow toSearchRow(GridH2RowMessage msg) {
+        if (msg == null)
+            return null;
+
+        Value[] vals = new Value[getTable().getColumns().length];
+
+        assert vals.length > 0;
+
+        List<GridH2ValueMessage> msgVals = msg.values();
+
+        for (int i = 0; i < indexColumns.length; i++) {
+            if (i >= msgVals.size())
+                continue;
+
+            try {
+                vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx);
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        return database.createRow(vals, MEMORY_CALCULATE);
+    }
+
+    /**
+     * @param row Search row, may be {@code null}.
+     * @return Row message with index column values up to the first {@code null} one, or {@code null} for no row.
+     */
+    public GridH2RowMessage toSearchRowMessage(SearchRow row) {
+        if (row == null)
+            return null;
+
+        List<GridH2ValueMessage> vals = new ArrayList<>(indexColumns.length);
+
+        for (IndexColumn idxCol : indexColumns) {
+            Value val = row.getValue(idxCol.column.getColumnId());
+
+            if (val == null)
+                break;
+
+            try {
+                vals.add(GridH2ValueMessageFactory.toMessage(val));
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        GridH2RowMessage res = new GridH2RowMessage();
+
+        res.values(vals);
+
+        return res;
+    }
+
+    /**
+     *
+     */
+    @SuppressWarnings({"PublicInnerClass", "AssignmentOrReturnOfFieldWithMutableType"})
     public class IndexColumnsInfo {
         /** */
         private final int inlineSize;
+
         /** */
         private final IndexColumn[] cols;
+
         /** */
         private final List<InlineIndexHelper> inlineIdx;
 
@@ -525,12 +898,12 @@ public class H2TreeIndex extends H2TreeIndexBase {
          * @param colsList Index columns list
          * @param cfgInlineSize Inline size from cache config.
          */
+        @SuppressWarnings("ZeroLengthArrayAllocation")
         public IndexColumnsInfo(List<IndexColumn> colsList, int cfgInlineSize) {
-            this.cols = colsList.toArray(new IndexColumn[0]);
-
-            this.inlineIdx = getAvailableInlineColumns(cols);
+            cols = colsList.toArray(new IndexColumn[0]);
 
-            this.inlineSize = computeInlineSize(inlineIdx, cfgInlineSize);
+            inlineIdx = getAvailableInlineColumns(cols);
+            inlineSize = computeInlineSize(inlineIdx, cfgInlineSize);
         }
 
         /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index c53d0ec..4be96cd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -17,135 +17,23 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.H2Cursor;
-import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
-import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
+import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModelMultiplier;
 import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
-import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
-import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
-import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
-import org.apache.ignite.internal.processors.query.h2.sql.SplitterContext;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.IgniteTree;
-import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.typedef.CIX2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.logger.NullLogger;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
-import org.h2.index.IndexCondition;
-import org.h2.index.IndexLookupBatch;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
-import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
 import org.h2.value.Value;
 
-import javax.cache.CacheException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import static java.util.Collections.singletonList;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
-import static org.h2.result.Row.MEMORY_CALCULATE;
-
 /**
  * Index base.
  */
 public abstract class GridH2IndexBase extends BaseIndex {
-    /** */
-    public static final Object EXPLICIT_NULL = new Object();
-
-    /** */
-    private Object msgTopic;
-
-    /** */
-    private GridMessageListener msgLsnr;
-
-    /** */
-    private IgniteLogger log;
-
-    /** */
-    private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
-        @Override public void applyx(ClusterNode clusterNode, Message msg) {
-            onMessage0(clusterNode.id(), msg);
-        }
-    };
-
-    protected GridCacheContext<?, ?> ctx;
-
-    /**
-     * @param tbl Table.
-     */
-    @SuppressWarnings("MapReplaceableByEnumMap")
-    protected final void initDistributedJoinMessaging(GridH2Table tbl) {
-        final GridH2RowDescriptor desc = tbl.rowDescriptor();
-
-        if (desc != null && desc.context() != null) {
-            ctx = desc.context();
-
-            GridKernalContext ctx = desc.context().kernalContext();
-
-            log = ctx.log(getClass());
-
-            msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
-
-            msgLsnr = new GridMessageListener() {
-                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
-                    GridSpinBusyLock l = desc.indexing().busyLock();
-
-                    if (!l.enterBusy())
-                        return;
-
-                    try {
-                        onMessage0(nodeId, msg);
-                    }
-                    finally {
-                        l.leaveBusy();
-                    }
-                }
-            };
-
-            ctx.io().addMessageListener(msgTopic, msgLsnr);
-        }
-        else {
-            msgTopic = null;
-            msgLsnr = null;
-            log = new NullLogger();
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public final void close(Session ses) {
         // No-op. Actual index destruction must happen in method destroy.
@@ -159,8 +47,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param rmv Flag remove.
      */
     public void destroy(boolean rmv) {
-        if (msgLsnr != null)
-            kernalContext().io().removeMessageListener(msgTopic, msgLsnr);
+        // No-op.
     }
 
     /**
@@ -209,18 +96,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @return Multiplier.
      */
     public final int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
-        // We do optimizations with respect to distributed joins only on PREPARE stage only.
-        // Notice that we check for isJoinBatchEnabled, because we can do multiple different
-        // optimization passes on PREPARE stage.
-        // Query expressions can not be distributed as well.
-        SplitterContext ctx = SplitterContext.get();
-
-        if (!ctx.distributedJoins() || !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
-            return CollocationModel.MULTIPLIER_COLLOCATED;
+        CollocationModelMultiplier mul = CollocationModel.distributedMultiplier(ses, filters, filter);
 
-        assert filters != null;
-
-        return CollocationModel.distributedMultiplier(ctx, ses, filters, filter);
+        return mul.multiplier();
     }
 
     /** {@inheritDoc} */
@@ -264,283 +142,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
-        GridH2QueryContext qctx = GridH2QueryContext.get();
-
-        if (qctx == null || qctx.distributedJoinContext() == null || !getTable().isPartitioned())
-            return null;
-
-        IndexColumn affCol = getTable().getAffinityKeyColumn();
-        GridH2RowDescriptor desc = getTable().rowDescriptor();
-
-        int affColId = -1;
-        boolean ucast = false;
-
-        if (affCol != null) {
-            affColId = affCol.column.getColumnId();
-            int[] masks = filters[filter].getMasks();
-
-            if (masks != null) {
-                ucast = (masks[affColId] & IndexCondition.EQUALITY) != 0 ||
-                        desc.checkKeyIndexCondition(masks, IndexCondition.EQUALITY);
-            }
-        }
-
-        GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
-
-        return new DistributedLookupBatch(this, cctx, ucast, affColId);
-    }
-
-    /** {@inheritDoc} */
     @Override public void removeChildrenAndResources(Session session) {
         // The sole purpose of this override is to pass session to table.removeIndex
         assert table instanceof GridH2Table;
 
         ((GridH2Table)table).removeIndex(session, this);
-        remove(session);
-        database.removeMeta(session, getId());
-    }
-
-    /**
-     * @param nodes Nodes.
-     * @param msg Message.
-     */
-    public void send(Collection<ClusterNode> nodes, Message msg) {
-        if (!getTable().rowDescriptor().indexing().send(msgTopic,
-            -1,
-            nodes,
-            msg,
-            null,
-            locNodeHnd,
-            GridIoPolicy.IDX_POOL,
-            false))
-            throw H2Utils.retryException("Failed to send message to nodes: " + nodes);
-    }
-
-    /**
-     * @param nodeId Source node ID.
-     * @param msg Message.
-     */
-    private void onMessage0(UUID nodeId, Object msg) {
-        ClusterNode node = kernalContext().discovery().node(nodeId);
-
-        if (node == null)
-            return;
-
-        try {
-            if (msg instanceof GridH2IndexRangeRequest)
-                onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
-            else if (msg instanceof GridH2IndexRangeResponse)
-                onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
-        }
-        catch (Throwable th) {
-            U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
-
-            if (th instanceof Error)
-                throw th;
-        }
-    }
-
-    /**
-     * @return Kernal context.
-     */
-    private GridKernalContext kernalContext() {
-        return getTable().rowDescriptor().context().kernalContext();
-    }
-
-    /**
-     * @param node Requesting node.
-     * @param msg Request message.
-     */
-    private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
-        GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
-
-        res.originNodeId(msg.originNodeId());
-        res.queryId(msg.queryId());
-        res.originSegmentId(msg.originSegmentId());
-        res.segment(msg.segment());
-        res.batchLookupId(msg.batchLookupId());
-
-        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), msg.originNodeId(),
-            msg.queryId(), msg.originSegmentId(), MAP);
-
-        if (qctx == null)
-            res.status(STATUS_NOT_FOUND);
-        else {
-            DistributedJoinContext joinCtx = qctx.distributedJoinContext();
-
-            assert joinCtx != null;
-
-            try {
-                RangeSource src;
-
-                if (msg.bounds() != null) {
-                    // This is the first request containing all the search rows.
-                    assert !msg.bounds().isEmpty() : "empty bounds";
-
-                    src = new RangeSource(this, msg.bounds(), msg.segment(), filter(qctx));
-                }
-                else {
-                    // This is request to fetch next portion of data.
-                    src = joinCtx.getSource(node.id(), msg.segment(), msg.batchLookupId());
-
-                    assert src != null;
-                }
-
-                List<GridH2RowRange> ranges = new ArrayList<>();
-
-                int maxRows = joinCtx.pageSize();
-
-                assert maxRows > 0 : maxRows;
-
-                while (maxRows > 0) {
-                    GridH2RowRange range = src.next(maxRows);
-
-                    if (range == null)
-                        break;
-
-                    ranges.add(range);
-
-                    if (range.rows() != null)
-                        maxRows -= range.rows().size();
-                }
-
-                assert !ranges.isEmpty();
-
-                if (src.hasMoreRows()) {
-                    // Save source for future fetches.
-                    if (msg.bounds() != null)
-                        joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
-                }
-                else if (msg.bounds() == null) {
-                    // Drop saved source.
-                    joinCtx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
-                }
 
-                res.ranges(ranges);
-                res.status(STATUS_OK);
-            }
-            catch (Throwable th) {
-                U.error(log, "Failed to process request: " + msg, th);
-
-                res.error(th.getClass() + ": " + th.getMessage());
-                res.status(STATUS_ERROR);
-            }
-        }
-
-        send(singletonList(node), res);
-    }
-
-    /**
-     * @param qctx Query context.
-     * @return Row filter.
-     */
-    protected BPlusTree.TreeRowClosure<H2Row, H2Row> filter(GridH2QueryContext qctx) {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @param node Responded node.
-     * @param msg Response message.
-     */
-    private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
-        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
-            msg.originNodeId(), msg.queryId(), msg.originSegmentId(), MAP);
-
-        if (qctx == null)
-            return;
-
-        DistributedJoinContext joinCtx = qctx.distributedJoinContext();
-
-        assert joinCtx != null;
-
-        Map<SegmentKey, RangeStream> streams = joinCtx.getStreams(msg.batchLookupId());
-
-        if (streams == null)
-            return;
-
-        RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
-
-        assert stream != null;
-
-        stream.onResponse(msg);
-    }
-
-    /**
-     * @param msg Row message.
-     * @return Search row.
-     */
-    private SearchRow toSearchRow(GridH2RowMessage msg) {
-        if (msg == null)
-            return null;
-
-        GridKernalContext ctx = kernalContext();
-
-        Value[] vals = new Value[getTable().getColumns().length];
-
-        assert vals.length > 0;
-
-        List<GridH2ValueMessage> msgVals = msg.values();
-
-        for (int i = 0; i < indexColumns.length; i++) {
-            if (i >= msgVals.size())
-                continue;
-
-            try {
-                vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx);
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException(e);
-            }
-        }
-
-        return database.createRow(vals, MEMORY_CALCULATE);
-    }
-
-    /**
-     * @param row Search row.
-     * @return Row message.
-     */
-    public GridH2RowMessage toSearchRowMessage(SearchRow row) {
-        if (row == null)
-            return null;
-
-        List<GridH2ValueMessage> vals = new ArrayList<>(indexColumns.length);
-
-        for (IndexColumn idxCol : indexColumns) {
-            Value val = row.getValue(idxCol.column.getColumnId());
-
-            if (val == null)
-                break;
-
-            try {
-                vals.add(GridH2ValueMessageFactory.toMessage(val));
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException(e);
-            }
-        }
-
-        GridH2RowMessage res = new GridH2RowMessage();
-
-        res.values(vals);
-
-        return res;
-    }
+        remove(session);
 
-    /**
-     * @param arr Array.
-     * @param off Offset.
-     * @param cmp Comparator.
-     */
-    public static <Z> void bubbleUp(Z[] arr, int off, Comparator<Z> cmp) {
-        // TODO Optimize: use binary search if the range in array is big.
-        for (int i = off, last = arr.length - 1; i < last; i++) {
-            if (cmp.compare(arr[i], arr[i + 1]) <= 0)
-                break;
-
-            U.swap(arr, i, i + 1);
-        }
+        database.removeMeta(session, getId());
     }
 
     /**
@@ -561,7 +171,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @return Segment ID for given row.
      */
     @SuppressWarnings("IfMayBeConditional")
-    protected int segmentForRow(SearchRow row) {
+    protected int segmentForRow(GridCacheContext ctx, SearchRow row) {
         assert row != null;
 
         if (segmentsCount() == 1 || ctx == null)
@@ -584,45 +194,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * Find rows for the segments (distributed joins).
-     *
-     * @param bounds Bounds.
-     * @param segment Segment.
-     * @param filter Filter.
-     * @return Iterator.
-     */
-    @SuppressWarnings("unchecked")
-    public Iterator<H2Row> findForSegment(GridH2RowRangeBounds bounds, int segment,
-        BPlusTree.TreeRowClosure<H2Row, H2Row> filter) {
-        SearchRow first = toSearchRow(bounds.first());
-        SearchRow last = toSearchRow(bounds.last());
-
-        IgniteTree t = treeForRead(segment);
-
-        try {
-            GridCursor<H2Row> range = ((BPlusTree)t).find(first, last, filter, null);
-
-            if (range == null)
-                range = H2Utils.EMPTY_CURSOR;
-
-            H2Cursor cur = new H2Cursor(range);
-
-            return new CursorIteratorWrapper(cur);
-        }
-        catch (IgniteCheckedException e) {
-            throw DbException.convert(e);
-        }
-    }
-
-    /**
-     * @param segment Segment Id.
-     * @return Snapshot for requested segment if there is one.
-     */
-    protected <K, V> IgniteTree<K, V> treeForRead(int segment) {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
      * Re-assign column ids after removal of column(s).
      */
     public void refreshColumnIds() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
index 632d72a..86051e4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt.join;
 
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.h2.index.Cursor;
 import org.h2.result.Row;
@@ -30,6 +31,7 @@ import java.util.Map;
 /**
  * Merge cursor from multiple nodes.
  */
+@SuppressWarnings("ComparatorNotSerializable")
 public class BroadcastCursor implements Cursor, Comparator<RangeStream> {
     /** Index. */
     private final GridH2IndexBase idx;
@@ -122,7 +124,7 @@ public class BroadcastCursor implements Cursor, Comparator<RangeStream> {
         }
 
         // Bubble up current min stream with respect to fetched row to achieve correct sort order of streams.
-        GridH2IndexBase.bubbleUp(streams, off, this);
+        H2Utils.bubbleUp(streams, off, this);
 
         return true;
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
index e5eea1d..6bbd968 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java
@@ -48,17 +48,8 @@ import org.h2.table.TableView;
  * Collocation model for a query.
  */
 public final class CollocationModel {
-    /** */
-    public static final int MULTIPLIER_COLLOCATED = 1;
-
-    /** */
-    private static final int MULTIPLIER_UNICAST = 50;
-
-    /** */
-    private static final int MULTIPLIER_BROADCAST = 200;
-
-    /** */
-    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
+    /** Empty filter array. */
+    private static final TableFilter[] EMPTY_FILTERS = new TableFilter[0];
 
     /** */
     private final CollocationModel upper;
@@ -70,10 +61,10 @@ public final class CollocationModel {
     private final boolean view;
 
     /** */
-    private int multiplier;
+    private CollocationModelMultiplier multiplier;
 
     /** */
-    private Type type;
+    private CollocationModelType type;
 
     /** */
     private CollocationModel[] children;
@@ -236,7 +227,7 @@ public final class CollocationModel {
 
         // Reset results.
         type = null;
-        multiplier = 0;
+        multiplier = null;
 
         return true;
     }
@@ -254,14 +245,14 @@ public final class CollocationModel {
 
             boolean collocated = true;
             boolean partitioned = false;
-            int maxMultiplier = MULTIPLIER_COLLOCATED;
+            CollocationModelMultiplier maxMultiplier = CollocationModelMultiplier.COLLOCATED;
 
             for (int i = 0; i < childFilters.length; i++) {
                 CollocationModel child = child(i, true);
 
-                Type t = child.type(true);
+                CollocationModelType t = child.type(true);
 
-                if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+                if (child.multiplier == CollocationModelMultiplier.REPLICATED_NOT_LAST)
                     maxMultiplier = child.multiplier;
 
                 if (t.isPartitioned()) {
@@ -270,19 +261,19 @@ public final class CollocationModel {
                     if (!t.isCollocated()) {
                         collocated = false;
 
-                        int m = child.multiplier(true);
+                        CollocationModelMultiplier m = child.multiplier(true);
 
-                        if (m > maxMultiplier) {
+                        if (m.multiplier() > maxMultiplier.multiplier()) {
                             maxMultiplier = m;
 
-                            if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+                            if (maxMultiplier == CollocationModelMultiplier.REPLICATED_NOT_LAST)
                                 break;
                         }
                     }
                 }
             }
 
-            type = Type.of(partitioned, collocated);
+            type = CollocationModelType.of(partitioned, collocated);
             multiplier = maxMultiplier;
         }
         else {
@@ -294,8 +285,8 @@ public final class CollocationModel {
 
             // Only partitioned tables will do distributed joins.
             if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) {
-                type = Type.REPLICATED;
-                multiplier = MULTIPLIER_COLLOCATED;
+                type = CollocationModelType.REPLICATED;
+                multiplier = CollocationModelMultiplier.COLLOCATED;
 
                 return;
             }
@@ -303,29 +294,29 @@ public final class CollocationModel {
             // If we are the first partitioned table in a join, then we are "base" for all the rest partitioned tables
             // which will need to get remote result (if there is no affinity condition). Since this query is broadcasted
             // to all the affinity nodes the "base" does not need to get remote results.
-            if (!upper.findPartitionedTableBefore(filter)) {
-                type = Type.PARTITIONED_COLLOCATED;
-                multiplier = MULTIPLIER_COLLOCATED;
+            if (!upper.isPartitionedTableBeforeExists(filter)) {
+                type = CollocationModelType.PARTITIONED_COLLOCATED;
+                multiplier = CollocationModelMultiplier.COLLOCATED;
             }
             else {
                 // It is enough to make sure that our previous join by affinity key is collocated, then we are
                 // collocated. If we at least have affinity key condition, then we do unicast which is cheaper.
                 switch (upper.joinedWithCollocated(filter)) {
                     case COLLOCATED_JOIN:
-                        type = Type.PARTITIONED_COLLOCATED;
-                        multiplier = MULTIPLIER_COLLOCATED;
+                        type = CollocationModelType.PARTITIONED_COLLOCATED;
+                        multiplier = CollocationModelMultiplier.COLLOCATED;
 
                         break;
 
                     case HAS_AFFINITY_CONDITION:
-                        type = Type.PARTITIONED_NOT_COLLOCATED;
-                        multiplier = MULTIPLIER_UNICAST;
+                        type = CollocationModelType.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = CollocationModelMultiplier.UNICAST;
 
                         break;
 
                     case NONE:
-                        type = Type.PARTITIONED_NOT_COLLOCATED;
-                        multiplier = MULTIPLIER_BROADCAST;
+                        type = CollocationModelType.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = CollocationModelMultiplier.BROADCAST;
 
                         break;
 
@@ -334,18 +325,20 @@ public final class CollocationModel {
                 }
             }
 
-            if (upper.previousReplicated(filter))
-                multiplier = MULTIPLIER_REPLICATED_NOT_LAST;
+            if (upper.isPreviousTableReplicated(filter))
+                multiplier = CollocationModelMultiplier.REPLICATED_NOT_LAST;
         }
     }
 
     /**
-     * @param f Current filter.
-     * @return {@code true} If partitioned table was found.
+     * Check whether at least one PARTITIONED table is located before current table.
+     *
+     * @param filterIdx Current filter index.
+     * @return {@code true} If PARTITIONED table exists.
      */
-    private boolean findPartitionedTableBefore(int f) {
-        for (int i = 0; i < f; i++) {
-            CollocationModel child = child(i, true);
+    private boolean isPartitionedTableBeforeExists(int filterIdx) {
+        for (int idx = 0; idx < filterIdx; idx++) {
+            CollocationModel child = child(idx, true);
 
             // The child can be null if it is neither a GridH2Table nor a sub-query:
             // it is some kind of function table or anything else that is considered replicated.
@@ -354,19 +347,26 @@ public final class CollocationModel {
         }
 
         // We have to search globally in upper queries as well.
-        return upper != null && upper.findPartitionedTableBefore(filter);
+        return upper != null && upper.isPartitionedTableBeforeExists(filter);
     }
 
     /**
-     * @param f Current filter.
+     * Check if previous table in the sequence is REPLICATED.
+     *
+     * @param filterIdx Current filter index.
      * @return {@code true} If previous table is REPLICATED.
      */
-    @SuppressWarnings("SimplifiableIfStatement")
-    private boolean previousReplicated(int f) {
-        if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED)
+    private boolean isPreviousTableReplicated(int filterIdx) {
+        // We are at the first table, nothing exists before it.
+        if (filterIdx == 0)
+            return false;
+
+        CollocationModel child = child(filterIdx - 1, true);
+
+        if (child != null && child.type(true) == CollocationModelType.REPLICATED)
             return true;
 
-        return upper != null && upper.previousReplicated(filter);
+        return upper != null && upper.isPreviousTableReplicated(filter);
     }
 
     /**
@@ -374,7 +374,7 @@ public final class CollocationModel {
      * @return Affinity join type.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Affinity joinedWithCollocated(int f) {
+    private CollocationModelAffinity joinedWithCollocated(int f) {
         TableFilter tf = childFilters[f];
 
         GridH2Table tbl = (GridH2Table)tf.getTable();
@@ -424,10 +424,10 @@ public final class CollocationModel {
                             // the found affinity column is the needed one, since we can select multiple
                             // different affinity columns from different tables.
                             if (cm != null && !cm.view) {
-                                Type t = cm.type(true);
+                                CollocationModelType t = cm.type(true);
 
                                 if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate))
-                                    return Affinity.COLLOCATED_JOIN;
+                                    return CollocationModelAffinity.COLLOCATED_JOIN;
                             }
                         }
                     }
@@ -435,7 +435,7 @@ public final class CollocationModel {
             }
         }
 
-        return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE;
+        return affKeyCondFound ? CollocationModelAffinity.HAS_AFFINITY_CONDITION : CollocationModelAffinity.NONE;
     }
 
     /**
@@ -457,6 +457,7 @@ public final class CollocationModel {
      * @param validate Query validation flag.
      * @return {@code true} If it is an affinity column.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) {
         Column col = expCol.getColumn();
 
@@ -519,21 +520,23 @@ public final class CollocationModel {
      * @return Multiplier.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private int multiplier(boolean withUnion) {
+    private CollocationModelMultiplier multiplier(boolean withUnion) {
         calculate();
 
-        assert multiplier != 0;
+        assert multiplier != null;
 
         if (withUnion && unions != null) {
-            int maxMultiplier = 0;
+            CollocationModelMultiplier maxMultiplier = null;
 
             for (int i = 0; i < unions.size(); i++) {
-                int m = unions.get(i).multiplier(false);
+                CollocationModelMultiplier m = unions.get(i).multiplier(false);
 
-                if (m > maxMultiplier)
+                if (maxMultiplier == null || m.multiplier() > maxMultiplier.multiplier())
                     maxMultiplier = m;
             }
 
+            assert maxMultiplier != null;
+
             return maxMultiplier;
         }
 
@@ -544,26 +547,26 @@ public final class CollocationModel {
      * @param withUnion With respect to union.
      * @return Type.
      */
-    private Type type(boolean withUnion) {
+    private CollocationModelType type(boolean withUnion) {
         calculate();
 
         assert type != null;
 
         if (withUnion && unions != null) {
-            Type left = unions.get(0).type(false);
+            CollocationModelType left = unions.get(0).type(false);
 
             for (int i = 1; i < unions.size(); i++) {
-                Type right = unions.get(i).type(false);
+                CollocationModelType right = unions.get(i).type(false);
 
                 if (!left.isCollocated() || !right.isCollocated()) {
-                    left = Type.PARTITIONED_NOT_COLLOCATED;
+                    left = CollocationModelType.PARTITIONED_NOT_COLLOCATED;
 
                     break;
                 }
                 else if (!left.isPartitioned() && !right.isPartitioned())
-                    left = Type.REPLICATED;
+                    left = CollocationModelType.REPLICATED;
                 else
-                    left = Type.PARTITIONED_COLLOCATED;
+                    left = CollocationModelType.PARTITIONED_COLLOCATED;
             }
 
             return left;
@@ -577,6 +580,7 @@ public final class CollocationModel {
      * @param create Create child if needed.
      * @return Child collocation.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private CollocationModel child(int i, boolean create) {
         CollocationModel child = children[i];
 
@@ -625,18 +629,22 @@ public final class CollocationModel {
     /**
      * Get distributed multiplier for the given sequence of tables.
      *
-     * @param ctx Splitter context.
      * @param ses Session.
      * @param filters Filters.
      * @param filter Filter index.
      * @return Multiplier.
      */
-    public static int distributedMultiplier(
-        SplitterContext ctx,
-        Session ses,
-        TableFilter[] filters,
-        int filter
-    ) {
+    public static CollocationModelMultiplier distributedMultiplier(Session ses, TableFilter[] filters, int filter) {
+        // Notice that we check for isJoinBatchEnabled, because we can do multiple different
+        // optimization passes on PREPARE stage.
+        // Query expressions can not be distributed as well.
+        SplitterContext ctx = SplitterContext.get();
+
+        if (!ctx.distributedJoins() || !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
+            return CollocationModelMultiplier.COLLOCATED;
+
+        assert filters != null;
+
         clearViewIndexCache(ses);
 
         CollocationModel model = buildCollocationModel(ctx, ses.getSubQueryInfo(), filters, filter, false);
@@ -717,9 +725,9 @@ public final class CollocationModel {
     public static boolean isCollocated(Query qry) {
         CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true);
 
-        Type type = mdl.type(true);
+        CollocationModelType type = mdl.type(true);
 
-        if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+        if (!type.isCollocated() && mdl.multiplier == CollocationModelMultiplier.REPLICATED_NOT_LAST)
             throw new CacheException("Failed to execute query: for distributed join " +
                 "all REPLICATED caches must be at the end of the joined tables list.");
 
@@ -763,7 +771,7 @@ public final class CollocationModel {
         for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin())
             list.add(f);
 
-        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
+        TableFilter[] filters = list.toArray(EMPTY_FILTERS);
 
         CollocationModel cm = createChildModel(upper, filter, unions, true, validate);
 
@@ -803,75 +811,4 @@ public final class CollocationModel {
         if (!viewIdxCache.isEmpty())
             viewIdxCache.clear();
     }
-
-    /**
-     * Collocation type.
-     */
-    private enum Type {
-        /** */
-        PARTITIONED_COLLOCATED(true, true),
-
-        /** */
-        PARTITIONED_NOT_COLLOCATED(true, false),
-
-        /** */
-        REPLICATED(false, true);
-
-        /** */
-        private final boolean partitioned;
-
-        /** */
-        private final boolean collocated;
-
-        /**
-         * @param partitioned Partitioned.
-         * @param collocated Collocated.
-         */
-        Type(boolean partitioned, boolean collocated) {
-            this.partitioned = partitioned;
-            this.collocated = collocated;
-        }
-
-        /**
-         * @return {@code true} If partitioned.
-         */
-        public boolean isPartitioned() {
-            return partitioned;
-        }
-
-        /**
-         * @return {@code true} If collocated.
-         */
-        public boolean isCollocated() {
-            return collocated;
-        }
-
-        /**
-         * @param partitioned Partitioned.
-         * @param collocated Collocated.
-         * @return Type.
-         */
-        static Type of(boolean partitioned, boolean collocated) {
-            if (collocated)
-                return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED;
-
-            assert partitioned;
-
-            return Type.PARTITIONED_NOT_COLLOCATED;
-        }
-    }
-
-    /**
-     * Affinity of a table relative to previous joined tables.
-     */
-    private enum Affinity {
-        /** */
-        NONE,
-
-        /** */
-        HAS_AFFINITY_CONDITION,
-
-        /** */
-        COLLOCATED_JOIN
-    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelAffinity.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelAffinity.java
new file mode 100644
index 0000000..07740ad
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelAffinity.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Affinity of a table relative to previous joined tables.
+ */
+public enum CollocationModelAffinity {
+    /** NOTE(review): presumably no affinity relation to previous tables was found; confirm in CollocationModel. */
+    NONE,
+
+    /** NOTE(review): presumably the join condition involves the affinity key; confirm in CollocationModel. */
+    HAS_AFFINITY_CONDITION,
+
+    /** NOTE(review): presumably the table is joined with previous tables in a collocated way; confirm. */
+    COLLOCATED_JOIN
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelMultiplier.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelMultiplier.java
new file mode 100644
index 0000000..74b76a3
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelMultiplier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Multiplier for different collocation types; constants are ranked by their {@link #multiplier()} value.
+ */
+public enum CollocationModelMultiplier {
+    /** Tables are collocated, cheap (smallest multiplier, 1). */
+    COLLOCATED(1),
+
+    /** Multiplier 50. NOTE(review): presumably a non-collocated join served by unicast lookups; confirm. */
+    UNICAST(50),
+
+    /** Multiplier 200. NOTE(review): presumably a non-collocated join served by broadcast lookups; confirm. */
+    BROADCAST(200),
+
+    /** Force REPLICATED tables to be at the end of join sequence (largest multiplier, 10 000). */
+    REPLICATED_NOT_LAST(10_000);
+
+    /** Multiplier value. */
+    private final int multiplier;
+
+    /**
+     * Constructor.
+     *
+     * @param multiplier Multiplier value.
+     */
+    CollocationModelMultiplier(int multiplier) {
+        this.multiplier = multiplier;
+    }
+
+    /**
+     * @return Multiplier value.
+     */
+    public int multiplier() {
+        return multiplier;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelType.java
new file mode 100644
index 0000000..ea6b10e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModelType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+/**
+ * Collocation type: a combination of the "partitioned" and "collocated" flags.
+ */
+public enum CollocationModelType {
+    /** Partitioned and collocated. */
+    PARTITIONED_COLLOCATED(true, true),
+
+    /** Partitioned, but not collocated. */
+    PARTITIONED_NOT_COLLOCATED(true, false),
+
+    /** Not partitioned (replicated); always reported as collocated. */
+    REPLICATED(false, true);
+
+    /** Partitioned flag. */
+    private final boolean partitioned;
+
+    /** Collocated flag. */
+    private final boolean collocated;
+
+    /**
+     * @param partitioned Partitioned.
+     * @param collocated Collocated.
+     */
+    CollocationModelType(boolean partitioned, boolean collocated) {
+        this.partitioned = partitioned;
+        this.collocated = collocated;
+    }
+
+    /**
+     * @return {@code true} If partitioned.
+     */
+    public boolean isPartitioned() {
+        return partitioned;
+    }
+
+    /**
+     * @return {@code true} If collocated.
+     */
+    public boolean isCollocated() {
+        return collocated;
+    }
+
+    /**
+     * @param partitioned Partitioned flag; must be {@code true} when {@code collocated} is {@code false} (asserted).
+     * @param collocated Collocated flag.
+     * @return Matching type; a non-collocated request always yields {@link #PARTITIONED_NOT_COLLOCATED}.
+     */
+    public static CollocationModelType of(boolean partitioned, boolean collocated) {
+        if (collocated)
+            return partitioned ? PARTITIONED_COLLOCATED : REPLICATED;
+
+        assert partitioned; // No constant exists for "not partitioned and not collocated".
+
+        return PARTITIONED_NOT_COLLOCATED;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index 26206c9..490858f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -21,7 +21,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -51,8 +51,11 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2R
  * Index lookup batch.
  */
 public class DistributedLookupBatch implements IndexLookupBatch {
+    /** */
+    private static final Object EXPLICIT_NULL = new Object();
+
     /** Index. */
-    private final GridH2IndexBase idx;
+    private final H2TreeIndex idx;
 
     /** */
     private final GridCacheContext<?,?> cctx;
@@ -89,7 +92,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
      * @param ucast Unicast or broadcast query.
      * @param affColId Affinity column ID.
      */
-    public DistributedLookupBatch(GridH2IndexBase idx, GridCacheContext<?, ?> cctx, boolean ucast, int affColId) {
+    public DistributedLookupBatch(H2TreeIndex idx, GridCacheContext<?, ?> cctx, boolean ucast, int affColId) {
         this.idx = idx;
         this.cctx = cctx;
         this.ucast = ucast;
@@ -112,7 +115,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
         Value affKeyLast = lastRow.getValue(affColId);
 
         if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
-            return affKeyFirst == ValueNull.INSTANCE ? GridH2IndexBase.EXPLICIT_NULL : affKeyFirst.getObject();
+            return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : affKeyFirst.getObject();
 
         if (idx.getTable().rowDescriptor().isKeyColumn(affColId))
             return null;
@@ -122,7 +125,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
         Value pkLast = lastRow.getValue(QueryUtils.KEY_COL);
 
         if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
-            return GridH2IndexBase.EXPLICIT_NULL;
+            return EXPLICIT_NULL;
 
         if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
             return null;
@@ -176,7 +179,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
 
         if (affKey != null) {
             // Affinity key is provided.
-            if (affKey == GridH2IndexBase.EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
+            if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
                 return false;
 
             segmentKeys = F.asList(rangeSegment(affKey, locQry));
@@ -325,7 +328,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
      * @return Segment key for Affinity key.
      */
     public SegmentKey rangeSegment(Object affKeyObj, boolean isLocalQry) {
-        assert affKeyObj != null && affKeyObj != GridH2IndexBase.EXPLICIT_NULL : affKeyObj;
+        assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
 
         ClusterNode node;
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
index 405478e..64aa0ca 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.opt.join;
 
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
@@ -36,7 +36,7 @@ import static java.util.Collections.emptyIterator;
  */
 public class RangeSource {
     /** Index. */
-    private final GridH2IndexBase idx;
+    private final H2TreeIndex idx;
 
     /** */
     private Iterator<GridH2RowRangeBounds> boundsIter;
@@ -59,7 +59,7 @@ public class RangeSource {
      * @param filter Filter.
      */
     public RangeSource(
-        GridH2IndexBase idx,
+        H2TreeIndex idx,
         Iterable<GridH2RowRangeBounds> bounds,
         int segment,
         BPlusTree.TreeRowClosure<H2Row, H2Row> filter
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
index b80cd61..6891ff3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
@@ -23,8 +23,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -58,7 +58,7 @@ public class RangeStream {
     private final GridKernalContext ctx;
 
     /** Index. */
-    private final GridH2IndexBase idx;
+    private final H2TreeIndex idx;
 
     /** */
     private final DistributedJoinContext joinCtx;
@@ -88,7 +88,7 @@ public class RangeStream {
      * @param joinCtx Join context.
      * @param node Node.
      */
-    public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, DistributedJoinContext joinCtx, ClusterNode node) {
+    public RangeStream(GridKernalContext ctx, H2TreeIndex idx, DistributedJoinContext joinCtx, ClusterNode node) {
         this.ctx = ctx;
         this.idx = idx;
         this.node = node;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index 880e305..482752a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
 import org.apache.ignite.internal.util.typedef.F;
@@ -49,7 +50,6 @@ import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.emptyIterator;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase.bubbleUp;
 
 /**
  * Sorted index.
@@ -258,7 +258,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
                 return; // All streams are done.
 
             if (streams[off].next())
-                bubbleUp(streams, off, streamCmp);
+                H2Utils.bubbleUp(streams, off, streamCmp);
             else
                 streams[off++] = null; // Move left bound and nullify empty stream.
         }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8097800..fdfb44c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1209,7 +1209,8 @@ public class GridReduceQueryExecutor {
             specialize,
             locNodeHnd,
             GridIoPolicy.QUERY_POOL,
-            runLocParallel);
+            runLocParallel
+        );
     }
 
     /**