Posted to commits@ignite.apache.org by am...@apache.org on 2020/01/31 15:57:32 UTC

[ignite] 01/01: IGNITE-12609: SQL: GridReduceQueryExecutor refactoring.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-12609
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 1616b682a5c130a63de74c91bc62b8ae05882117
Author: Andrey V. Mashenkov <an...@gmail.com>
AuthorDate: Wed Jan 29 15:14:50 2020 +0300

    IGNITE-12609: SQL: GridReduceQueryExecutor refactoring.
    
    Optimize query mapping.
    Decouple query reducer and H2 index adapters code.
---
 .../apache/ignite/internal/util/lang/GridFunc.java |   3 +
 .../cache/query/GridCacheTwoStepQuery.java         |  18 +-
 .../query/h2/H2JavaObjectSerializer.java           |  55 +++
 .../processors/query/h2/IgniteH2Indexing.java      |  12 +-
 .../query/h2/sql/GridSqlQuerySplitter.java         |   2 +
 .../h2/twostep/AbstractReduceIndexAdapter.java     | 128 +++++
 .../query/h2/twostep/AbstractReducer.java          | 367 ++++++++++++++
 .../query/h2/twostep/GridMapQueryExecutor.java     |   2 +-
 .../query/h2/twostep/GridReduceQueryExecutor.java  | 532 ++++++++++++---------
 .../query/h2/twostep/ReduceBlockList.java          |   5 +-
 .../query/h2/twostep/ReduceIndexIterator.java      |  10 +-
 .../query/h2/twostep/ReducePartitionMapResult.java |  15 +-
 .../query/h2/twostep/ReducePartitionMapper.java    | 222 ++++++---
 .../query/h2/twostep/ReduceQueryRun.java           |  59 ++-
 .../processors/query/h2/twostep/ReduceTable.java   |  11 +-
 .../processors/query/h2/twostep/Reducer.java       | 106 ++++
 .../query/h2/twostep/SortedReduceIndexAdapter.java |  67 +++
 .../{ReduceIndexSorted.java => SortedReducer.java} | 356 +++++++++++---
 .../h2/twostep/UnsortedReduceIndexAdapter.java     |  61 +++
 ...duceIndexUnsorted.java => UnsortedReducer.java} | 146 ++++--
 .../query/h2/twostep/msg/GridH2QueryRequest.java   |  31 ++
 .../query/IgniteSqlSplitterSelfTest.java           |   6 +-
 .../internal/processors/query/KillQueryTest.java   |   4 +-
 .../h2/twostep/RetryCauseMessageSelfTest.java      |   8 +-
 .../IgniteBinaryCacheQueryTestSuite.java           |   3 -
 25 files changed, 1743 insertions(+), 486 deletions(-)

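At a high level, the patch splits the old merge-index classes into two layers: a plain Reducer that owns the result-merging logic, and thin H2 index adapters that translate H2's Index API into Reducer calls. A minimal sketch of that shape, assuming the class names from the diff below (the method set and bodies are illustrative placeholders, not the actual implementation):

    import java.util.Collection;
    import java.util.Set;
    import java.util.UUID;
    import org.apache.ignite.cluster.ClusterNode;
    import org.h2.engine.Session;
    import org.h2.index.BaseIndex;
    import org.h2.index.Cursor;
    import org.h2.result.SearchRow;

    // Reducer side: no H2 inheritance, so merging logic is testable in isolation.
    interface Reducer {
        void setSources(Collection<ClusterNode> nodes, int segmentsCnt);
        boolean hasSource(UUID nodeId);
        Set<UUID> sources();
        void setPageSize(int pageSize);
        Cursor find(SearchRow first, SearchRow last);
    }

    // H2-facing side: an Index whose only real job is delegating lookups.
    abstract class ReduceIndexAdapterSketch extends BaseIndex {
        /** @return Reducer backing this index. */
        abstract Reducer reducer();

        /** {@inheritDoc} */
        @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
            return reducer().find(first, last); // All merge logic lives in the reducer.
        }
    }
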
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 8fe1a67..11e7a13 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -364,6 +364,9 @@ public class GridFunc {
 
         int n = ThreadLocalRandom.current().nextInt(c.size());
 
+        if (c instanceof List)
+            return ((List<? extends T>)c).get(n);
+
         int i = 0;
 
         for (T t : c) {
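
For context, this change gives GridFunc.rand(Collection) an O(1) fast path when the collection is a List, instead of always walking the iterator to a random offset. The same idea as a standalone sketch (the helper class and its fallback error are hypothetical, not the Ignite method):

    import java.util.Collection;
    import java.util.ConcurrentModificationException;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    final class RandUtil {
        /** @return Uniformly random element: O(1) for lists, O(n) for other collections. */
        static <T> T rand(Collection<? extends T> c) {
            int n = ThreadLocalRandom.current().nextInt(c.size());

            // Fast path: lists support direct indexing.
            if (c instanceof List)
                return ((List<? extends T>)c).get(n);

            int i = 0;

            for (T t : c) {
                if (i++ == n)
                    return t;
            }

            throw new ConcurrentModificationException("Collection was modified concurrently.");
        }
    }
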
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 6483985..c4f4143 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
@@ -50,6 +49,9 @@ public class GridCacheTwoStepQuery {
     private final boolean distributedJoins;
 
     /** */
+    private final boolean replicatedOnly;
+
+    /** */
     private final boolean skipMergeTbl;
 
     /** */
@@ -77,6 +79,7 @@ public class GridCacheTwoStepQuery {
      * @param skipMergeTbl Skip merge table flag.
      * @param explain Explain flag.
      * @param distributedJoins Distributed joins flag.
+     * @param replicatedOnly Replicated only flag.
      * @param derivedPartitions Derived partitions.
      * @param cacheIds Cache ids.
      * @param mvccEnabled Mvcc flag.
@@ -91,16 +94,18 @@ public class GridCacheTwoStepQuery {
         boolean skipMergeTbl,
         boolean explain,
         boolean distributedJoins,
+        boolean replicatedOnly,
         PartitionResult derivedPartitions,
         List<Integer> cacheIds,
         boolean mvccEnabled,
         boolean locSplit
     ) {
+        assert !F.isEmpty(mapQrys);
+
         this.originalSql = originalSql;
         this.paramsCnt = paramsCnt;
         this.tbls = tbls;
         this.rdc = rdc;
-        this.mapQrys = F.isEmpty(mapQrys) ? Collections.emptyList() : mapQrys;
         this.skipMergeTbl = skipMergeTbl;
         this.explain = explain;
         this.distributedJoins = distributedJoins;
@@ -108,6 +113,8 @@ public class GridCacheTwoStepQuery {
         this.cacheIds = cacheIds;
         this.mvccEnabled = mvccEnabled;
         this.locSplit = locSplit;
+        this.mapQrys = mapQrys;
+        this.replicatedOnly = replicatedOnly;
     }
 
     /**
@@ -139,12 +146,7 @@ public class GridCacheTwoStepQuery {
     public boolean isReplicatedOnly() {
         assert !mapQrys.isEmpty();
 
-        for (GridCacheSqlQuery mapQry : mapQrys) {
-            if (mapQry.isPartitioned())
-                return false;
-        }
-
-        return true;
+        return replicatedOnly;
     }
 
     /**
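
Previously isReplicatedOnly() rescanned every map query on each call; the flag is now computed once at split time and handed to the constructor, which also tightens the contract by asserting that map queries are present instead of substituting an empty list. The trade-off in isolation (a minimal sketch with made-up names, not Ignite code):

    import java.util.List;

    final class TwoStepQueryFlags {
        private final boolean replicatedOnly;

        /** Computes the flag once in the constructor instead of on every call. */
        TwoStepQueryFlags(List<MapQuery> mapQrys) {
            if (mapQrys.isEmpty())
                throw new IllegalArgumentException("Map queries must not be empty.");

            replicatedOnly = mapQrys.stream().noneMatch(MapQuery::isPartitioned);
        }

        /** @return {@code true} if no map query touches a partitioned cache; O(1), was O(n) per call. */
        boolean isReplicatedOnly() {
            return replicatedOnly;
        }

        /** Stand-in for GridCacheSqlQuery. */
        interface MapQuery {
            boolean isPartitioned();
        }
    }
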
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2JavaObjectSerializer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2JavaObjectSerializer.java
new file mode 100644
index 0000000..97abcd2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2JavaObjectSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.h2.api.JavaObjectSerializer;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Ignite java object serializer implementation for H2.
+ */
+class H2JavaObjectSerializer implements JavaObjectSerializer {
+    /** Class loader. */
+    private final ClassLoader clsLdr;
+
+    /** Marshaller. */
+    private final Marshaller marshaller;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    H2JavaObjectSerializer(@NotNull GridKernalContext ctx) {
+        marshaller = ctx.config().getMarshaller();
+        clsLdr = U.resolveClassLoader(ctx.config());
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] serialize(Object obj) throws Exception {
+        return U.marshal(marshaller, obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object deserialize(byte[] bytes) throws Exception {
+        return U.unmarshal(marshaller, bytes, clsLdr);
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 9871e2e..e20c178 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2265,17 +2265,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Serializer.
      */
     private JavaObjectSerializer h2Serializer() {
-        return new JavaObjectSerializer() {
-            @Override public byte[] serialize(Object obj) throws Exception {
-                return U.marshal(marshaller, obj);
-            }
-
-            @Override public Object deserialize(byte[] bytes) throws Exception {
-                ClassLoader clsLdr = ctx != null ? U.resolveClassLoader(ctx.config()) : null;
-
-                return U.unmarshal(marshaller, bytes, clsLdr);
-            }
-        };
+        return new H2JavaObjectSerializer(ctx);
     }
 
     /** {@inheritDoc} */
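
H2 lets the embedding application plug in serialization for JAVA_OBJECT values through org.h2.api.JavaObjectSerializer, which is the hook the extracted class implements. A minimal standalone implementation over plain JDK serialization (illustrative only; the Ignite class delegates to the configured Marshaller instead):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.h2.api.JavaObjectSerializer;

    class JdkObjectSerializer implements JavaObjectSerializer {
        /** {@inheritDoc} */
        @Override public byte[] serialize(Object obj) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(obj);
            }

            return bos.toByteArray();
        }

        /** {@inheritDoc} */
        @Override public Object deserialize(byte[] bytes) throws Exception {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return in.readObject();
            }
        }
    }

One nuance of the extraction: the old anonymous class resolved the class loader lazily on every deserialize call and tolerated a null kernal context, while the new class resolves both the marshaller and the class loader once in its constructor.
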
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 7b432da..04968ea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -296,6 +296,7 @@ public class GridSqlQuerySplitter {
 
         List<Integer> cacheIds = H2Utils.collectCacheIds(idx, null, splitter.tbls);
         boolean mvccEnabled = H2Utils.collectMvccEnabled(idx, cacheIds);
+        boolean replicatedOnly = splitter.mapSqlQrys.stream().noneMatch(GridCacheSqlQuery::isPartitioned);
 
         H2Utils.checkQuery(idx, cacheIds, splitter.tbls);
 
@@ -309,6 +310,7 @@ public class GridSqlQuerySplitter {
             splitter.skipMergeTbl,
             explain,
             distributedJoins,
+            replicatedOnly,
             splitter.extractor.mergeMapQueries(splitter.mapSqlQrys),
             cacheIds,
             mvccEnabled,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractReduceIndexAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractReduceIndexAdapter.java
new file mode 100644
index 0000000..55b0943
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractReduceIndexAdapter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.h2.engine.Session;
+import org.h2.index.BaseIndex;
+import org.h2.index.Cursor;
+import org.h2.index.Index;
+import org.h2.index.IndexType;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.table.IndexColumn;
+import org.h2.table.Table;
+
+/**
+ * H2 {@link Index} adapter base class.
+ */
+public abstract class AbstractReduceIndexAdapter extends BaseIndex {
+    /**
+     * @param ctx  Context.
+     * @param tbl  Table.
+     * @param name Index name.
+     * @param type Type.
+     * @param cols Columns.
+     */
+    protected AbstractReduceIndexAdapter(GridKernalContext ctx,
+        Table tbl,
+        String name,
+        IndexType type,
+        IndexColumn[] cols
+    ) {
+        initBaseIndex(tbl, 0, name, cols, type);
+    }
+
+    /**
+     * @return Index reducer.
+     */
+    abstract AbstractReducer reducer();
+
+    /** {@inheritDoc} */
+    @Override public long getRowCount(Session ses) {
+        Cursor c = find(ses, null, null);
+
+        long cnt = 0;
+
+        while (c.next())
+            cnt++;
+
+        return cnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCountApproximation() {
+        return 10_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
+        return reducer().find(first, last);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkRename() {
+        throw DbException.getUnsupportedException("rename");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(Session ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void add(Session ses, Row row) {
+        throw DbException.getUnsupportedException("add");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(Session ses, Row row) {
+        throw DbException.getUnsupportedException("remove row");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(Session ses) {
+        throw DbException.getUnsupportedException("remove index");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void truncate(Session ses) {
+        throw DbException.getUnsupportedException("truncate");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canGetFirstOrLast() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor findFirstOrLast(Session ses, boolean first) {
+        throw DbException.getUnsupportedException("findFirstOrLast");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean needRebuild() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDiskSpaceUsed() {
+        return 0;
+    }
+}
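
The adapter is deliberately read-only: H2 only ever scans a merge index, so every mutating operation throws an unsupported-operation DbException, and an exact getRowCount(Session) degenerates to a full cursor walk while getRowCountApproximation() returns a constant. The counting pattern in isolation (helper name is illustrative):

    import org.h2.index.Cursor;

    final class CursorUtil {
        /** Exhausts the cursor and returns the number of rows it produced. */
        static long count(Cursor c) {
            long cnt = 0;

            while (c.next())
                cnt++;

            return cnt;
        }
    }
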
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractReducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractReducer.java
new file mode 100644
index 0000000..e920a49
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractReducer.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
+/**
+ * Base class for reducer of remote index lookup results.
+ */
+public abstract class AbstractReducer implements Reducer {
+    /** */
+    static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
+
+    /** */
+    static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
+
+    static {
+        if (!U.isPow2(PREFETCH_SIZE)) {
+            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+                ") must be positive and a power of 2.");
+        }
+
+        if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
+            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+                ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
+        }
+    }
+
+    /** */
+    @SuppressWarnings("rawtypes")
+    private static final AtomicReferenceFieldUpdater<AbstractReducer, ConcurrentMap> LAST_PAGES_UPDATER =
+        AtomicReferenceFieldUpdater.newUpdater(AbstractReducer.class, ConcurrentMap.class, "lastPages");
+
+    /** */
+    private final GridKernalContext ctx;
+
+    /** DO NOT change the name of this field: it is updated through {@link #LAST_PAGES_UPDATER}. */
+    @SuppressWarnings("unused")
+    private volatile ConcurrentMap<ReduceSourceKey, Integer> lastPages;
+
+    /** Row source nodes. */
+    protected Set<UUID> srcNodes;
+
+    /** */
+    private int pageSize;
+
+    /**
+     * Read and written from the query execution thread only, so it does not need to be thread-safe.
+     */
+    protected final ReduceBlockList<Row> fetched;
+
+    /** */
+    private Row lastEvictedRow;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     */
+    AbstractReducer(GridKernalContext ctx) {
+        this.ctx = ctx;
+
+        fetched = new ReduceBlockList<>(PREFETCH_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+        assert srcNodes == null;
+
+        srcNodes = new HashSet<>(nodes.size());
+
+        for (ClusterNode node : nodes) {
+            if (!srcNodes.add(node.id()))
+                throw new IllegalStateException();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<UUID> sources() {
+        return srcNodes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasSource(UUID nodeId) {
+        return srcNodes.contains(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setPageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onFailure(UUID nodeId, final CacheException e) {
+        if (nodeId == null)
+            nodeId = F.first(srcNodes);
+
+        addPage0(new ReduceResultPage(null, nodeId, null) {
+            @Override public boolean isFail() {
+                return true;
+            }
+
+            @Override public void fetchNextPage() {
+                if (e == null)
+                    super.fetchNextPage();
+                else
+                    throw e;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public final Cursor find(@Nullable SearchRow first, @Nullable SearchRow last) {
+        checkBounds(lastEvictedRow, first, last);
+
+        if (fetchedAll())
+            return findAllFetched(fetched, first, last);
+
+        return findInStream(first, last);
+    }
+
+    /**
+     * @param first Row.
+     * @param last Row.
+     * @return Cursor over remote streams.
+     */
+    protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last);
+
+    /**
+     * @param fetched Fetched data.
+     * @param first Row.
+     * @param last Row.
+     * @return Cursor over fetched data.
+     */
+    protected abstract Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last);
+
+    /**
+     * @param lastEvictedRow Last evicted fetched row.
+     * @param first Lower bound.
+     * @param last Upper bound.
+     */
+    protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) {
+        if (lastEvictedRow != null)
+            throw new IgniteException("Fetched result set was too large.");
+    }
+
+    /**
+     * @param evictedBlock Evicted block.
+     */
+    protected void onBlockEvict(@NotNull List<Row> evictedBlock) {
+        assert evictedBlock.size() == PREFETCH_SIZE;
+
+        // Remember the last row (it will be max row) from the evicted block.
+        lastEvictedRow = requireNonNull(last(evictedBlock));
+    }
+
+    /**
+     * @param l List.
+     * @return Last element.
+     */
+    static <Z> Z last(List<Z> l) {
+        return l.get(l.size() - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addPage(ReduceResultPage page) {
+        markLastPage(page);
+        addPage0(page);
+    }
+
+    /**
+     * @param page Page.
+     */
+    protected abstract void addPage0(ReduceResultPage page);
+
+    /**
+     * Fails the index if any source node has left the cluster.
+     */
+    private void checkSourceNodesAlive() {
+        for (UUID nodeId : srcNodes) {
+            if (!ctx.discovery().alive(nodeId)) {
+                onFailure(nodeId, null);
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * @param e Error.
+     */
+    public void fail(final CacheException e) {
+        for (UUID nodeId : srcNodes)
+            onFailure(nodeId, e);
+    }
+
+    /**
+     * @param page Page.
+     */
+    private void markLastPage(ReduceResultPage page) {
+        GridQueryNextPageResponse res = page.response();
+
+        if (!res.last()) {
+            UUID nodeId = page.source();
+
+            initLastPages(nodeId, res);
+
+            ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
+
+            if (lp == null)
+                return; // It was not initialized --> wait for last page flag.
+
+            Integer lastPage = lp.get(new ReduceSourceKey(nodeId, res.segmentId()));
+
+            if (lastPage == null)
+                return; // This node may use the new protocol --> wait for last page flag.
+
+            if (lastPage != res.page()) {
+                assert lastPage > res.page();
+
+                return; // This is not the last page.
+            }
+        }
+
+        page.setLast(true);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
+    private void initLastPages(UUID nodeId, GridQueryNextPageResponse res) {
+        int allRows = res.allRows();
+
+        // In the old protocol the total row count is sent in page 0; all other pages have -1.
+        // In the new protocol the count is not known up front, so it is always -1, except for the
+        // terminating page, which has -2. Thus we initialize the page counters only when we receive
+        // a positive value in the first page.
+        if (allRows < 0 || res.page() != 0)
+            return;
+
+        ConcurrentMap<ReduceSourceKey,Integer> lp = lastPages;
+
+        if (lp == null && !LAST_PAGES_UPDATER.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
+            lp = lastPages;
+
+        assert pageSize > 0: pageSize;
+
+        int lastPage = allRows == 0 ? 0 : (allRows - 1) / pageSize;
+
+        assert lastPage >= 0: lastPage;
+
+        if (lp.put(new ReduceSourceKey(nodeId, res.segmentId()), lastPage) != null)
+            throw new IllegalStateException();
+    }
+
+    /**
+     * @param lastPage Real last page.
+     * @return Created dummy page.
+     */
+    protected final ReduceResultPage createDummyLastPage(ReduceResultPage lastPage) {
+        assert !lastPage.isDummyLast(); // It must be a real last page.
+
+        return new ReduceResultPage(ctx, lastPage.source(), null).setLast(true);
+    }
+
+    /**
+     * @param queue Queue to poll.
+     * @param iter Current iterator.
+     * @return The same or new iterator.
+     */
+    protected final Iterator<Value[]> pollNextIterator(Pollable<ReduceResultPage> queue, Iterator<Value[]> iter) {
+        if (!iter.hasNext()) {
+            ReduceResultPage page = takeNextPage(queue);
+
+            if (!page.isLast())
+                page.fetchNextPage(); // Failed will throw an exception here.
+
+            iter = page.rows();
+
+            // An empty iterator here is only possible for the dummy last page or on failure.
+            assert iter.hasNext() || page.isDummyLast() || page.isFail();
+        }
+
+        return iter;
+    }
+
+    /**
+     * @param queue Queue to poll.
+     * @return Next page.
+     */
+    private ReduceResultPage takeNextPage(Pollable<ReduceResultPage> queue) {
+        ReduceResultPage page;
+
+        for (;;) {
+            try {
+                page = queue.poll(500, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e) {
+                throw new CacheException("Query execution was interrupted.", e);
+            }
+
+            if (page != null)
+                break;
+
+            checkSourceNodesAlive();
+        }
+
+        return page;
+    }
+
+    /**
+     * Pollable.
+     */
+    interface Pollable<E> {
+        /**
+         * @param timeout Timeout.
+         * @param unit Time unit.
+         * @return Polled value or {@code null} if none.
+         * @throws InterruptedException If interrupted.
+         */
+        E poll(long timeout, TimeUnit unit) throws InterruptedException;
+    }
+}
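
The trickiest part of AbstractReducer is deciding when a source has sent its final page. Under the old protocol, page 0 carries the total row count, so the index of the last page can be derived up front; under the new protocol the count is always -1 and the reducer must wait for an explicit last-page flag. A worked sketch of the arithmetic used by initLastPages(), assuming the same formula:

    final class LastPageMath {
        /** @return Zero-based index of the final page for the given totals. */
        static int lastPage(int allRows, int pageSize) {
            assert pageSize > 0 : pageSize;

            return allRows == 0 ? 0 : (allRows - 1) / pageSize;
        }

        public static void main(String[] args) {
            // 2500 rows at 1024 rows/page fill pages 0, 1 and 2, so the last page index is 2.
            System.out.println(lastPage(2500, 1024)); // 2

            // An empty result still produces page 0, which is also the last page.
            System.out.println(lastPage(0, 1024));    // 0
        }
    }
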
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index dd2216b..222ce26 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -189,7 +189,7 @@ public class GridMapQueryExecutor {
 
         final Map<UUID,int[]> partsMap = req.partitions();
 
-        final int[] parts = qryParts == null ? partsMap == null ? null : partsMap.get(ctx.localNodeId()) : qryParts;
+        final int[] parts = qryParts == null ? (partsMap == null ? null : partsMap.get(ctx.localNodeId())) : qryParts;
 
         boolean distributedJoins = req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS);
         boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index d3e7772..8aaebb6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -42,6 +41,7 @@ import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.QueryRetryException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -94,6 +94,7 @@ import org.h2.jdbc.JdbcConnection;
 import org.h2.table.Column;
 import org.h2.util.IntArray;
 import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.singletonList;
@@ -102,7 +103,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkAc
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.setDataPageScanEnabled;
 
 /**
  * Reduce query executor.
@@ -152,12 +152,12 @@ public class GridReduceQueryExecutor {
         }
     };
 
+    /** Default query timeout. */
+    private final long dfltQueryTimeout = IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT);
+
     /** Partition mapper. */
     private ReducePartitionMapper mapper;
 
-    /** Default query timeout. */
-    private long dfltQueryTimeout;
-
     /**
      * @param ctx Context.
      * @param h2 H2 Indexing.
@@ -167,8 +167,6 @@ public class GridReduceQueryExecutor {
         this.ctx = ctx;
         this.h2 = h2;
 
-        dfltQueryTimeout = IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, DFLT_RETRY_TIMEOUT);
-
         log = ctx.log(GridReduceQueryExecutor.class);
 
         mapper = new ReducePartitionMapper(ctx, log);
@@ -182,7 +180,7 @@ public class GridReduceQueryExecutor {
         UUID nodeId = evt.eventNode().id();
 
         for (ReduceQueryRun r : runs.values()) {
-            for (ReduceIndex idx : r.indexes()) {
+            for (Reducer idx : r.reducers()) {
                 if (idx.hasSource(nodeId)) {
                     handleNodeLeft(r, nodeId);
 
@@ -255,7 +253,7 @@ public class GridReduceQueryExecutor {
 
         final int pageSize = r.pageSize();
 
-        ReduceIndex idx = r.indexes().get(msg.query());
+        Reducer idx = r.reducers().get(msg.query());
 
         ReduceResultPage page;
 
@@ -273,7 +271,7 @@ public class GridReduceQueryExecutor {
 
                     try {
                         GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize,
-                            (byte)setDataPageScanEnabled(0, r.isDataPageScanEnabled()));
+                            (byte)GridH2QueryRequest.setDataPageScanEnabled(0, r.isDataPageScanEnabled()));
 
                         if (node.isLocal())
                             h2.mapQueryExecutor().onNextPageRequest(node, msg0);
@@ -299,7 +297,7 @@ public class GridReduceQueryExecutor {
         if (msg.retry() != null)
             r.setStateOnRetry(node.id(), msg.retry(), msg.retryCause());
         else if (msg.page() == 0) // Count down only on each first page received.
-            r.latch().countDown();
+            r.onFirstPage();
     }
 
     /**
@@ -346,11 +344,18 @@ public class GridReduceQueryExecutor {
         Boolean dataPageScanEnabled,
         int pageSize
     ) {
+        assert !qry.mvccEnabled() || mvccTracker != null;
+
+        if (pageSize <= 0)
+            pageSize = Query.DFLT_PAGE_SIZE;
+
         // If explicit partitions are set, but there are no real tables, ignore.
         if (!qry.hasCacheIds() && parts != null)
             parts = null;
 
-        assert !qry.mvccEnabled() || mvccTracker != null;
+        // Partitions are not supported for queries over all replicated caches.
+        if (parts != null && qry.isReplicatedOnly())
+            throw new CacheException("Partitions are not supported for replicated caches");
 
         try {
             if (qry.mvccEnabled())
@@ -365,59 +370,29 @@ public class GridReduceQueryExecutor {
         if (F.isEmpty(params))
             params = EMPTY_PARAMS;
 
-        List<GridCacheSqlQuery> mapQueries;
-
-        if (singlePartMode)
-            mapQueries = prepareMapQueryForSinglePartition(qry, params);
-        else {
-            mapQueries = new ArrayList<>(qry.mapQueries().size());
+        List<Integer> cacheIds = qry.cacheIds();
 
-            // Copy queries here because node ID will be changed below.
-            for (GridCacheSqlQuery mapQry : qry.mapQueries())
-                mapQueries.add(mapQry.copy());
-        }
+        List<GridCacheSqlQuery> mapQueries = prepareMapQueries(qry, params, singlePartMode);
 
-        final boolean isReplicatedOnly = qry.isReplicatedOnly();
+        final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable() || singlePartMode;
 
-        long retryTimeout = retryTimeout(timeoutMillis);
+        final int segmentsPerIndex = qry.explain() || qry.isReplicatedOnly() ? 1 :
+            mapper.findFirstPartitioned(cacheIds).config().getQueryParallelism();
 
-        final long startTime = U.currentTimeMillis();
+        final long retryTimeout = retryTimeout(timeoutMillis);
+        final long qryStartTime = U.currentTimeMillis();
 
         ReduceQueryRun lastRun = null;
 
         for (int attempt = 0;; attempt++) {
-            try {
-                cancel.checkCancelled();
-            }
-            catch (QueryCancelledException cancelEx) {
-                throw new CacheException("Failed to run reduce query locally. " + cancelEx.getMessage(),  cancelEx);
-            }
-
-            if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) {
-                // There are few cases when 'retryCause' can be undefined, so we should throw exception with proper message here.
-                if (lastRun == null || lastRun.retryCause() == null)
-                    throw new CacheException("Failed to map SQL query to topology during timeout: " + retryTimeout + "ms");
-
-                UUID retryNodeId = lastRun.retryNodeId();
-                String retryCause = lastRun.retryCause();
-
-                throw new CacheException("Failed to map SQL query to topology on data node [dataNodeId=" + retryNodeId +
-                    ", msg=" + retryCause + ']');
-            }
+            ensureQueryNotCancelled(cancel);
 
-            if (attempt != 0) {
-                try {
-                    Thread.sleep(attempt * 10); // Wait for exchange.
-                }
-                catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
+            if (attempt > 0) {
+                throttleOnRetry(lastRun, qryStartTime, retryTimeout, attempt);
 
-                    throw new CacheException("Query was interrupted.", e);
-                }
+                ensureQueryNotCancelled(cancel);
             }
 
-            List<Integer> cacheIds = qry.cacheIds();
-
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
             // Check if topology has changed while retrying on locked topology.
@@ -426,176 +401,41 @@ public class GridReduceQueryExecutor {
                     "execution inside a transaction. It's recommended to rollback and retry transaction."));
             }
 
-            long qryReqId = qryIdGen.incrementAndGet();
-
-            final ReduceQueryRun r = new ReduceQueryRun(
-                h2.connections().connectionForThread().connection(schemaName),
-                mapQueries.size(),
-                pageSize,
-                dataPageScanEnabled
-            );
-
-            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn = h2.connections().detachThreadConnection();
-
-            Collection<ClusterNode> nodes;
-
-            // Explicit partition mapping for unstable topology.
-            Map<ClusterNode, IntArray> partsMap = null;
-
-            // Explicit partitions mapping for query.
-            Map<ClusterNode, IntArray> qryMap = null;
-
-            // Partitions are not supported for queries over all replicated caches.
-            if (parts != null) {
-                boolean replicatedOnly = true;
-
-                for (Integer cacheId : cacheIds) {
-                    if (!cacheContext(cacheId).isReplicated()) {
-                        replicatedOnly = false;
-
-                        break;
-                    }
-                }
-
-                if (replicatedOnly)
-                    throw new CacheException("Partitions are not supported for replicated caches");
-            }
-
-            if (qry.isLocalSplit() || !qry.hasCacheIds())
-                nodes = singletonList(ctx.discovery().localNode());
-            else {
-                ReducePartitionMapResult nodesParts =
-                    mapper.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryReqId);
-
-                nodes = nodesParts.nodes();
-                partsMap = nodesParts.partitionsMap();
-                qryMap = nodesParts.queryPartitionsMap();
-
-                if (F.isEmpty(nodes))
-                    continue; // Retry.
-
-                if (isReplicatedOnly || qry.explain()) {
-                    ClusterNode locNode = ctx.discovery().localNode();
-
-                    // Always prefer local node if possible.
-                    if (nodes.contains(locNode))
-                        nodes = singletonList(locNode);
-                    else {
-                        // Select random data node to run query on a replicated data or
-                        // get EXPLAIN PLAN from a single node.
-                        nodes = singletonList(F.rand(nodes));
-                    }
-                }
-            }
-
-            int tblIdx = 0;
-
-            final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable() || singlePartMode;
-
-            final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
-                mapper.findFirstPartitioned(cacheIds).config().getQueryParallelism();
+            ReducePartitionMapResult mapping = createMapping(qry, parts, cacheIds, topVer);
 
-            int replicatedQrysCnt = 0;
+            if (mapping == null) // Can't map query.
+                continue; // Retry.
 
-            final Collection<ClusterNode> finalNodes = nodes;
-
-            for (GridCacheSqlQuery mapQry : mapQueries) {
-                ReduceIndex idx;
+            final Collection<ClusterNode> nodes = mapping.nodes();
 
-                if (!skipMergeTbl) {
-                    ReduceTable tbl;
-
-                    try {
-                        tbl = createMergeTable(r.connection(), mapQry, qry.explain());
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
-                    }
+            assert !F.isEmpty(nodes);
 
-                    idx = tbl.getMergeIndex();
+            final long qryReqId = qryIdGen.incrementAndGet();
 
-                    fakeTable(r.connection(), tblIdx++).innerTable(tbl);
-                }
-                else
-                    idx = ReduceIndexUnsorted.createDummy(ctx);
-
-                // If the query has only replicated tables, we have to run it on a single node only.
-                if (!mapQry.isPartitioned()) {
-                    ClusterNode node = F.rand(nodes);
-
-                    mapQry.node(node.id());
-
-                    replicatedQrysCnt++;
-
-                    idx.setSources(singletonList(node), 1); // Replicated tables can have only 1 segment.
-                }
-                else
-                    idx.setSources(nodes, segmentsPerIndex);
-
-                idx.setPageSize(r.pageSize());
-
-                r.indexes().add(idx);
-            }
+            boolean retry = false;
+            boolean release = true;
 
-            r.latch(new CountDownLatch(isReplicatedOnly ? 1 :
-                (r.indexes().size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt));
+            final ReduceQueryRun r = createReduceQueryRun(
+                (JdbcConnection)h2.connections().connectionForThread().connection(schemaName),
+                mapQueries, nodes, pageSize, segmentsPerIndex, skipMergeTbl, qry.explain(), dataPageScanEnabled);
 
             runs.put(qryReqId, r);
 
-            boolean release = true;
+            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn = h2.connections().detachThreadConnection();
 
             try {
-                cancel.checkCancelled();
-
-                if (ctx.clientDisconnected()) {
-                    throw new CacheException("Query was cancelled, client node disconnected.",
-                        new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
-                            "Client node disconnected."));
-                }
-
-                List<GridCacheSqlQuery> mapQrys = mapQueries;
-
-                if (qry.explain()) {
-                    mapQrys = new ArrayList<>(mapQueries.size());
-
-                    for (GridCacheSqlQuery mapQry : mapQueries)
-                        mapQrys.add(new GridCacheSqlQuery(singlePartMode ? mapQry.query() : "EXPLAIN " + mapQry.query())
-                            .parameterIndexes(mapQry.parameterIndexes()));
-                }
-
-                final long qryReqId0 = qryReqId;
-
-                cancel.set(() -> send(finalNodes, new GridQueryCancelRequest(qryReqId0), null, true));
-
-                boolean retry = false;
-
-                int flags = singlePartMode && !enforceJoinOrder ? 0 : GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
-
-                // Distributed joins flag is set if it is either reald
-                if (qry.distributedJoins())
-                    flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
-
-                if (qry.explain())
-                    flags |= GridH2QueryRequest.FLAG_EXPLAIN;
-
-                if (isReplicatedOnly)
-                    flags |= GridH2QueryRequest.FLAG_REPLICATED;
-
-                if (lazy)
-                    flags |= GridH2QueryRequest.FLAG_LAZY;
-
-                flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
+                cancel.set(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true));
 
                 GridH2QueryRequest req = new GridH2QueryRequest()
                     .requestId(qryReqId)
                     .topologyVersion(topVer)
-                    .pageSize(r.pageSize())
+                    .pageSize(pageSize)
                     .caches(qry.cacheIds())
                     .tables(qry.distributedJoins() ? qry.tables() : null)
-                    .partitions(convert(partsMap))
-                    .queries(mapQrys)
+                    .partitions(convert(mapping.partitionsMap()))
+                    .queries(mapQueries)
                     .parameters(params)
-                    .flags(flags)
+                    .flags(queryFlags(qry, enforceJoinOrder, lazy, dataPageScanEnabled))
                     .timeout(timeoutMillis)
                     .schemaName(schemaName);
 
@@ -603,7 +443,7 @@ public class GridReduceQueryExecutor {
                     req.mvccSnapshot(mvccTracker.snapshot());
 
                 final C2<ClusterNode, Message, Message> spec =
-                    parts == null ? null : new ReducePartitionsSpecializer(qryMap);
+                    parts == null ? null : new ReducePartitionsSpecializer(mapping.queryPartitionsMap());
 
                 if (send(nodes, req, spec, false)) {
                     awaitAllReplies(r, nodes, cancel);
@@ -631,12 +471,18 @@ public class GridReduceQueryExecutor {
                 else // Send failed.
                     retry = true;
 
-                Iterator<List<?>> resIter;
+                if (retry) {
+                    lastRun = runs.remove(qryReqId);
+                    assert lastRun != null;
+
+                    continue;
+                }
+                else {
+                    Iterator<List<?>> resIter;
 
-                if (!retry) {
                     if (skipMergeTbl) {
                         resIter = new ReduceIndexIterator(this,
-                            finalNodes,
+                            nodes,
                             r,
                             qryReqId,
                             qry.distributedJoins(),
@@ -645,9 +491,7 @@ public class GridReduceQueryExecutor {
                         release = false;
                     }
                     else {
-                        cancel.checkCancelled();
-
-                        H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
+                        ensureQueryNotCancelled(cancel);
 
                         QueryContext qctx = new QueryContext(
                             0,
@@ -658,6 +502,8 @@ public class GridReduceQueryExecutor {
                             true
                         );
 
+                        H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
+
                         QueryContextRegistry qryCtxRegistry = h2.queryContextRegistry();
 
                         qryCtxRegistry.setThreadLocal(qctx);
@@ -682,7 +528,7 @@ public class GridReduceQueryExecutor {
                                 cancel,
                                 dataPageScanEnabled,
                                 qryInfo
-                                );
+                            );
 
                             resIter = new H2FieldsIterator(res, mvccTracker, detachedConn);
 
@@ -695,18 +541,9 @@ public class GridReduceQueryExecutor {
                             qryCtxRegistry.clearThreadLocal();
                         }
                     }
-                }
-                else {
-                    assert r != null;
-                    lastRun=r;
 
-                    if (Thread.currentThread().isInterrupted())
-                        throw new IgniteInterruptedCheckedException("Query was interrupted.");
-
-                    continue;
+                    return new GridQueryCacheObjectsIterator(resIter, h2.objectContext(), keepBinary);
                 }
-
-                return new GridQueryCacheObjectsIterator(resIter, h2.objectContext(), keepBinary);
             }
             catch (IgniteCheckedException | RuntimeException e) {
                 release = true;
@@ -738,7 +575,7 @@ public class GridReduceQueryExecutor {
                     detachedConn.recycle();
 
                 if (release) {
-                    releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
+                    releaseRemoteResources(nodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
 
                     if (!skipMergeTbl) {
                         for (int i = 0, mapQrys = mapQueries.size(); i < mapQrys; i++)
@@ -750,6 +587,233 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * Wait on retry.
+     *
+     * @param lastRun Previous query run.
+     * @param startTime Query start time.
+     * @param retryTimeout Query retry timeout.
+     * @param timeoutMultiplier Timeout multiplier.
+     */
+    private void throttleOnRetry(
+        @Nullable ReduceQueryRun lastRun,
+        long startTime,
+        long retryTimeout,
+        int timeoutMultiplier) {
+        if (retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) {
+            // There are a few cases when 'retryCause' can be undefined, so we throw an exception with a proper message here.
+            if (lastRun == null || lastRun.retryCause() == null)
+                throw new CacheException("Failed to map SQL query to topology during timeout: " + retryTimeout + "ms");
+
+            UUID retryNodeId = lastRun.retryNodeId();
+            String retryCause = lastRun.retryCause();
+
+            throw new CacheException("Failed to map SQL query to topology on data node [dataNodeId=" + retryNodeId +
+                ", msg=" + retryCause + ']');
+        }
+
+        try {
+            Thread.sleep(Math.min(10_000, timeoutMultiplier * 10)); // Wait for exchange.
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new CacheException("Query was interrupted.", e);
+        }
+    }
+
+    /**
+     * Check if query is cancelled.
+     *
+     * @param cancel Query cancel object.
+     * @throws CacheException If query was cancelled.
+     */
+    private void ensureQueryNotCancelled(GridQueryCancel cancel) {
+        if (Thread.currentThread().isInterrupted())
+            throw new CacheException(new IgniteInterruptedCheckedException("Query was interrupted."));
+
+        try {
+            cancel.checkCancelled();
+        }
+        catch (QueryCancelledException cancelEx) {
+            throw new CacheException("Failed to run reduce query locally. " + cancelEx.getMessage(), cancelEx);
+        }
+
+        if (ctx.clientDisconnected()) {
+            throw new CacheException("Query was cancelled, client node disconnected.",
+                new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
+                    "Client node disconnected."));
+        }
+    }
+
+    /**
+     * Prepare map queries.
+     *
+     * @param qry Two-step query.
+     * @param params Query parameters.
+     * @param singlePartMode Single partition mode flag.
+     * @return List of map queries.
+     */
+    @NotNull private List<GridCacheSqlQuery> prepareMapQueries(
+        GridCacheTwoStepQuery qry,
+        Object[] params,
+        boolean singlePartMode) {
+        List<GridCacheSqlQuery> mapQueries;
+
+        if (singlePartMode)
+            mapQueries = prepareMapQueryForSinglePartition(qry, params);
+        else {
+            mapQueries = new ArrayList<>(qry.mapQueries().size());
+
+            // Copy queries here because the node ID will be changed below.
+            for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+                final GridCacheSqlQuery copy = mapQry.copy();
+
+                mapQueries.add(copy);
+
+                if (qry.explain())
+                    copy.query("EXPLAIN " + mapQry.query()).parameterIndexes(mapQry.parameterIndexes());
+            }
+        }
+
+        return mapQueries;
+    }
+
+    /**
+     * Query run factory method.
+     *
+     * @param conn H2 connection.
+     * @param mapQueries Map queries.
+     * @param nodes Target nodes.
+     * @param pageSize Page size.
+     * @param segmentsPerIndex Segments per-index.
+     * @param skipMergeTbl Skip merge table flag.
+     * @param explain Explain query flag.
+     * @param dataPageScanEnabled DataPage scan enabled flag.
+     * @return Reduce query run.
+     */
+    @NotNull private ReduceQueryRun createReduceQueryRun(
+        JdbcConnection conn,
+        List<GridCacheSqlQuery> mapQueries,
+        Collection<ClusterNode> nodes,
+        int pageSize,
+        int segmentsPerIndex,
+        boolean skipMergeTbl,
+        boolean explain,
+        Boolean dataPageScanEnabled) {
+
+        final ReduceQueryRun r = new ReduceQueryRun(
+            conn,
+            mapQueries.size(),
+            pageSize,
+            dataPageScanEnabled
+        );
+
+        int tblIdx = 0;
+        int replicatedQrysCnt = 0;
+        for (GridCacheSqlQuery mapQry : mapQueries) {
+            Reducer reducer;
+
+            if (skipMergeTbl)
+                reducer = UnsortedReducer.createDummy(ctx);
+            else {
+                ReduceTable tbl;
+
+                try {
+                    tbl = createMergeTable(conn, mapQry, explain);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+
+                reducer = tbl.getReducer();
+
+                fakeTable(conn, tblIdx++).innerTable(tbl);
+            }
+
+            // If the query has only replicated tables, we have to run it on a single node only.
+            if (!mapQry.isPartitioned()) {
+                ClusterNode node = F.rand(nodes);
+
+                mapQry.node(node.id());
+
+                replicatedQrysCnt++;
+
+                reducer.setSources(singletonList(node), 1); // Replicated tables can have only 1 segment.
+            }
+            else
+                reducer.setSources(nodes, segmentsPerIndex);
+
+            reducer.setPageSize(r.pageSize());
+
+            r.reducers().add(reducer);
+        }
+
+        r.init((r.reducers().size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
+
+        return r;
+    }
+
+    /**
+     * Build query flags.
+     *
+     * @param qry Two-step query.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy execution flag.
+     * @param dataPageScanEnabled Data page scan enabled flag.
+     * @return Query flags.
+     */
+    private int queryFlags(GridCacheTwoStepQuery qry,
+        boolean enforceJoinOrder,
+        boolean lazy,
+        Boolean dataPageScanEnabled) {
+        if (qry.distributedJoins())
+            enforceJoinOrder = true;
+
+        return GridH2QueryRequest.queryFlags(qry.distributedJoins(),
+            enforceJoinOrder, lazy, qry.isReplicatedOnly(),
+            qry.explain(), dataPageScanEnabled);
+    }
+
+    /**
+     * Create mapping for query.
+     *
+     * @param qry Query.
+     * @param parts Partitions.
+     * @param cacheIds Cache ids.
+     * @param topVer Topology version.
+     * @return Node/partition mapping for the query, or {@code null} if the query cannot be mapped to the current topology and should be retried.
+     */
+    private ReducePartitionMapResult createMapping(GridCacheTwoStepQuery qry,
+        @Nullable int[] parts,
+        List<Integer> cacheIds,
+        AffinityTopologyVersion topVer) {
+        if (qry.isLocalSplit() || !qry.hasCacheIds())
+            return new ReducePartitionMapResult(singletonList(ctx.discovery().localNode()), null, null);
+        else {
+            ReducePartitionMapResult nodesParts =
+                mapper.nodesForPartitions(cacheIds, topVer, parts, qry.isReplicatedOnly());
+
+            Collection<ClusterNode> nodes = nodesParts.nodes();
+
+            if (F.isEmpty(nodes))
+                return null;
+
+            if (qry.explain() || qry.isReplicatedOnly()) {
+                ClusterNode locNode = ctx.discovery().localNode();
+
+                // Always prefer local node if possible.
+                if (nodes.contains(locNode))
+                    nodes = singletonList(locNode);
+                else {
+                    // Select random data node to run query on a replicated data or
+                    // get EXPLAIN PLAN from a single node.
+                    nodes = singletonList(F.rand(nodes));
+                }
+
+                return new ReducePartitionMapResult(nodes, nodesParts.partitionsMap(), nodesParts.queryPartitionsMap());
+            }
+
+            return nodesParts;
+        }
+    }
+
+    /**
      *
      * @param schemaName Schema name.
      * @param cacheIds Cache ids.
@@ -778,10 +842,8 @@ public class GridReduceQueryExecutor {
     ) {
         AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
-        final long reqId = qryIdGen.incrementAndGet();
-
         ReducePartitionMapResult nodesParts =
-            mapper.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, reqId);
+            mapper.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
 
         Collection<ClusterNode> nodes = nodesParts.nodes();
 
@@ -807,6 +869,8 @@ public class GridReduceQueryExecutor {
             }
         }
 
+        final long reqId = qryIdGen.incrementAndGet();
+
         final DmlDistributedUpdateRun r = new DmlDistributedUpdateRun(nodes.size());
 
         int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
@@ -917,7 +981,7 @@ public class GridReduceQueryExecutor {
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, true);
 
-        for (ReduceIndex idx : r.indexes()) {
+        for (Reducer idx : r.reducers()) {
             if (!idx.fetchedAll()) {
                 if (!distributedJoins) // cancel request has been already sent for distributed join.
                     send(nodes, new GridQueryCancelRequest(qryReqId), null, true);
@@ -944,7 +1008,7 @@ public class GridReduceQueryExecutor {
      */
     private void awaitAllReplies(ReduceQueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
         throws IgniteInterruptedCheckedException, QueryCancelledException {
-        while (!U.await(r.latch(), 500, TimeUnit.MILLISECONDS)) {
+        while (!r.tryMapToSources(500, TimeUnit.MILLISECONDS)) {
 
             cancel.checkCancelled();
 
@@ -952,7 +1016,7 @@ public class GridReduceQueryExecutor {
                 if (!ctx.discovery().alive(node)) {
                     handleNodeLeft(r, node.id());
 
-                    assert r.latch().getCount() == 0;
+                    assert r.mapped();
 
                     return;
                 }
@@ -1004,7 +1068,7 @@ public class GridReduceQueryExecutor {
      */
     private Iterator<List<?>> explainPlan(JdbcConnection c, GridCacheTwoStepQuery qry, Object[] params)
         throws IgniteCheckedException {
-        List<List<?>> lists = new ArrayList<>();
+        List<List<?>> lists = new ArrayList<>(qry.mapQueries().size() + 1);
 
         for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) {
             ResultSet rs =
@@ -1172,20 +1236,20 @@ public class GridReduceQueryExecutor {
             ArrayList<Index> idxs = new ArrayList<>(2);
 
             if (explain) {
-                idxs.add(new ReduceIndexUnsorted(ctx, tbl,
+                idxs.add(new UnsortedReduceIndexAdapter(ctx, tbl,
                     sortedIndex ? MERGE_INDEX_SORTED : MERGE_INDEX_UNSORTED));
             }
             else if (sortedIndex) {
                 List<GridSqlSortColumn> sortCols = (List<GridSqlSortColumn>)qry.sortColumns();
 
-                ReduceIndexSorted sortedMergeIdx = new ReduceIndexSorted(ctx, tbl, MERGE_INDEX_SORTED,
+                SortedReduceIndexAdapter sortedMergeIdx = new SortedReduceIndexAdapter(ctx, tbl, MERGE_INDEX_SORTED,
                     GridSqlSortColumn.toIndexColumns(tbl, sortCols));
 
                 idxs.add(ReduceTable.createScanIndex(sortedMergeIdx));
                 idxs.add(sortedMergeIdx);
             }
             else
-                idxs.add(new ReduceIndexUnsorted(ctx, tbl, MERGE_INDEX_UNSORTED));
+                idxs.add(new UnsortedReduceIndexAdapter(ctx, tbl, MERGE_INDEX_UNSORTED));
 
             tbl.indexes(idxs);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
index 87d4c14..9609ec3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceBlockList.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import org.apache.ignite.internal.util.typedef.internal.U;
-
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.RandomAccess;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
@@ -86,7 +85,7 @@ public class ReduceBlockList<Z> extends AbstractList<Z> implements RandomAccess
      * @return Last block.
      */
     public List<Z> lastBlock() {
-        return ReduceIndex.last(blocks);
+        return blocks.get(blocks.size() - 1);
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
index 3ff3a15..8a035a6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexIterator.java
@@ -29,7 +29,7 @@ import org.h2.result.Row;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Iterator that transparently and sequentially traverses a bunch of {@link ReduceIndex} objects.
+ * Iterator that transparently and sequentially traverses a bunch of {@link Reducer} objects.
  */
 public class ReduceIndexIterator implements Iterator<List<?>>, AutoCloseable {
     /** Reduce query executor. */
@@ -48,7 +48,7 @@ public class ReduceIndexIterator implements Iterator<List<?>>, AutoCloseable {
     private final boolean distributedJoins;
 
     /** Iterator over indexes. */
-    private final Iterator<ReduceIndex> idxIter;
+    private final Iterator<Reducer> rdcIter;
 
     /** Current cursor. */
     private Cursor cursor;
@@ -85,7 +85,7 @@ public class ReduceIndexIterator implements Iterator<List<?>>, AutoCloseable {
         this.distributedJoins = distributedJoins;
         this.mvccTracker = mvccTracker;
 
-        idxIter = run.indexes().iterator();
+        rdcIter = run.reducers().iterator();
 
         advance();
     }
@@ -127,8 +127,8 @@ public class ReduceIndexIterator implements Iterator<List<?>>, AutoCloseable {
             boolean hasNext = false;
 
             while (cursor == null || !(hasNext = cursor.next())) {
-                if (idxIter.hasNext())
-                    cursor = idxIter.next().findInStream(null, null);
+                if (rdcIter.hasNext())
+                    cursor = rdcIter.next().find(null, null);
                 else {
                     releaseIfNeeded();
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java
index 83bbf8e..468f326 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapResult.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import org.apache.ignite.cluster.ClusterNode;
-import org.h2.util.IntArray;
-
 import java.util.Collection;
 import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.h2.util.IntArray;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Result of nodes to partitions mapping for a query or update.
@@ -43,8 +43,9 @@ public class ReducePartitionMapResult {
      * @param partsMap Partitions map.
      * @param qryMap Nodes map.
      */
-    public ReducePartitionMapResult(Collection<ClusterNode> nodes, Map<ClusterNode, IntArray> partsMap,
-        Map<ClusterNode, IntArray> qryMap) {
+    public ReducePartitionMapResult(Collection<ClusterNode> nodes,
+        @Nullable Map<ClusterNode, IntArray> partsMap,
+        @Nullable Map<ClusterNode, IntArray> qryMap) {
         this.nodes = nodes;
         this.partsMap = partsMap;
         this.qryMap = qryMap;
@@ -60,14 +61,14 @@ public class ReducePartitionMapResult {
     /**
      * @return Maps a node to partition array.
      */
-    public Map<ClusterNode, IntArray> partitionsMap() {
+    public @Nullable Map<ClusterNode, IntArray> partitionsMap() {
         return partsMap;
     }
 
     /**
     * @return Maps a node to a partition array for the query.
      */
-    public Map<ClusterNode, IntArray> queryPartitionsMap() {
+    public @Nullable Map<ClusterNode, IntArray> queryPartitionsMap() {
         return qryMap;
     }
 }
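
An empty node set in the result signals the caller to retry on a fresh topology (see the null
returns above). A minimal sketch of such a retry loop; the backoff policy here is an assumption,
not part of this patch:

    for (;;) {
        AffinityTopologyVersion topVer = h2.readyTopologyVersion();

        ReducePartitionMapResult res = mapper.nodesForPartitions(cacheIds, topVer, parts, replicatedOnly);

        if (!F.isEmpty(res.nodes()))
            break; // Mapping succeeded on a stable enough topology.

        U.sleep(10); // Rebalance in progress; back off briefly and retry (assumed policy).
    }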
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
index c898a5f..0fab5a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReducePartitionMapper.java
@@ -31,14 +31,14 @@ import org.apache.ignite.cache.CacheServerNotFoundException;
 import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.util.GridIntIterator;
-import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.util.IntArray;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
@@ -75,11 +75,10 @@ public class ReducePartitionMapper {
      * @param topVer Topology version.
      * @param parts Partitions array.
      * @param isReplicatedOnly Allow only replicated caches.
-     * @param qryId Query ID.
      * @return Result.
      */
     public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer,
-        int[] parts, boolean isReplicatedOnly, long qryId) {
+        int[] parts, boolean isReplicatedOnly) {
         Collection<ClusterNode> nodes = null;
         Map<ClusterNode, IntArray> partsMap = null;
         Map<ClusterNode, IntArray> qryMap = null;
@@ -104,9 +103,9 @@ public class ReducePartitionMapper {
 
         if (isPreloadingActive(cacheIds)) {
             if (isReplicatedOnly)
-                nodes = replicatedUnstableDataNodes(cacheIds, qryId);
+                nodes = replicatedUnstableDataNodes(cacheIds);
             else {
-                partsMap = partitionedUnstableDataNodes(cacheIds, qryId);
+                partsMap = partitionedUnstableDataNodes(cacheIds);
 
                 if (partsMap != null) {
                     qryMap = narrowForQuery(partsMap, parts);
@@ -116,10 +115,14 @@ public class ReducePartitionMapper {
             }
         }
         else {
-            qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts, qryId);
+            if (parts == null)
+                nodes = stableDataNodes(cacheIds, topVer, isReplicatedOnly);
+            else {
+                qryMap = stableDataNodesForPartitions(topVer, cacheIds, parts);
 
-            if (qryMap != null)
-                nodes = qryMap.keySet();
+                if (qryMap != null)
+                    nodes = qryMap.keySet();
+            }
         }
 
         return new ReducePartitionMapResult(nodes, partsMap, qryMap);
@@ -163,41 +166,107 @@ public class ReducePartitionMapper {
     }
 
     /**
-     * @param isReplicatedOnly If we must only have replicated caches.
      * @param topVer Topology version.
      * @param cacheIds Participating cache IDs.
      * @param parts Partitions.
-     * @param qryId Query ID.
      * @return Data nodes or {@code null} if repartitioning started and we need to retry.
      */
-    private Map<ClusterNode, IntArray> stableDataNodes(boolean isReplicatedOnly, AffinityTopologyVersion topVer,
-        List<Integer> cacheIds, int[] parts, long qryId) {
+    private Map<ClusterNode, IntArray> stableDataNodesForPartitions(
+        AffinityTopologyVersion topVer,
+        List<Integer> cacheIds,
+        @NotNull int[] parts) {
+        assert parts != null;
+
+        GridCacheContext<?, ?> cctx = firstPartitionedCache(cacheIds);
+
+        Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts);
+
+        Set<ClusterNode> nodes = map.keySet();
+
+        if (narrowToCaches(cctx, nodes, cacheIds, topVer, parts, false) == null)
+            return null;
+
+        return map;
+    }
+
+    /**
+     * @param cacheIds Participating cache IDs.
+     * @param topVer Topology version.
+     * @param isReplicatedOnly If we must only have replicated caches.
+     * @return Data nodes or {@code null} if repartitioning started and we need to retry.
+     */
+    private Collection<ClusterNode> stableDataNodes(
+        List<Integer> cacheIds,
+        AffinityTopologyVersion topVer,
+        boolean isReplicatedOnly) {
+        GridCacheContext<?, ?> cctx = isReplicatedOnly ? cacheContext(cacheIds.get(0)) :
+            firstPartitionedCache(cacheIds);
+
+        Set<ClusterNode> nodes;
+
+        // Explicit partitions mapping is not applicable to replicated cache.
+        final AffinityAssignment topologyAssignment = cctx.affinity().assignment(topVer);
+
+        if (cctx.isReplicated()) {
+            // Mutable collection needed for this particular case.
+            nodes = isReplicatedOnly && cacheIds.size() > 1 ?
+                new HashSet<>(topologyAssignment.nodes()) : topologyAssignment.nodes();
+        }
+        else
+            nodes = topologyAssignment.primaryPartitionNodes();
+
+        return narrowToCaches(cctx, nodes, cacheIds, topVer, null, isReplicatedOnly);
+    }
+
+    /**
+     * Find the first partitioned cache context and move its ID to index 0 of the given list.
+     *
+     * @param cacheIds Cache ids collection.
+     * @return First partitioned cache.
+     */
+    private GridCacheContext<?, ?> firstPartitionedCache(List<Integer> cacheIds) {
         GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0));
 
         // If the first cache is not partitioned, find it (if it's present) and move it to index 0.
-        if (!cctx.isPartitioned()) {
-            for (int cacheId = 1; cacheId < cacheIds.size(); cacheId++) {
-                GridCacheContext<?, ?> currCctx = cacheContext(cacheIds.get(cacheId));
+        if (cctx.isPartitioned())
+            return cctx;
 
-                if (currCctx.isPartitioned()) {
-                    Collections.swap(cacheIds, 0, cacheId);
+        for (int cacheId = 1; cacheId < cacheIds.size(); cacheId++) {
+            GridCacheContext<?, ?> currCctx = cacheContext(cacheIds.get(cacheId));
 
-                    cctx = currCctx;
+            if (currCctx.isPartitioned()) {
+                Collections.swap(cacheIds, 0, cacheId);
 
-                    break;
-                }
+                return currCctx;
             }
         }
 
-        Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts);
+        assert false : "Failed to find a partitioned cache in the query cache list.";
 
-        Set<ClusterNode> nodes = map.keySet();
+        return cctx;
+    }
 
-        if (F.isEmpty(map))
+    /**
+     * @param cctx First cache context.
+     * @param nodes Nodes.
+     * @param cacheIds Query cache Ids.
+     * @param topVer Topology version.
+     * @param parts Query partitions.
+     * @param isReplicatedOnly Replicated only flag.
+     * @return Nodes to execute the query on, or {@code null} if a retry is needed.
+     */
+    private Collection<ClusterNode> narrowToCaches(
+        GridCacheContext<?, ?> cctx,
+        Collection<ClusterNode> nodes,
+        List<Integer> cacheIds,
+        AffinityTopologyVersion topVer,
+        int[] parts,
+        boolean isReplicatedOnly) {
+        if (F.isEmpty(nodes))
             throw new CacheServerNotFoundException("Failed to find data nodes for cache: " + cctx.name());
 
         for (int i = 1; i < cacheIds.size(); i++) {
-            GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i));
+            GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i));
 
             String extraCacheName = extraCctx.name();
 
@@ -209,7 +278,7 @@ public class ReducePartitionMapper {
                     "with partitioned tables [replicatedCache=" + cctx.name() +
                     ", partitionedCache=" + extraCacheName + "]");
 
-            Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet();
+            Set<ClusterNode> extraNodes = stableDataNodesSet(topVer, extraCctx, parts);
 
             if (F.isEmpty(extraNodes))
                 throw new CacheServerNotFoundException("Failed to find data nodes for cache: " + extraCacheName);
@@ -220,7 +289,7 @@ public class ReducePartitionMapper {
                 if (isReplicatedOnly) {
                     nodes.retainAll(extraNodes);
 
-                    disjoint = map.isEmpty();
+                    disjoint = nodes.isEmpty();
                 }
                 else
                     disjoint = !extraNodes.containsAll(nodes);
@@ -231,7 +300,7 @@ public class ReducePartitionMapper {
             if (disjoint) {
                 if (isPreloadingActive(cacheIds)) {
                     logRetry("Failed to calculate nodes for SQL query (got disjoint node map during rebalance) " +
-                        "[qryId=" + qryId + ", affTopVer=" + topVer + ", cacheIds=" + cacheIds +
+                        "[affTopVer=" + topVer + ", cacheIds=" + cacheIds +
                         ", parts=" + (parts == null ? "[]" : Arrays.toString(parts)) +
                         ", replicatedOnly=" + isReplicatedOnly + ", lastCache=" + extraCctx.name() +
                         ", lastCacheId=" + extraCctx.cacheId() + ']');
@@ -244,7 +313,7 @@ public class ReducePartitionMapper {
             }
         }
 
-        return map;
+        return nodes;
     }
 
     /**
@@ -253,39 +322,19 @@ public class ReducePartitionMapper {
      * @param parts Partitions.
      */
     private Map<ClusterNode, IntArray> stableDataNodesMap(AffinityTopologyVersion topVer,
-        final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) {
-
-        Map<ClusterNode, IntArray> mapping = new HashMap<>();
-
-        // Explicit partitions mapping is not applicable to replicated cache.
-        if (cctx.isReplicated()) {
-            for (ClusterNode clusterNode : cctx.affinity().assignment(topVer).nodes())
-                mapping.put(clusterNode, null);
-
-            return mapping;
-        }
+        final GridCacheContext<?, ?> cctx, final @NotNull int[] parts) {
+        assert !cctx.isReplicated();
 
         List<List<ClusterNode>> assignment = cctx.affinity().assignment(topVer).assignment();
 
-        boolean needPartsFilter = parts != null;
-
-        GridIntIterator iter = needPartsFilter ? new GridIntList(parts).iterator() :
-            U.forRange(0, cctx.affinity().partitions());
-
-        while(iter.hasNext()) {
-            int partId = iter.next();
+        Map<ClusterNode, IntArray> mapping = new HashMap<>();
 
-            List<ClusterNode> partNodes = assignment.get(partId);
+        for (int part : parts) {
+            List<ClusterNode> partNodes = assignment.get(part);
 
             if (!partNodes.isEmpty()) {
                 ClusterNode prim = partNodes.get(0);
 
-                if (!needPartsFilter) {
-                    mapping.put(prim, null);
-
-                    continue;
-                }
-
                 IntArray partIds = mapping.get(prim);
 
                 if (partIds == null) {
@@ -294,7 +343,7 @@ public class ReducePartitionMapper {
                     mapping.put(prim, partIds);
                 }
 
-                partIds.add(partId);
+                partIds.add(part);
             }
         }
 
@@ -302,14 +351,46 @@ public class ReducePartitionMapper {
     }
 
     /**
+     * Note: This may return unmodifiable set.
+     *
+     * @param topVer Topology version.
+     * @param cctx Cache context.
+     * @param parts Partitions.
+     */
+    private Set<ClusterNode> stableDataNodesSet(AffinityTopologyVersion topVer,
+        final GridCacheContext<?, ?> cctx, @Nullable final int[] parts) {
+
+        // Explicit partitions mapping is not applicable to replicated cache.
+        final AffinityAssignment topologyAssignment = cctx.affinity().assignment(topVer);
+
+        if (cctx.isReplicated())
+            return topologyAssignment.nodes();
+
+        if (parts == null)
+            return topologyAssignment.primaryPartitionNodes();
+
+        List<List<ClusterNode>> assignment = topologyAssignment.assignment();
+
+        Set<ClusterNode> nodes = new HashSet<>();
+
+        for (int part : parts) {
+            List<ClusterNode> partNodes = assignment.get(part);
+
+            if (!partNodes.isEmpty())
+                nodes.add(partNodes.get(0));
+        }
+
+        return nodes;
+    }
+
+    /**
      * Calculates partition mapping for partitioned cache on unstable topology.
      *
      * @param cacheIds Cache IDs.
-     * @param qryId Query ID.
      * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
      */
     @SuppressWarnings("unchecked")
-    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds, long qryId) {
+    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds) {
         // If the main cache is replicated, just replace it with the first partitioned.
         GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds);
 
@@ -346,7 +427,7 @@ public class ReducePartitionMapper {
                 }
                 else if (!F.isEmpty(dataNodes(cctx.groupId(), NONE))) {
                     logRetry("Failed to calculate nodes for SQL query (partition has no owners, but corresponding " +
-                        "cache group has data nodes) [qryId=" + qryId + ", cacheIds=" + cacheIds +
+                        "cache group has data nodes) [cacheIds=" + cacheIds +
                         ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() + ", part=" + p +
                         ", cacheGroupId=" + cctx.groupId() + ']');
 
@@ -381,8 +462,7 @@ public class ReducePartitionMapper {
                     if (F.isEmpty(owners)) {
                         if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE))) {
                             logRetry("Failed to calculate nodes for SQL query (partition has no owners, but " +
-                                "corresponding cache group has data nodes) [qryId=" + qryId +
-                                ", cacheIds=" + cacheIds + ", cacheName=" + extraCctx.name() +
+                                "corresponding cache group has data nodes) [ cacheIds=" + cacheIds + ", cacheName=" + extraCctx.name() +
                                 ", cacheId=" + extraCctx.cacheId() + ", part=" + p +
                                 ", cacheGroupId=" + extraCctx.groupId() + ']');
 
@@ -400,7 +480,7 @@ public class ReducePartitionMapper {
 
                         if (partLocs[p].isEmpty()) {
                             logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " +
-                                "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds +
+                                "partition) [cacheIds=" + cacheIds +
                                 ", lastCacheName=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() +
                                 ", part=" + p + ']');
 
@@ -417,7 +497,7 @@ public class ReducePartitionMapper {
                 if (!extraCctx.isReplicated())
                     continue;
 
-                Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx, qryId);
+                Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx);
 
                 if (F.isEmpty(dataNodes))
                     return null; // Retry.
@@ -432,7 +512,7 @@ public class ReducePartitionMapper {
 
                     if (partLoc.isEmpty()) {
                         logRetry("Failed to calculate nodes for SQL query (caches have no common data nodes for " +
-                            "partition) [qryId=" + qryId + ", cacheIds=" + cacheIds +
+                            "partition) [cacheIds=" + cacheIds +
                             ", lastReplicatedCacheName=" + extraCctx.name() +
                             ", lastReplicatedCacheId=" + extraCctx.cacheId() + ", part=" + part + ']');
 
@@ -474,10 +554,9 @@ public class ReducePartitionMapper {
      * Calculates data nodes for replicated caches on unstable topology.
      *
      * @param cacheIds Cache IDs.
-     * @param qryId Query ID.
      * @return Collection of all data nodes owning all the caches or {@code null} for retry.
      */
-    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds, long qryId) {
+    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds) {
         int i = 0;
 
         GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++));
@@ -492,7 +571,7 @@ public class ReducePartitionMapper {
             assert cctx.isReplicated(): "all the extra caches must be replicated here";
         }
 
-        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx, qryId);
+        Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
 
         if (F.isEmpty(nodes))
             return null; // Retry.
@@ -508,7 +587,7 @@ public class ReducePartitionMapper {
                     "with tables in partitioned caches [replicatedCache=" + cctx.name() + ", " +
                     "partitionedCache=" + extraCctx.name() + "]");
 
-            Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx, qryId);
+            Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
 
             if (F.isEmpty(extraOwners))
                 return null; // Retry.
@@ -517,7 +596,7 @@ public class ReducePartitionMapper {
 
             if (nodes.isEmpty()) {
                 logRetry("Failed to calculate nodes for SQL query (got disjoint node map for REPLICATED caches " +
-                    "during rebalance) [qryId=" + qryId + ", cacheIds=" + cacheIds +
+                    "during rebalance) [cacheIds=" + cacheIds +
                     ", lastCache=" + extraCctx.name() + ", lastCacheId=" + extraCctx.cacheId() + ']');
 
                 return null; // Retry.
@@ -531,10 +610,9 @@ public class ReducePartitionMapper {
      * Collects all the nodes owning all the partitions for the given replicated cache.
      *
      * @param cctx Cache context.
-     * @param qryId Query ID.
      * @return Owning nodes or {@code null} if we can't find owners for some partitions.
      */
-    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx, long qryId) {
+    private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) {
         assert cctx.isReplicated() : cctx.name() + " must be replicated";
 
         String cacheName = cctx.name();
@@ -550,7 +628,7 @@ public class ReducePartitionMapper {
 
             if (F.isEmpty(owners)) {
                 logRetry("Failed to calculate nodes for SQL query (partition of a REPLICATED cache has no owners) [" +
-                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() +
+                    "cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() +
                     ", part=" + p + ']');
 
                 return null; // Retry.
@@ -560,7 +638,7 @@ public class ReducePartitionMapper {
 
             if (dataNodes.isEmpty()) {
                 logRetry("Failed to calculate nodes for SQL query (partitions of a REPLICATED has no common owners) [" +
-                    "qryId=" + qryId + ", cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() +
+                    "cacheName=" + cctx.name() + ", cacheId=" + cctx.cacheId() +
                     ", lastPart=" + p + ']');
 
                 return null; // Retry.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index 1fb3c78..1468eb3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -17,17 +17,17 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
-
-import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.jdbc.JdbcConnection;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class ReduceQueryRun {
     /** */
-    private final List<ReduceIndex> idxs;
+    private final List<Reducer> idxs;
 
     /** */
     private CountDownLatch latch;
@@ -61,16 +61,18 @@ public class ReduceQueryRun {
      * @param dataPageScanEnabled If data page scan is enabled.
      */
     ReduceQueryRun(
-        Connection conn,
+        JdbcConnection conn,
         int idxsCnt,
         int pageSize,
         Boolean dataPageScanEnabled
     ) {
-        this.conn = (JdbcConnection)conn;
+        assert pageSize > 0;
+
+        this.conn = conn;
 
         idxs = new ArrayList<>(idxsCnt);
 
-        this.pageSize = pageSize > 0 ? pageSize : Query.DFLT_PAGE_SIZE;
+        this.pageSize = pageSize;
         this.dataPageScanEnabled  = dataPageScanEnabled;
     }
 
@@ -126,8 +128,8 @@ public class ReduceQueryRun {
         while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
             latch.countDown();
 
-        for (ReduceIndex idx : idxs) // Fail all merge indexes.
-            idx.fail(state.nodeId, state.ex);
+        for (Reducer idx : idxs) // Fail all reducers.
+            idx.onFailure(state.nodeId, state.ex);
     }
 
     /**
@@ -195,22 +197,47 @@ public class ReduceQueryRun {
     /**
      * @return Reducers.
      */
-    List<ReduceIndex> indexes() {
+    List<Reducer> reducers() {
         return idxs;
     }
 
     /**
-     * @return Latch.
+     * Initialize.
+     *
+     * @param srcSegmentCnt Total number of source segments.
+     */
+    void init(int srcSegmentCnt) {
+        assert latch == null;
+
+        latch = new CountDownLatch(srcSegmentCnt);
+    }
+
+    /**
+     * First page callback.
      */
-    CountDownLatch latch() {
-        return latch;
+    void onFirstPage() {
+        latch.countDown();
+    }
+
+    /**
+     * Try to map the query to sources by awaiting the first page from every source.
+     *
+     * @param time Timeout.
+     * @param timeUnit Time unit.
+     * @return {@code true} if the first pages have been received from all sources, {@code false} otherwise.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    boolean tryMapToSources(long time, TimeUnit timeUnit) throws IgniteInterruptedCheckedException {
+        assert latch != null;
+
+        return U.await(latch, time, timeUnit);
     }
 
     /**
-     * @param latch Latch.
+     * @return {@code true} if the first pages have been received from all sources, {@code false} otherwise.
      */
-    void latch(CountDownLatch latch) {
-        this.latch = latch;
+    boolean mapped() {
+        return latch != null && latch.getCount() == 0;
     }
 
     /**
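
Read together, the latch methods above imply this life cycle on the reduce side (a sketch assembled
from init(), onFirstPage() and tryMapToSources(); the send/receive plumbing is assumed):

    run.init(srcSegmentCnt);          // One latch count per expected source segment.

    // Message listener, on the first page arriving from each map node segment:
    run.onFirstPage();

    // Reducer thread, as in awaitAllReplies():
    while (!run.tryMapToSources(500, TimeUnit.MILLISECONDS))
        cancel.checkCancelled();      // Bail out if the query has been cancelled meanwhile.

    assert run.mapped();              // Every source has sent its first page.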
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
index 0030fcd..9897edb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceTable.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.ArrayList;
-
 import org.apache.ignite.internal.processors.query.h2.opt.H2ScanIndex;
 import org.apache.ignite.internal.util.typedef.F;
 import org.h2.command.ddl.CreateTableData;
@@ -61,15 +60,19 @@ public class ReduceTable extends TableBase {
     /**
      * @return Reducer.
      */
-    public ReduceIndex getMergeIndex() {
-        return (ReduceIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
+    public Reducer getReducer() {
+        final Index index = idxs.get(idxs.size() - 1);
+
+        assert index instanceof AbstractReduceIndexAdapter : "Reducer index not found.";
+
+        return ((AbstractReduceIndexAdapter)index).reducer(); // Sorted index must be the last.
     }
 
     /**
      * @param idx Index.
      * @return Scan index.
      */
-    public static H2ScanIndex<ReduceIndex> createScanIndex(ReduceIndex idx) {
+    public static H2ScanIndex<AbstractReduceIndexAdapter> createScanIndex(AbstractReduceIndexAdapter idx) {
         return new H2ScanIndex<>(idx);
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/Reducer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/Reducer.java
new file mode 100644
index 0000000..b6e9226
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/Reducer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.UUID;
+import javax.cache.CacheException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.h2.index.Cursor;
+import org.h2.index.Index;
+import org.h2.result.SearchRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Reducer for remote index lookup results.
+ */
+interface Reducer {
+    /**
+     * Check whether the node with the given ID is a data source node for this reducer.
+     *
+     * @param nodeId Node ID.
+     * @return {@code true} if this reducer needs data from the given source node, {@code false} otherwise.
+     */
+    boolean hasSource(UUID nodeId);
+
+    /**
+     * Set source nodes.
+     *
+     * @param nodes Nodes.
+     * @param segmentsCnt Index segments per table.
+     */
+    void setSources(Collection<ClusterNode> nodes, int segmentsCnt);
+
+    /**
+     * @return Source node IDs for this reducer.
+     */
+    Set<UUID> sources();
+
+    /**
+     * Offer a result page to the reducer.
+     *
+     * @param page Page.
+     */
+    void addPage(ReduceResultPage page);
+
+    /**
+     * @param pageSize Page size.
+     */
+    void setPageSize(int pageSize);
+
+    /**
+     * Check if all rows have been fetched from all sources.
+     *
+     * @return {@code true} if all rows have been fetched, {@code false} otherwise.
+     */
+    boolean fetchedAll();
+
+    /**
+     * Find rows and create a cursor to iterate over the result.
+     *
+     * @param first Lower bound.
+     * @param last Upper bound.
+     * @return Cursor instance.
+     */
+    Cursor find(@Nullable SearchRow first, @Nullable SearchRow last);
+
+    /**
+     * Failure callback.
+     *
+     * @param nodeId Node ID.
+     * @param e Exception.
+     */
+    void onFailure(UUID nodeId, CacheException e);
+
+    /**
+     * Row comparator.
+     * See {@link Index}.
+     */
+    interface RowComparator {
+        /**
+         * Compare two rows.
+         *
+         * @param rowData the first row
+         * @param compare the second row
+         * @return 0 if both rows are equal, -1 if the first row is smaller,
+         *         otherwise 1
+         */
+        int compareRows(SearchRow rowData, SearchRow compare);
+    }
+}
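
For orientation, a minimal sketch of how a caller might drive this interface; the listener wiring
and the process() helper are assumptions, only the Reducer methods come from this patch:

    void drainReducer(Reducer rdc, Collection<ClusterNode> nodes, int segmentsCnt, int pageSize) {
        rdc.setSources(nodes, segmentsCnt); // Declare the expected sources up front.
        rdc.setPageSize(pageSize);

        // A message listener feeds pages as they arrive from the map nodes:
        //     rdc.addPage(page);               // Regular or last page.
        //     rdc.onFailure(nodeId, cacheErr); // Failure page.

        Cursor cur = rdc.find(null, null); // Unbounded scan; bounds are passed for merge joins.

        while (cur.next())
            process(cur.get()); // process() is a placeholder for row handling.
    }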
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReduceIndexAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReduceIndexAdapter.java
new file mode 100644
index 0000000..dd430a7
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReduceIndexAdapter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.HashSet;
+import org.apache.ignite.internal.GridKernalContext;
+import org.h2.engine.Session;
+import org.h2.index.Index;
+import org.h2.index.IndexType;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
+
+/**
+ * H2 {@link Index} adapter for {@link SortedReducer}.
+ */
+public final class SortedReduceIndexAdapter extends AbstractReduceIndexAdapter {
+    /** */
+    private static final IndexType TYPE = IndexType.createNonUnique(false);
+
+    /** */
+    private final SortedReducer delegate;
+
+    /**
+     * @param ctx Kernal context.
+     * @param tbl Table.
+     * @param name Index name.
+     * @param cols Columns.
+     */
+    public SortedReduceIndexAdapter(
+        GridKernalContext ctx,
+        ReduceTable tbl,
+        String name,
+        IndexColumn[] cols
+    ) {
+        super(ctx, tbl, name, TYPE, cols);
+
+        delegate = new SortedReducer(ctx, this::compareRows);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected SortedReducer reducer() {
+        return delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder,
+        HashSet<Column> allColumnsSet) {
+        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, false, allColumnsSet);
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReducer.java
similarity index 54%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReducer.java
index 4dcfbbf..26cc81a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/SortedReducer.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -37,45 +37,49 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.h2.engine.Session;
 import org.h2.index.Cursor;
-import org.h2.index.IndexType;
+import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
-import org.h2.result.SortOrder;
-import org.h2.table.Column;
-import org.h2.table.IndexColumn;
-import org.h2.table.TableFilter;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.emptyIterator;
+import static java.util.Objects.requireNonNull;
 
 /**
- * Sorted index.
+ * Sorted reducer: merges pre-sorted result streams from the map nodes.
  */
-public final class ReduceIndexSorted extends ReduceIndex {
+public class SortedReducer extends AbstractReducer {
     /** */
-    private static final IndexType TYPE = IndexType.createNonUnique(false);
+    @SuppressWarnings("ComparatorMethodParameterNotUsed")
+    protected final Comparator<SearchRow> firstRowCmp = (rowInList, searchRow) -> {
+        int res = compareRows(rowInList, searchRow);
 
-    /** */
-    private final Comparator<RowStream> streamCmp = new Comparator<RowStream>() {
-        @Override public int compare(RowStream o1, RowStream o2) {
-            if (o1 == o2) // both nulls
-                return 0;
-
-            if (o1 == null)
-                return -1;
+        return res == 0 ? 1 : res;
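+        // Never return 0: an equal row compares as greater, so a binary search lands before the first equal row.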
+    };
 
-            if (o2 == null)
-                return 1;
+    /** */
+    @SuppressWarnings("ComparatorMethodParameterNotUsed")
+    protected final Comparator<SearchRow> lastRowCmp = (rowInList, searchRow) -> {
+        int res = compareRows(rowInList, searchRow);
 
-            return compareRows(o1.get(), o2.get());
-        }
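+        // Never return 0: an equal row compares as smaller, so a binary search lands after the last equal row.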
+        return res == 0 ? -1 : res;
     };
 
     /** */
-    private Map<UUID,RowStream[]> streamsMap;
+    private final Comparator<RowStream> streamCmp = (o1, o2) -> {
+        if (o1 == o2) // both nulls
+            return 0;
+
+        if (o1 == null)
+            return -1;
+
+        if (o2 == null)
+            return 1;
+
+        return compareRows(o1.get(), o2.get());
+    };
 
     /** */
     private final Lock lock = new ReentrantLock();
@@ -84,24 +88,40 @@ public final class ReduceIndexSorted extends ReduceIndex {
     private final Condition notEmpty = lock.newCondition();
 
     /** */
+    private final RowComparator rowComparator;
+
+    /** */
+    private Map<UUID, RowStream[]> streamsMap;
+
+    /** */
     private ReduceResultPage failPage;
 
     /** */
     private MergeStreamIterator it;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
-     * @param tbl Table.
-     * @param name Index name,
-     * @param cols Columns.
+     * @param rowComparator Row comparator.
      */
-    public ReduceIndexSorted(
-        GridKernalContext ctx,
-        ReduceTable tbl,
-        String name,
-        IndexColumn[] cols
-    ) {
-        super(ctx, tbl, name, TYPE, cols);
+    public SortedReducer(GridKernalContext ctx, final RowComparator rowComparator) {
+        super(ctx);
+
+        this.rowComparator = rowComparator;
+    }
+
+    /**
+     * Compare two rows.
+     *
+     * @param rowData the first row
+     * @param compare the second row
+     * @return 0 if both rows are equal, -1 if the first row is smaller,
+     *         otherwise 1
+     */
+    private int compareRows(SearchRow rowData, SearchRow compare) {
+        return rowComparator.compareRows(rowData, compare);
     }
 
     /** {@inheritDoc} */
@@ -132,36 +152,8 @@ public final class ReduceIndexSorted extends ReduceIndex {
     }
 
     /** {@inheritDoc} */
-    @Override protected void addPage0(ReduceResultPage page) {
-        if (page.isFail()) {
-            lock.lock();
-
-            try {
-                if (failPage == null) {
-                    failPage = page;
-
-                    notEmpty.signalAll();
-                }
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-        else {
-            UUID src = page.source();
-
-            streamsMap.get(src)[page.segmentId()].addPage(page);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) {
-        // If our last evicted fetched row was smaller than the given lower bound,
-        // then we are ok. This is important for merge join to work.
-        if (lastEvictedRow != null && first != null && compareRows(lastEvictedRow, first) < 0)
-            return;
-
-        super.checkBounds(lastEvictedRow, first, last);
+    @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
+        return new FetchingCursor(first, last, it);
     }
 
     /** {@inheritDoc} */
@@ -187,14 +179,41 @@ public final class ReduceIndexSorted extends ReduceIndex {
         return new GridH2Cursor(iter);
     }
 
-    /** {@inheritDoc} */
-    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder, HashSet<Column> allColumnsSet) {
-        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, false, allColumnsSet);
+    /**
+     * @param lastEvictedRow Last evicted fetched row.
+     * @param first Lower bound.
+     * @param last Upper bound.
+     */
+    @Override protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) {
+        // If our last evicted fetched row was smaller than the given lower bound,
+        // then we are ok. This is important for merge join to work.
+        if (lastEvictedRow != null && first != null && compareRows(lastEvictedRow, first) < 0)
+            return;
+
+        super.checkBounds(lastEvictedRow, first, last);
     }
 
     /** {@inheritDoc} */
-    @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
-        return new FetchingCursor(first, last, it);
+    @Override protected void addPage0(ReduceResultPage page) {
+        if (page.isFail()) {
+            lock.lock();
+
+            try {
+                if (failPage == null) {
+                    failPage = page;
+
+                    notEmpty.signalAll();
+                }
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+        else {
+            UUID src = page.source();
+
+            streamsMap.get(src)[page.segmentId()].addPage(page);
+        }
     }
 
     /**
@@ -205,7 +224,7 @@ public final class ReduceIndexSorted extends ReduceIndex {
         private boolean first = true;
 
         /** */
-        private volatile int off;
+        private int off;
 
         /** */
         private boolean hasNext;
@@ -297,13 +316,13 @@ public final class ReduceIndexSorted extends ReduceIndex {
      */
     private final class RowStream implements Pollable<ReduceResultPage> {
         /** */
-        Iterator<Value[]> iter = emptyIterator();
+        private Iterator<Value[]> iter = emptyIterator();
 
         /** */
-        Row cur;
+        private Row cur;
 
         /** */
-        ReduceResultPage nextPage;
+        private ReduceResultPage nextPage;
 
         /**
          * @param page Page.
@@ -384,4 +403,197 @@ public final class ReduceIndexSorted extends ReduceIndex {
             return cur;
         }
     }
+
+    /**
+     * Fetching cursor.
+     */
+    private class FetchingCursor implements Cursor {
+        /** */
+        private Iterator<Row> stream;
+
+        /** */
+        private List<Row> rows;
+
+        /** */
+        private int cur;
+
+        /** */
+        private SearchRow first;
+
+        /** */
+        private SearchRow last;
+
+        /** */
+        private int lastFound = Integer.MAX_VALUE;
+
+        /**
+         * @param first Lower bound.
+         * @param last Upper bound.
+         * @param stream Stream of all the rows from remote nodes.
+         */
+        FetchingCursor(SearchRow first, SearchRow last, Iterator<Row> stream) {
+            assert stream != null;
+
+            // Initially we will use all the fetched rows, after we will switch to the last block.
+            rows = fetched;
+
+            this.stream = stream;
+            this.first = first;
+            this.last = last;
+
+            if (haveBounds() && !rows.isEmpty())
+                cur = findBounds();
+
+            cur--; // Set current position before the first row.
+        }
+
+        /**
+         * @return {@code true} If we have bounds.
+         */
+        private boolean haveBounds() {
+            return first != null || last != null;
+        }
+
+        /**
+         * @return Lower bound.
+         */
+        private int findBounds() {
+            assert !rows.isEmpty(): "rows";
+
+            int firstFound = cur;
+
+            // Find the lower bound.
+            if (first != null) {
+                firstFound = binarySearchRow(rows, first, firstRowCmp, true);
+
+                assert firstFound >= cur && firstFound <= rows.size(): "firstFound";
+
+                if (firstFound == rows.size())
+                    return firstFound; // The lower bound is greater than all the rows we have.
+
+                first = null; // We have found the lower bound, do not need it anymore.
+            }
+
+            // Find the upper bound.
+            if (last != null) {
+                assert lastFound == Integer.MAX_VALUE: "lastFound";
+
+                int lastFound0 = binarySearchRow(rows, last, lastRowCmp, true);
+
+                // If the upper bound is too large we will ignore it.
+                if (lastFound0 != rows.size())
+                    lastFound = lastFound0;
+            }
+
+            return firstFound;
+        }
+
+        /**
+         * Fetch rows from the stream.
+         */
+        private void fetchRows() {
+            for (;;) {
+                // Take the current last block and set the position after last.
+                rows = fetched.lastBlock();
+                cur = rows.size();
+
+                // Fetch stream.
+                while (stream.hasNext()) {
+                    fetched.add(requireNonNull(stream.next()));
+
+                    // Evict block if we've fetched too many rows.
+                    if (fetched.size() == MAX_FETCH_SIZE) {
+                        onBlockEvict(fetched.evictFirstBlock());
+
+                        assert fetched.size() < MAX_FETCH_SIZE;
+                    }
+
+                    // No bounds -> no need to do binary search, can return the fetched row right away.
+                    if (!haveBounds())
+                        break;
+
+                    // When the last block changed, it means that we've filled the current last block.
+                    // We have fetched the needed number of rows for binary search.
+                    if (fetched.lastBlock() != rows) {
+                        assert fetched.lastBlock().isEmpty(); // The last row must be added to the previous block.
+
+                        break;
+                    }
+                }
+
+                if (cur == rows.size())
+                    cur = Integer.MAX_VALUE; // We were not able to fetch anything. Done.
+                else {
+                    // Narrow the position to the requested bounds.
+                    if (haveBounds()) {
+                        cur = findBounds();
+
+                        if (cur == rows.size())
+                            continue; // The lower bound is too large, continue fetching rows.
+                    }
+                }
+
+                return;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (cur == Integer.MAX_VALUE)
+                return false;
+
+            if (++cur == rows.size())
+                fetchRows();
+
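+            // fetchRows() leaves cur at Integer.MAX_VALUE when the stream is exhausted; lastFound is the exclusive upper bound.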
+            return cur < lastFound;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return rows.get(cur);
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            // Should never be called.
+            throw DbException.getUnsupportedException("previous");
+        }
+    }
+
+    /**
+     * @param rows Sorted rows list.
+     * @param searchRow Search row.
+     * @param cmp Comparator.
+     * @param checkLast If we need to optimistically check the last row right away.
+     * @return Insertion point for the search row.
+     */
+    protected static int binarySearchRow(
+        List<Row> rows,
+        SearchRow searchRow,
+        Comparator<SearchRow> cmp,
+        boolean checkLast
+    ) {
+        assert !rows.isEmpty();
+
+        // Optimistically compare with the last row as a first step.
+        if (checkLast) {
+            int res = cmp.compare(last(rows), searchRow);
+
+            assert res != 0; // Comparators must never return 0 here.
+
+            if (res < 0)
+                return rows.size(); // The search row is greater than the last row.
+        }
+
+        int res = Collections.binarySearch(rows, searchRow, cmp);
+
+        assert res < 0: res; // Comparator must never return 0.
+
+        return -res - 1;
+    }
 }
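
The asymmetric comparators make the bounds search work without ever returning 0: with rows
[A, B, B, C] and search row B, firstRowCmp treats equal rows as greater, so the insertion point
lands before the first B (index 1), while lastRowCmp treats them as smaller, landing after the
last B (index 3). A self-contained sketch of the same idea on integers (demo class and names are
illustrative only):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class BoundSearchDemo {
        public static void main(String[] args) {
            List<Integer> rows = Arrays.asList(1, 2, 2, 3);

            // Lower bound: equal elements compare as greater -> insertion point before the first 2.
            Comparator<Integer> first = (a, b) -> a.equals(b) ? 1 : Integer.compare(a, b);

            // Upper bound: equal elements compare as smaller -> insertion point after the last 2.
            Comparator<Integer> last = (a, b) -> a.equals(b) ? -1 : Integer.compare(a, b);

            int lo = -Collections.binarySearch(rows, 2, first) - 1; // 1
            int hi = -Collections.binarySearch(rows, 2, last) - 1;  // 3

            System.out.println("Rows equal to 2 occupy [" + lo + ", " + hi + ")."); // [1, 3)
        }
    }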
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReduceIndexAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReduceIndexAdapter.java
new file mode 100644
index 0000000..b78b520
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReduceIndexAdapter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.HashSet;
+import org.apache.ignite.internal.GridKernalContext;
+import org.h2.engine.Session;
+import org.h2.index.Index;
+import org.h2.index.IndexType;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
+
+/**
+ * H2 {@link Index} adapter for {@link UnsortedReducer}.
+ */
+public final class UnsortedReduceIndexAdapter extends AbstractReduceIndexAdapter {
+    /** Index type. */
+    private static final IndexType TYPE = IndexType.createScan(false);
+
+    /** */
+    private final UnsortedReducer delegate;
+
+    /**
+     * @param ctx Context.
+     * @param tbl Table.
+     * @param name Index name.
+     */
+    public UnsortedReduceIndexAdapter(GridKernalContext ctx, ReduceTable tbl, String name) {
+        super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
+
+        delegate = new UnsortedReducer(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected UnsortedReducer reducer() {
+        return delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder,
+        HashSet<Column> allColumnsSet) {
+        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true, allColumnsSet);
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
similarity index 53%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
index 951c63b..7c7d428 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/UnsortedReducer.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -28,78 +27,68 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
-import org.h2.engine.Session;
 import org.h2.index.Cursor;
-import org.h2.index.IndexType;
+import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
-import org.h2.result.SortOrder;
-import org.h2.table.Column;
-import org.h2.table.IndexColumn;
-import org.h2.table.TableFilter;
 import org.h2.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * Unsorted merge index.
  */
-public final class ReduceIndexUnsorted extends ReduceIndex {
-    /** */
-    private static final IndexType TYPE = IndexType.createScan(false);
-
+public class UnsortedReducer extends AbstractReducer {
     /** */
     private final PollableQueue<ReduceResultPage> queue = new PollableQueue<>();
 
     /** */
-    private final AtomicInteger activeSources = new AtomicInteger(-1);
+    private final AtomicInteger activeSourcesCnt = new AtomicInteger(-1);
 
     /** */
     private Iterator<Value[]> iter = Collections.emptyIterator();
 
     /**
+     * Constructor.
+     *
      * @param ctx Context.
-     * @param tbl  Table.
-     * @param name Index name.
      */
-    public ReduceIndexUnsorted(GridKernalContext ctx, ReduceTable tbl, String name) {
-        super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
+    public UnsortedReducer(GridKernalContext ctx) {
+        super(ctx);
     }
 
     /**
      * @param ctx Context.
      * @return Dummy index instance.
      */
-    public static ReduceIndexUnsorted createDummy(GridKernalContext ctx) {
-        return new ReduceIndexUnsorted(ctx);
-    }
-
-    /**
-     * @param ctx Context.
-     */
-    private ReduceIndexUnsorted(GridKernalContext ctx) {
-        super(ctx);
+    public static UnsortedReducer createDummy(GridKernalContext ctx) {
+        return new UnsortedReducer(ctx);
     }
 
     /** {@inheritDoc} */
     @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
         super.setSources(nodes, segmentsCnt);
 
-        int x = nodes.size() * segmentsCnt;
+        int x = srcNodes.size() * segmentsCnt;
 
-        assert x > 0: x;
+        assert x > 0 : x;
 
-        activeSources.set(x);
+        activeSourcesCnt.set(x);
     }
 
     /** {@inheritDoc} */
     @Override public boolean fetchedAll() {
-        int x = activeSources.get();
+        int x = activeSourcesCnt.get();
 
-        assert x >= 0: x; // This method must not be called if the sources were not set.
+        assert x >= 0 : x; // This method must not be called if the sources were not set.
 
         return x == 0 && queue.isEmpty();
     }
 
-    /** {@inheritDoc} */
+    /**
+     * @param page Result page received from a map node.
+     */
     @Override protected void addPage0(ReduceResultPage page) {
         assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
 
@@ -108,9 +97,9 @@ public final class ReduceIndexUnsorted extends ReduceIndex {
             queue.add(page);
 
         if (page.isLast()) {
-            int x = activeSources.decrementAndGet();
+            int x = activeSourcesCnt.decrementAndGet();
 
-            assert x >= 0: x;
+            assert x >= 0 : x;
 
             if (x == 0) // Always terminate with empty iterator.
                 queue.add(createDummyLastPage(page));
@@ -118,12 +107,7 @@ public final class ReduceIndexUnsorted extends ReduceIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder, HashSet<Column> allColumnsSet) {
-        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true, allColumnsSet);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Cursor findAllFetched(List<Row> fetched, SearchRow first, SearchRow last) {
+    @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
         // This index is unsorted: have to ignore bounds.
         return new GridH2Cursor(fetched.iterator());
     }
@@ -131,7 +115,7 @@ public final class ReduceIndexUnsorted extends ReduceIndex {
     /** {@inheritDoc} */
     @Override protected Cursor findInStream(SearchRow first, SearchRow last) {
         // This index is unsorted: have to ignore bounds.
-        return new FetchingCursor(null, null, new Iterator<Row>() {
+        return new FetchingCursor(new Iterator<Row>() {
             @Override public boolean hasNext() {
                 iter = pollNextIterator(queue, iter);
 
@@ -149,8 +133,88 @@ public final class ReduceIndexUnsorted extends ReduceIndex {
     }
 
     /**
+     * Fetching cursor.
+     */
+    private class FetchingCursor implements Cursor {
+        /** */
+        private Iterator<Row> stream;
+
+        /** */
+        private List<Row> rows;
+
+        /** */
+        private int cur;
+
+        /**
+         * @param stream Stream of all the rows from remote nodes.
+         */
+        FetchingCursor(Iterator<Row> stream) {
+            assert stream != null;
+
+            // Initially we use all the fetched rows; afterwards we switch to the last block.
+            rows = fetched;
+
+            this.stream = stream;
+
+            cur--; // Set current position before the first row.
+        }
+
+        /**
+         * Fetch rows from the stream.
+         */
+        private void fetchRows() {
+            // Take the current last block and set the position after last.
+            rows = fetched.lastBlock();
+            cur = rows.size();
+
+            // Fetch stream.
+            if (stream.hasNext()) {
+                fetched.add(requireNonNull(stream.next()));
+
+                // Evict block if we've fetched too many rows.
+                if (fetched.size() == MAX_FETCH_SIZE) {
+                    onBlockEvict(fetched.evictFirstBlock());
+
+                    assert fetched.size() < MAX_FETCH_SIZE;
+                }
+            }
+
+            if (cur == rows.size())
+                cur = Integer.MAX_VALUE; // We were not able to fetch anything. Done.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (cur == Integer.MAX_VALUE)
+                return false;
+
+            if (++cur == rows.size())
+                fetchRows();
+
+            return cur < Integer.MAX_VALUE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return rows.get(cur);
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            // Should never be called.
+            throw DbException.getUnsupportedException("previous");
+        }
+    }
+
+    /**
+     * Blocking queue of result pages that also exposes the {@link AbstractReducer.Pollable} contract.
      */
-    private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements Pollable<X> {
+    private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements AbstractReducer.Pollable<X> {
         // No-op.
     }
 }
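
    The reducer's hand-off protocol deserves spelling out: network threads enqueue pages, the last page
    from each source decrements an active-source counter, and the final decrement enqueues a terminal
    page so the consumer can never block forever on an empty queue. A compressed, self-contained sketch
    of that protocol (Page is a stand-in for ReduceResultPage; the class itself is illustrative only):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PageHandOffSketch {
        /** Stand-in for ReduceResultPage: a row count and a last-page marker. */
        static final class Page {
            final int rows;
            final boolean last;

            Page(int rows, boolean last) {
                this.rows = rows;
                this.last = last;
            }
        }

        private final BlockingQueue<Page> queue = new LinkedBlockingQueue<>();

        /** Remaining sources = nodes * segments; initialized before any page arrives. */
        private final AtomicInteger activeSourcesCnt = new AtomicInteger(-1);

        void setSources(int nodes, int segments) {
            activeSourcesCnt.set(nodes * segments);
        }

        /** Called from network threads, one call per received page. */
        void addPage(Page page) {
            if (page.rows > 0)
                queue.add(page);

            // The last page from a source retires it; the final retirement
            // enqueues a terminal page so the consumer always wakes up.
            if (page.last && activeSourcesCnt.decrementAndGet() == 0)
                queue.add(new Page(0, true));
        }

        boolean fetchedAll() {
            return activeSourcesCnt.get() == 0 && queue.isEmpty();
        }
    }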
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index b5c1f77..cac6c65 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -449,6 +449,43 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     }
 
     /**
+     * Build query flags.
+     *
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param lazy Lazy query execution flag.
+     * @param replicatedOnly Replicated-only query flag.
+     * @param explain Explain query flag.
+     * @param dataPageScanEnabled Data page scan flag: {@code true}/{@code false}, or {@code null} if not set.
+     * @return Query flags.
+     */
+    public static int queryFlags(boolean distributedJoins,
+        boolean enforceJoinOrder,
+        boolean lazy,
+        boolean replicatedOnly,
+        boolean explain,
+        Boolean dataPageScanEnabled) {
+        int flags = enforceJoinOrder ? FLAG_ENFORCE_JOIN_ORDER : 0;
+
+        // Set the distributed joins flag if distributed joins are enabled for the query.
+        if (distributedJoins)
+            flags |= FLAG_DISTRIBUTED_JOINS;
+
+        if (explain)
+            flags |= FLAG_EXPLAIN;
+
+        if (replicatedOnly)
+            flags |= FLAG_REPLICATED;
+
+        if (lazy)
+            flags |= FLAG_LAZY;
+
+        flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
+
+        return flags;
+    }
+
+    /**
      * Checks if data page scan enabled.
      *
      * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
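
    queryFlags() folds several independent booleans into a single int for the wire. A short sketch of
    how such a bitmask round-trips; the flag values below are illustrative, and only the real constants
    defined in GridH2QueryRequest are authoritative:

    public class QueryFlagsSketch {
        static final int FLAG_DISTRIBUTED_JOINS = 1;
        static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
        static final int FLAG_LAZY = 1 << 2;

        /** Sender side: fold the booleans into a single int. */
        static int queryFlags(boolean distributedJoins, boolean enforceJoinOrder, boolean lazy) {
            int flags = 0;

            if (distributedJoins)
                flags |= FLAG_DISTRIBUTED_JOINS;

            if (enforceJoinOrder)
                flags |= FLAG_ENFORCE_JOIN_ORDER;

            if (lazy)
                flags |= FLAG_LAZY;

            return flags;
        }

        public static void main(String[] args) {
            int flags = queryFlags(true, false, true);

            // Receiver side: test individual bits.
            System.out.println((flags & FLAG_DISTRIBUTED_JOINS) != 0);  // true
            System.out.println((flags & FLAG_ENFORCE_JOIN_ORDER) != 0); // false
            System.out.println((flags & FLAG_LAZY) != 0);               // true
        }
    }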
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 0922b08..1aa3a98 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -47,7 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.h2.twostep.ReduceIndex;
+import org.apache.ignite.internal.processors.query.h2.twostep.AbstractReducer;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -574,7 +574,7 @@ public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
             Integer.class, Value.class));
 
         try {
-            GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 8);
+            GridTestUtils.setFieldValue(null, AbstractReducer.class, "PREFETCH_SIZE", 8);
 
             Random rnd = new GridRandom();
 
@@ -624,7 +624,7 @@ public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
             }
         }
         finally {
-            GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 1024);
+            GridTestUtils.setFieldValue(null, AbstractReducer.class, "PREFETCH_SIZE", 1024);
 
             c.destroy();
         }
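
    The test above swaps AbstractReducer.PREFETCH_SIZE through reflection and restores it in finally.
    A minimal sketch of that set/restore pattern, assuming the target is a non-final static field
    (setStaticField and runWithPrefetchSize are hypothetical helpers; the real test uses
    GridTestUtils.setFieldValue):

    import java.lang.reflect.Field;

    public class StaticFieldOverrideSketch {
        /** Overwrites a private static (non-final) field via reflection. */
        static void setStaticField(Class<?> cls, String name, Object val) throws Exception {
            Field f = cls.getDeclaredField(name);
            f.setAccessible(true);
            f.set(null, val); // Null target: the field is static.
        }

        /** Runs a test body under a temporary value, always restoring the default. */
        static void runWithPrefetchSize(Class<?> cls, int tmpVal, int dfltVal, Runnable test)
            throws Exception {
            setStaticField(cls, "PREFETCH_SIZE", tmpVal);

            try {
                test.run();
            }
            finally {
                // Restore the default so later tests see the stock value.
                setStaticField(cls, "PREFETCH_SIZE", dfltVal);
            }
        }
    }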
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index a96dafc..321de17 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -1310,10 +1310,10 @@ public class KillQueryTest extends GridCommonAbstractTest {
             setMapper(new ReducePartitionMapper(ctx, ctx.log(GridReduceQueryExecutor.class)) {
                 /** {@inheritDoc} */
                 @Override public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds,
-                    AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly, long qryId) {
+                    AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly) {
 
                     return retryNodePartMapping ? RETRY_RESULT :
-                        super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
+                        super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
                 }
             });
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
index 8f229c2..30f1023 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -319,8 +319,8 @@ public class RetryCauseMessageSelfTest extends AbstractIndexingCommonTest {
         GridTestUtils.setFieldValue(rdcQryExec, GridReduceQueryExecutor.class, "mapper",
             new ReducePartitionMapper(ctx, logger) {
                 @Override public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds,
-                    AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly, long qryId) {
-                    final ReducePartitionMapResult res = super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
+                    AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly) {
+                    final ReducePartitionMapResult res = super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
 
                     return new ReducePartitionMapResult(Collections.emptyList(), res.partitionsMap(), res.queryPartitionsMap());
                 }
@@ -354,8 +354,8 @@ public class RetryCauseMessageSelfTest extends AbstractIndexingCommonTest {
         GridTestUtils.setFieldValue(rdcQryExec, GridReduceQueryExecutor.class, "mapper",
             new ReducePartitionMapper(ctx, logger) {
                 @Override public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds,
-                    AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly, long qryId) {
-                    final ReducePartitionMapResult res = super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
+                    AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly) {
+                    final ReducePartitionMapResult res = super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
 
                     return new ReducePartitionMapResult(Collections.emptyList(), res.partitionsMap(), res.queryPartitionsMap());
                 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 53166e1..5d238e8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -223,7 +223,6 @@ import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
 import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest;
 import org.apache.ignite.internal.processors.query.h2.RowCountTableStatisticsSurvivesNodeRestartTest;
 import org.apache.ignite.internal.processors.query.h2.RowCountTableStatisticsUsageTest;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPoolSelfTest;
 import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
 import org.apache.ignite.internal.processors.query.h2.sql.ExplainSelfTest;
 import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -537,8 +536,6 @@ import org.junit.runners.Suite;
     SqlUserCommandSelfTest.class,
     EncryptedSqlTableTest.class,
 
-    ThreadLocalObjectPoolSelfTest.class,
-
     // Partition loss.
     IndexingCachePartitionLossPolicySelfTest.class,