You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by mm...@apache.org on 2021/10/20 13:11:38 UTC

[ignite] branch master updated: IGNITE-14703 Add MergeSort distributed cache query reducer. (#9081)

This is an automated email from the ASF dual-hosted git repository.

mmuzaf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ea52fa4  IGNITE-14703 Add MergeSort distributed cache query reducer. (#9081)
ea52fa4 is described below

commit ea52fa4719ffff0f330c98c83347aa6e5547ac9b
Author: Maksim Timonin <ti...@gmail.com>
AuthorDate: Wed Oct 20 16:11:07 2021 +0300

    IGNITE-14703 Add MergeSort distributed cache query reducer. (#9081)
---
 .../processors/cache/CacheObjectUtils.java         |   2 +-
 .../processors/cache/IgniteCacheProxyImpl.java     |   2 +-
 .../GridCacheDistributedFieldsQueryFuture.java     |  10 +-
 .../query/GridCacheDistributedQueryFuture.java     | 337 ++++++++++---------
 .../query/GridCacheDistributedQueryManager.java    | 230 +++----------
 .../query/GridCacheLocalFieldsQueryFuture.java     |   2 +-
 .../cache/query/GridCacheLocalQueryFuture.java     |  35 +-
 .../cache/query/GridCacheLocalQueryManager.java    |   5 -
 .../cache/query/GridCacheQueryAdapter.java         |   2 +-
 .../cache/query/GridCacheQueryFutureAdapter.java   | 358 ++++++---------------
 .../cache/query/GridCacheQueryManager.java         |  40 +--
 .../cache/query/GridCacheQueryRequest.java         |  85 ++++-
 .../processors/cache/query/ScoredCacheEntry.java   |  68 ++++
 .../cache/query/reducer/CacheQueryReducer.java     |  70 ++++
 .../query/reducer/MergeSortCacheQueryReducer.java  | 104 ++++++
 .../processors/cache/query/reducer/NodePage.java   |  80 +++++
 .../cache/query/reducer/NodePageStream.java        | 127 ++++++++
 .../query/reducer/UnsortedCacheQueryReducer.java   |  83 +++++
 .../processors/query/h2/opt/GridLuceneIndex.java   |  11 +-
 .../cache/GridCacheFullTextQueryAbstractTest.java  |  69 ++++
 .../cache/GridCacheFullTextQueryFailoverTest.java  |  77 +++++
 .../cache/GridCacheFullTextQueryLimitTest.java     |  94 ++++++
 ...ridCacheFullTextQueryMultithreadedSelfTest.java |   3 +-
 .../cache/GridCacheFullTextQueryPagesTest.java     | 182 +++++++++++
 .../IgniteBinaryCacheQueryTestSuite.java           |   6 +
 .../org/apache/ignite/util/KillCommandsTests.java  |  13 +-
 26 files changed, 1409 insertions(+), 686 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
index ed278b1..cf36ce1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
@@ -177,7 +177,7 @@ public class CacheObjectUtils {
      * @param ldr Class loader, used for deserialization from binary representation.
      * @return Unwrapped object.
      */
-    private static Object unwrapBinary(
+    public static Object unwrapBinary(
         CacheObjectValueContext ctx,
         Object o,
         boolean keepBinary,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index eada11c..81d3247 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -545,7 +545,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
         if (query instanceof TextQuery) {
             TextQuery q = (TextQuery)query;
 
-            qry = ctx.queries().createFullTextQuery(q.getType(), q.getText(), q.getLimit(), isKeepBinary);
+            qry = ctx.queries().createFullTextQuery(q.getType(), q.getText(), q.getLimit(), q.getPageSize(), isKeepBinary);
 
             if (grp != null)
                 qry.projection(grp);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java
index 743dc42..a730347 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedFieldsQueryFuture.java
@@ -34,9 +34,6 @@ import org.jetbrains.annotations.Nullable;
 public class GridCacheDistributedFieldsQueryFuture
     extends GridCacheDistributedQueryFuture<Object, Object, List<Object>>
     implements GridCacheQueryMetadataAware {
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Meta data future. */
     private final GridFutureAdapter<List<GridQueryFieldMetadata>> metaFut;
 
@@ -44,10 +41,9 @@ public class GridCacheDistributedFieldsQueryFuture
      * @param ctx Cache context.
      * @param reqId Request ID.
      * @param qry Query.
-     * @param nodes Nodes.
      */
-    public GridCacheDistributedFieldsQueryFuture(GridCacheContext<?, ?> ctx, long reqId,
-        GridCacheQueryBean qry, Iterable<ClusterNode> nodes) {
+    public GridCacheDistributedFieldsQueryFuture(GridCacheContext<?, ?> ctx, long reqId, GridCacheQueryBean qry,
+        Collection<ClusterNode> nodes) {
         super((GridCacheContext<Object, Object>)ctx, reqId, qry, nodes);
 
         metaFut = new GridFutureAdapter<>();
@@ -93,7 +89,7 @@ public class GridCacheDistributedFieldsQueryFuture
     }
 
     /** {@inheritDoc} */
-    @Override boolean fields() {
+    @Override public boolean fields() {
         return true;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 8f08dea..1efc895 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -17,257 +17,168 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.reducer.MergeSortCacheQueryReducer;
+import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
+import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
+
 /**
  * Distributed query future.
  */
 public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutureAdapter<K, V, R> {
     /** */
-    private static final long serialVersionUID = 0L;
+    private final long reqId;
 
-    /** */
-    private long reqId;
+    /** Helps to send cache query requests to other nodes. */
+    private final GridCacheDistributedQueryManager<K, V> qryMgr;
 
-    /** */
-    private final Collection<UUID> subgrid = new HashSet<>();
+    /** Collection of streams. */
+    private final Map<UUID, NodePageStream<R>> streams;
 
-    /** */
-    private final Collection<UUID> rcvd = new HashSet<>();
+    /** Count of streams that have finished receiving remote pages. */
+    private final AtomicInteger noRemotePagesStreamsCnt = new AtomicInteger();
 
-    /** */
-    private CountDownLatch firstPageLatch = new CountDownLatch(1);
+    /** Count down this latch when every node responds to the initial cache query request. */
+    private final CountDownLatch firstPageLatch = new CountDownLatch(1);
+
+    /** Set of nodes that have delivered their first page. */
+    private Set<UUID> rcvdFirstPage = ConcurrentHashMap.newKeySet();
 
     /**
      * @param ctx Cache context.
      * @param reqId Request ID.
      * @param qry Query.
-     * @param nodes Nodes.
      */
-    protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId, GridCacheQueryBean qry,
-        Iterable<ClusterNode> nodes) {
+    protected GridCacheDistributedQueryFuture(
+        GridCacheContext<K, V> ctx,
+        long reqId,
+        GridCacheQueryBean qry,
+        Collection<ClusterNode> nodes
+    ) {
         super(ctx, qry, false);
 
         assert reqId > 0;
 
         this.reqId = reqId;
 
-        GridCacheQueryManager<K, V> mgr = ctx.queries();
+        qryMgr = (GridCacheDistributedQueryManager<K, V>) ctx.queries();
 
-        assert mgr != null;
+        streams = new ConcurrentHashMap<>(nodes.size());
 
-        synchronized (this) {
-            for (ClusterNode node : nodes)
-                subgrid.add(node.id());
+        for (ClusterNode node : nodes) {
+            streams.computeIfAbsent(node.id(), nodeId ->
+                new NodePageStream<>(nodeId, () -> requestPages(nodeId), () -> cancelPages(nodeId)));
         }
-    }
 
-    /** {@inheritDoc} */
-    @Override protected void cancelQuery() throws IgniteCheckedException {
-        final GridCacheQueryManager<K, V> qryMgr = cctx.queries();
-
-        assert qryMgr != null;
-
-        try {
-            Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
-            Collection<ClusterNode> nodes;
-
-            synchronized (this) {
-                nodes = F.retain(allNodes, true,
-                    new P1<ClusterNode>() {
-                        @Override public boolean apply(ClusterNode node) {
-                            return !cctx.localNodeId().equals(node.id()) && subgrid.contains(node.id());
-                        }
-                    }
-                );
-
-                subgrid.clear();
-            }
+        Map<UUID, NodePageStream<R>> streamsMap = Collections.unmodifiableMap(streams);
 
-            final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(),
-                reqId,
-                fields(),
-                qryMgr.queryTopologyVersion(),
-                cctx.deploymentEnabled());
+        reducer = qry.query().type() == TEXT ?
+            new MergeSortCacheQueryReducer<>(streamsMap)
+            : new UnsortedCacheQueryReducer<>(streamsMap);
+    }
 
-            // Process cancel query directly (without sending) for local node,
-            cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
-                @Override public Object call() {
-                    qryMgr.processQueryRequest(cctx.localNodeId(), req);
+    /** {@inheritDoc} */
+    @Override protected void cancelQuery(Throwable err) {
+        firstPageLatch.countDown();
 
-                    return null;
-                }
-            });
+        for (NodePageStream<R> s : streams.values())
+            s.cancel(err);
 
-            if (!nodes.isEmpty()) {
-                for (ClusterNode node : nodes) {
-                    try {
-                        cctx.io().send(node, req, cctx.ioPolicy());
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (cctx.io().checkNodeLeft(node.id(), e, false)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send cancel request, node failed: " + node);
-                        }
-                        else
-                            U.error(log, "Failed to send cancel request [node=" + node + ']', e);
-                    }
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send cancel request (will cancel query in any case).", e);
-        }
-
-        qryMgr.onQueryFutureCanceled(reqId);
+        cctx.queries().onQueryFutureCanceled(reqId);
 
         clear();
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
     @Override protected void onNodeLeft(UUID nodeId) {
-        boolean callOnPage;
+        boolean hasRemotePages = streams.get(nodeId).hasRemotePages();
 
-        synchronized (this) {
-            callOnPage = !loc && subgrid.contains(nodeId);
-        }
-
-        if (callOnPage)
-            onPage(nodeId, Collections.emptyList(),
-                new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId), true);
+        if (hasRemotePages)
+            onError(new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId));
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitFirstPage() throws IgniteCheckedException {
-        try {
-            firstPageLatch.await();
+    @Override protected void onPage(UUID nodeId, Collection<R> data, boolean last) {
+        synchronized (firstPageLatch) {
+            if (rcvdFirstPage != null) {
+                rcvdFirstPage.add(nodeId);
 
-            if (isDone() && error() != null)
-                // Throw the exception if future failed.
-                get();
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+                if (rcvdFirstPage.size() == streams.size()) {
+                    firstPageLatch.countDown();
 
-            throw new IgniteInterruptedCheckedException(e);
+                    rcvdFirstPage.clear();
+                    rcvdFirstPage = null;
+                }
+            }
         }
-    }
 
-    /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(this);
+        NodePageStream<R> stream = streams.get(nodeId);
 
-        if (!loc) {
-            rcvd.add(nodeId);
+        if (stream == null)
+            return;
 
-            if (rcvd.containsAll(subgrid))
-                firstPageLatch.countDown();
-        }
-
-        boolean futFinish;
+        stream.addPage(data, last);
 
         if (last) {
-            futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
-            if (futFinish)
-                firstPageLatch.countDown();
-        }
-        else
-            futFinish = false;
-
-        return futFinish;
-    }
+            int cnt;
 
-    /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        assert !Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = null;
+            do {
+                cnt = noRemotePagesStreamsCnt.get();
 
-        synchronized (this) {
-            if (!isDone() && rcvd.containsAll(subgrid)) {
-                rcvd.clear();
+            } while (!noRemotePagesStreamsCnt.compareAndSet(cnt, cnt + 1));
 
-                nodes = nodes();
-            }
+            if (cnt + 1 >= streams.size())
+                onDone();
         }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, false);
     }
 
     /** {@inheritDoc} */
-    @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(this);
-
+    @Override public void awaitFirstItemAvailable() throws IgniteCheckedException {
         U.await(firstPageLatch);
 
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && !subgrid.isEmpty())
-                nodes = nodes();
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, true);
-    }
-
-    /**
-     * @return Nodes to send requests to.
-     */
-    private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
-        for (UUID nodeId : subgrid) {
-            ClusterNode node = cctx.discovery().node(nodeId);
-
-            if (node != null)
-                nodes.add(node);
-        }
-
-        return nodes;
+        if (isDone() && error() != null)
+            // Throw the exception if future failed.
+            super.get();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        boolean done = super.onDone(res, err);
-
-        // Must release the lath after onDone() in order for a waiting thread to see an exception, if any.
-        firstPageLatch.countDown();
-
-        return done;
+    @Override public Collection<R> get() throws IgniteCheckedException {
+        return get0();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onCancelled() {
-        firstPageLatch.countDown();
-
-        return super.onCancelled();
+    @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+        return get0();
     }
 
     /** {@inheritDoc} */
-    @Override public void onTimeout() {
-        firstPageLatch.countDown();
+    @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
+        return get0();
+    }
 
-        super.onTimeout();
+    /**
+     * Completion of a distributed query future depends on the user iterating over the query result with lazy page
+     * loading, so {@link #get()} may block for an unpredictably long period of time. Avoid calling it.
+     */
+    private Collection<R> get0() {
+        throw new IgniteIllegalStateException("Unexpected lock on iterator over distributed cache query result.");
     }
 
     /** {@inheritDoc} */
@@ -284,4 +195,82 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
     long requestId() {
         return reqId;
     }
+
+    /**
+     * Send initial query request to query nodes.
+     */
+    public void startQuery() {
+        try {
+            GridCacheQueryRequest req = GridCacheQueryRequest.startQueryRequest(cctx, reqId, this);
+
+            qryMgr.sendRequest(this, req, streams.keySet());
+        }
+        catch (IgniteCheckedException e) {
+            onError(e);
+        }
+    }
+
+    /**
+     * Send request to fetch new pages.
+     *
+     * @param nodeId Node to send request.
+     */
+    private void requestPages(UUID nodeId) {
+        try {
+            GridCacheQueryRequest req = GridCacheQueryRequest.pageRequest(cctx, reqId, query().query(), fields());
+
+            qryMgr.sendRequest(this, req, Collections.singletonList(nodeId));
+        }
+        catch (IgniteCheckedException e) {
+            onError(e);
+        }
+    }
+
+    /**
+     * Cancels remote pages.
+     *
+     * @param nodeId Node to send request.
+     */
+    // TODO IGNITE-15731: Refactor how CacheQueryReducer handles remote nodes.
+    private void cancelPages(UUID nodeId) {
+        try {
+            GridCacheQueryRequest req = GridCacheQueryRequest.cancelRequest(cctx, reqId, fields());
+
+            if (nodeId.equals(cctx.localNodeId())) {
+                // Process the cancel request directly (without sending) for the local node.
+                cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
+                    @Override public Object call() {
+                        qryMgr.processQueryRequest(cctx.localNodeId(), req);
+
+                        return null;
+                    }
+                });
+            }
+            else {
+                try {
+                    cctx.io().send(nodeId, req, cctx.ioPolicy());
+                }
+                catch (IgniteCheckedException e) {
+                    if (cctx.io().checkNodeLeft(nodeId, e, false)) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send cancel request, node failed: " + nodeId);
+                    }
+                    else
+                        U.error(log, "Failed to send cancel request [node=" + nodeId + ']', e);
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(logger(), "Failed to send cancel request (will cancel query in any case).", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onError(Throwable err) {
+        if (onDone(err)) {
+            streams.values().forEach(s -> s.cancel(err));
+
+            firstPageLatch.countDown();
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 770542b..6fb0907 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -81,7 +80,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     private ConcurrentMap<Long, Thread> threads = new ConcurrentHashMap<>();
 
     /** {request ID -> future} */
-    private ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> futs =
+    private final ConcurrentMap<Long, GridCacheDistributedQueryFuture<?, ?, ?>> futs =
         new ConcurrentHashMap<>();
 
     /** Received requests to cancel. */
@@ -158,24 +157,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
      * Removes query future from futures map.
      *
      * @param reqId Request id.
-     * @param fut Query future.
-     */
-    protected void addQueryFuture(long reqId, GridCacheDistributedQueryFuture<?, ?, ?> fut) {
-        futs.put(reqId, fut);
-
-        if (cctx.kernalContext().clientDisconnected()) {
-            IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
-                cctx.kernalContext().cluster().clientReconnectFuture(),
-                "Query was cancelled, client node disconnected.");
-
-            fut.onDone(err);
-        }
-    }
-
-    /**
-     * Removes query future from futures map.
-     *
-     * @param reqId Request id.
      */
     protected void removeQueryFuture(long reqId) {
         futs.remove(reqId);
@@ -197,7 +178,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
      * @param sndId Sender node id.
      * @param req Query request.
      */
-    @Override void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
+    @Override public void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
         assert req.mvccSnapshot() != null || !cctx.mvccEnabled() || req.cancel() ||
             (req.type() == null && !req.fields()) : req; // Last assertion means next page request.
 
@@ -430,22 +411,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     }
 
     /** {@inheritDoc} */
-    @Override void onWaitAtStop() {
-        super.onWaitAtStop();
-
-        // Wait till all requests will be finished.
-        for (GridCacheQueryFutureAdapter fut : futs.values())
-            try {
-                fut.get();
-            }
-            catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Received query error while waiting for query to finish [queryFuture= " + fut +
-                        ", error= " + e + ']');
-            }
-    }
-
-    /** {@inheritDoc} */
     @Override protected boolean onPageReady(boolean loc, GridCacheQueryInfo qryInfo,
         Collection<?> data, boolean finished, Throwable e) {
         GridCacheLocalQueryFuture<?, ?, ?> fut = qryInfo.localQueryFuture();
@@ -523,8 +488,12 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, false);
+    }
+
+    /** */
+    private CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes, boolean fields) {
         assert cctx.config().getCacheMode() != LOCAL;
 
         if (log.isDebugEnabled())
@@ -532,48 +501,14 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
         long reqId = cctx.io().nextIoId();
 
-        final GridCacheDistributedQueryFuture<K, V, ?> fut =
-            new GridCacheDistributedQueryFuture<>(cctx, reqId, qry, nodes);
+        final GridCacheDistributedQueryFuture fut = fields ?
+            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes) :
+            new GridCacheDistributedQueryFuture(cctx, reqId, qry, nodes);
 
         try {
-            qry.query().validate();
-
-            String clsName = qry.query().queryClassName();
-            Boolean dataPageScanEnabled = qry.query().isDataPageScanEnabled();
-            MvccSnapshot mvccSnapshot = qry.query().mvccSnapshot();
-
-            boolean deployFilterOrTransformer = (qry.query().scanFilter() != null || qry.query().transform() != null
-                || qry.query().idxQryDesc() != null) && cctx.gridDeploy().enabled();
-
-            final GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                false,
-                qry.query().clause(),
-                qry.query().idxQryDesc(),
-                qry.query().limit(),
-                clsName,
-                qry.query().scanFilter(),
-                qry.query().partition(),
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                false,
-                qry.query().keepBinary(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                mvccSnapshot,
-                // Force deployment anyway if scan query is used.
-                cctx.deploymentEnabled() || deployFilterOrTransformer,
-                dataPageScanEnabled);
-
-            addQueryFuture(req.id(), fut);
+            fut.qry.query().validate();
 
-            final Object topic = topic(cctx.nodeId(), req.id());
+            final Object topic = topic(cctx.nodeId(), reqId);
 
             cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
 
@@ -583,17 +518,27 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 }
             });
 
-            sendRequest(fut, req, nodes);
+            futs.put(reqId, fut);
+
+            if (cctx.kernalContext().clientDisconnected()) {
+                IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+                    cctx.kernalContext().cluster().clientReconnectFuture(),
+                    "Query was cancelled, client node disconnected.");
+
+                fut.onError(err);
+            }
+
+            fut.startQuery();
         }
         catch (IgniteCheckedException e) {
-            fut.onDone(e);
+            fut.onError(e);
         }
 
         return fut;
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked", "serial"})
+    @SuppressWarnings({"unchecked"})
     @Override public GridCloseableIterator scanQueryDistributed(final GridCacheQueryAdapter qry,
         Collection<ClusterNode> nodes) throws IgniteCheckedException {
         assert !cctx.isLocal() : cctx.name();
@@ -722,39 +667,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     }
 
     /** {@inheritDoc} */
-    @Override public void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all) {
-        assert cctx.config().getCacheMode() != LOCAL;
-        assert qry != null;
-        assert nodes != null;
-
-        GridCacheDistributedQueryFuture<?, ?, ?> fut = futs.get(id);
-
-        assert fut != null;
-
-        try {
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                id,
-                cctx.name(),
-                qry.pageSize(),
-                qry.includeBackups(),
-                fut.fields(),
-                all,
-                qry.keepBinary(),
-                qry.taskHash(),
-                queryTopologyVersion(),
-                // Force deployment anyway if scan query is used.
-                cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()),
-                qry.isDataPageScanEnabled());
-
-            sendRequest(fut, req, nodes);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public CacheQueryFuture<?> queryFieldsLocal(GridCacheQueryBean qry) {
         assert cctx.config().getCacheMode() != LOCAL;
 
@@ -776,65 +688,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry,
-        Collection<ClusterNode> nodes) {
-        assert cctx.config().getCacheMode() != LOCAL;
-
-        if (log.isDebugEnabled())
-            log.debug("Executing distributed query: " + qry);
-
-        long reqId = cctx.io().nextIoId();
-
-        final GridCacheDistributedFieldsQueryFuture fut =
-            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
-        try {
-            qry.query().validate();
-
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                true,
-                qry.query().clause(),
-                null,
-                qry.query().limit(),
-                null,
-                null,
-                null,
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                qry.query().includeMetadata(),
-                qry.query().keepBinary(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                null,
-                cctx.deploymentEnabled(),
-                qry.query().isDataPageScanEnabled());
-
-            addQueryFuture(req.id(), fut);
-
-            final Object topic = topic(cctx.nodeId(), req.id());
-
-            cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(false, topic);
-                }
-            });
-
-            sendRequest(fut, req, nodes);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
+    @Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, true);
     }
 
     /**
@@ -842,32 +697,33 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
      *
      * @param fut Distributed future.
      * @param req Request.
-     * @param nodes Nodes.
+     * @param nodeIds Node IDs.
      * @throws IgniteCheckedException In case of error.
      */
-    private void sendRequest(
+    // TODO IGNITE-15731: Refactor how CacheQueryReducer handles remote nodes.
+    public void sendRequest(
         final GridCacheDistributedQueryFuture<?, ?, ?> fut,
         final GridCacheQueryRequest req,
-        Collection<ClusterNode> nodes
+        Collection<UUID> nodeIds
     ) throws IgniteCheckedException {
         assert fut != null;
         assert req != null;
-        assert nodes != null;
+        assert nodeIds != null;
 
         final UUID locNodeId = cctx.localNodeId();
 
-        ClusterNode locNode = null;
+        boolean locNode = false;
 
-        Collection<ClusterNode> rmtNodes = null;
+        Collection<UUID> rmtNodes = null;
 
-        for (ClusterNode n : nodes) {
-            if (n.id().equals(locNodeId))
-                locNode = n;
+        for (UUID nodeId: nodeIds) {
+            if (nodeId.equals(locNodeId))
+                locNode = true;
             else {
                 if (rmtNodes == null)
-                    rmtNodes = new ArrayList<>(nodes.size());
+                    rmtNodes = new ArrayList<>(nodeIds.size());
 
-                rmtNodes.add(n);
+                rmtNodes.add(nodeId);
             }
         }
 
@@ -875,13 +731,13 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
         // For example, a remote reducer has a state, we should not serialize and then send
         // the reducer changed by the local node.
         if (!F.isEmpty(rmtNodes)) {
-            for (ClusterNode node : rmtNodes) {
+            for (UUID node : rmtNodes) {
                 try {
                     cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
                 }
                 catch (IgniteCheckedException e) {
-                    if (cctx.io().checkNodeLeft(node.id(), e, true)) {
-                        fut.onNodeLeft(node.id());
+                    if (cctx.io().checkNodeLeft(node, e, true)) {
+                        fut.onNodeLeft(node);
 
                         if (fut.isDone())
                             return;
@@ -892,7 +748,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
             }
         }
 
-        if (locNode != null) {
+        if (locNode) {
             cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
                 @Override public Object call() throws Exception {
                     req.beforeLocalExecution(cctx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java
index e83c35f..20e4066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalFieldsQueryFuture.java
@@ -72,7 +72,7 @@ public class GridCacheLocalFieldsQueryFuture
     }
 
     /** {@inheritDoc} */
-    @Override boolean fields() {
+    @Override public boolean fields() {
         return true;
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index bd60ed3..d8fd4f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -17,12 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.reducer.CacheQueryReducer;
+import org.apache.ignite.internal.processors.cache.query.reducer.NodePageStream;
+import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
@@ -33,14 +38,14 @@ import org.apache.ignite.marshaller.Marshaller;
  */
 public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdapter<K, V, R> {
     /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
     private Runnable run;
 
     /** */
     private IgniteInternalFuture<?> fut;
 
+    /** Local node page stream with single page. */
+    private final NodePageStream<R> stream;
+
     /**
      * @param ctx Context.
      * @param qry Query.
@@ -49,6 +54,10 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
         super(ctx, qry, true);
 
         run = new LocalQueryRunnable();
+
+        stream = new NodePageStream<>(ctx.localNodeId(), () -> {}, () -> {});
+
+        reducer = new UnsortedCacheQueryReducer<>(F.asMap(stream.nodeId(), stream));
     }
 
     /**
@@ -59,29 +68,27 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
     }
 
     /** {@inheritDoc} */
-    @Override protected void cancelQuery() throws IgniteCheckedException {
+    @Override protected void cancelQuery(Throwable err) throws IgniteCheckedException {
         if (fut != null)
             fut.cancel();
-    }
 
-    /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        return last;
+        stream.cancel(err);
     }
 
     /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        // No-op.
+    @Override public void awaitFirstItemAvailable() throws IgniteCheckedException {
+        CacheQueryReducer.get(stream.headPage());
     }
 
     /** {@inheritDoc} */
-    @Override protected void loadAllPages() {
-        // No-op.
+    @Override protected void onError(Throwable err) {
+        if (onDone(err))
+            stream.cancel(err);
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitFirstPage() throws IgniteCheckedException {
-        get();
+    @Override protected void onPage(UUID nodeId, Collection<R> data, boolean lastPage) {
+        stream.addPage(data, lastPage);
     }
 
     /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
index 147725b..4d1e20e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
@@ -98,11 +98,6 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public CacheQueryFuture<?> queryFieldsLocal(GridCacheQueryBean qry) {
         assert cctx.config().getCacheMode() == LOCAL;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 68c5b86..8a60d54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -907,7 +907,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                         return (cur = convert(fut.next())) != null;
 
                     try {
-                        fut.awaitFirstPage();
+                        fut.awaitFirstItemAvailable();
 
                         firstItemReturned = true;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index ee91c8d..691ea1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -20,26 +20,18 @@ package org.apache.ignite.internal.processors.cache.query;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.reducer.CacheQueryReducer;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -47,7 +39,10 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Query future adapter.
+ * Query future adapter. Future marked as done after all result pages are ready. Note that completeness of this future
+ * doesn't depend on whether all data are delivered to user or not. This class provides {@code Iterator} interface to
+ * access result data. Handling of received pages is triggered by {@code Iterator} methods.
+ * Reducing of result data (order, limit) is controlled by {@link CacheQueryReducer}.
  *
  * @param <R> Result type.
  */
@@ -74,23 +69,11 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     /** */
     private boolean limitDisabled;
 
-    /** Set of received keys used to deduplicate query result set. */
-    private final Collection<K> keys;
-
-    /** */
-    private final Queue<Collection<R>> queue = new LinkedList<>();
-
-    /** */
-    private final AtomicInteger cnt = new AtomicInteger();
-
-    /** */
-    private Iterator<R> iter;
-
     /** */
-    private IgniteUuid timeoutId = IgniteUuid.randomUuid();
+    private int cnt;
 
     /** */
-    private long startTime;
+    private final IgniteUuid timeoutId = IgniteUuid.randomUuid();
 
     /** */
     private long endTime;
@@ -98,10 +81,12 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     /** */
     protected boolean loc;
 
+    /** Reducer of cache query results. */
+    protected CacheQueryReducer<R> reducer;
+
     /** */
     protected GridCacheQueryFutureAdapter() {
         qry = null;
-        keys = null;
     }
 
     /**
@@ -117,7 +102,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridCacheQueryFutureAdapter.class);
 
-        startTime = U.currentTimeMillis();
+        long startTime = U.currentTimeMillis();
 
         long timeout = qry.query().timeout();
         capacity = query().query().limit();
@@ -132,8 +117,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
 
             cctx.time().addTimeoutObject(this);
         }
-
-        keys = qry.query().enableDedup() ? new HashSet<K>() : null;
     }
 
     /**
@@ -143,43 +126,62 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
         return qry;
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteLogger logger() {
+        return log;
+    }
+
     /**
      * @return If fields query.
      */
-    boolean fields() {
+    public boolean fields() {
         return false;
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        cctx.time().removeTimeoutObject(this);
-
-        return super.onDone(res, err);
-    }
-
-    /** {@inheritDoc} */
     @Override public R next() {
         try {
-            R next = unmaskNull(internalIterator().next());
+            if (!limitDisabled && cnt == capacity)
+                return null;
+
+            checkError();
+
+            R next = null;
 
-            cnt.decrementAndGet();
+            if (reducer.hasNextX()) {
+                next = unmaskNull(reducer.nextX());
+
+                if (!limitDisabled) {
+                    cnt++;
+
+                    // Limit reached: stop page loading and cancel queries.
+                    if (cnt == capacity)
+                        cancel();
+                }
+            }
 
             return next;
         }
-        catch (NoSuchElementException ignored) {
-            return null;
-        }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
         }
     }
 
     /**
-     * Waits for the first page to be received from remote node(s), if any.
+     * @return Cache context.
+     */
+    public GridCacheContext<K, V> cacheContext() {
+        return cctx;
+    }
+
+    /**
+     * TODO: IGNITE-15728 Provide custom reducer for ScanQueryFallbackClosableIterator.
+     *
+     * Waits until the first item is available to return.
      *
      * @throws IgniteCheckedException If query execution failed with an error.
      */
-    public abstract void awaitFirstPage() throws IgniteCheckedException;
+    public abstract void awaitFirstItemAvailable() throws IgniteCheckedException;
 
     /**
      * @throws IgniteCheckedException If future is done with an error.
@@ -193,65 +195,8 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     }
 
     /**
-     * @return Iterator.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private Iterator<R> internalIterator() throws IgniteCheckedException {
-        checkError();
-
-        Iterator<R> it = null;
-
-        while (it == null || !it.hasNext()) {
-            Collection<R> c;
-
-            synchronized (this) {
-                it = iter;
-
-                if (it != null && it.hasNext())
-                    break;
-
-                c = queue.poll();
-
-                if (c != null)
-                    it = iter = c.iterator();
-
-                if (isDone() && queue.peek() == null)
-                    break;
-            }
-
-            if (c == null && !isDone()) {
-                loadPage();
-
-                long timeout = qry.query().timeout();
-
-                long waitTime = timeout == 0 ? Long.MAX_VALUE : timeout - (U.currentTimeMillis() - startTime);
-
-                if (waitTime <= 0) {
-                    it = Collections.<R>emptyList().iterator();
-
-                    break;
-                }
-
-                synchronized (this) {
-                    try {
-                        if (queue.isEmpty() && !isDone())
-                            wait(waitTime);
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-
-                        throw new IgniteCheckedException("Query was interrupted: " + qry, e);
-                    }
-                }
-            }
-        }
-
-        checkError();
-
-        return it;
-    }
-
-    /**
+     * Callback that is invoked when a node leaves the cluster.
+     *
      * @param evtNodeId Removed or failed node Id.
      */
     protected void onNodeLeft(UUID evtNodeId) {
@@ -259,59 +204,14 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     }
 
     /**
-     * @param col Collection.
-     */
-    protected void enqueue(Collection<?> col) {
-        assert Thread.holdsLock(this);
-
-        if (limitDisabled) {
-            queue.add((Collection<R>)col);
-
-            cnt.addAndGet(col.size());
-        }
-        else {
-            if (capacity >= col.size()) {
-                queue.add((Collection<R>)col);
-                capacity -= col.size();
-
-                cnt.addAndGet(col.size());
-            }
-            else if (capacity > 0) {
-                queue.add(new ArrayList<>((Collection<R>)col).subList(0, capacity));
-                capacity = 0;
-
-                cnt.addAndGet(capacity);
-            }
-        }
-    }
-
-    /**
-     * @param col Query data collection.
-     * @return If dedup flag is {@code true} deduplicated collection (considering keys), otherwise passed in collection
-     * without any modifications.
-     */
-    private Collection<?> dedupIfRequired(Collection<?> col) {
-        if (!qry.query().enableDedup())
-            return col;
-
-        Collection<Object> dedupCol = new ArrayList<>(col.size());
-
-        synchronized (this) {
-            for (Object o : col)
-                if (!(o instanceof Map.Entry) || keys.add(((Map.Entry<K, V>)o).getKey()))
-                    dedupCol.add(o);
-        }
-
-        return dedupCol;
-    }
-
-    /**
+     * Entrypoint for handling query result page from remote node.
+     *
      * @param nodeId Sender node.
      * @param data Page data.
      * @param err Error (if was).
-     * @param finished Finished or not.
+     * @param lastPage Whether it is the last page for sender node.
      */
-    public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullable Throwable err, boolean finished) {
+    public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullable Throwable err, boolean lastPage) {
         if (isCancelled())
             return;
 
@@ -320,71 +220,76 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
                 "nodeId", nodeId, false,
                 "data", data, true,
                 "err", err, false,
-                "finished", finished, false));
+                "finished", lastPage, false));
 
         try {
-            if (err != null)
-                synchronized (this) {
-                    enqueue(Collections.emptyList());
-
-                    if (err instanceof IgniteCheckedException)
-                        onDone(err);
-                    else
-                        onDone(new IgniteCheckedException(nodeId != null ?
-                            S.toString("Failed to execute query on node",
-                                "query", qry, true,
-                                "nodeId", nodeId, false) :
-                            S.toString("Failed to execute query locally",
-                                "query", qry, true),
-                            err));
-
-                    onPage(nodeId, true);
-
-                    notifyAll();
+            if (err != null) {
+                if (!(err instanceof IgniteCheckedException)) {
+                    err = new IgniteCheckedException(nodeId != null ?
+                        S.toString("Failed to execute query on node",
+                            "query", qry, true,
+                            "nodeId", nodeId, false) :
+                        S.toString("Failed to execute query locally",
+                            "query", qry, true),
+                        err);
                 }
+
+                onError(err);
+            }
             else {
                 if (data == null)
                     data = Collections.emptyList();
 
-                data = dedupIfRequired((Collection<Object>)data);
+                if (qry.query().type() == GridCacheQueryType.TEXT) {
+                    ArrayList unwrapped = new ArrayList();
 
-                data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
+                    for (Object o: data) {
+                        ScoredCacheEntry e = (ScoredCacheEntry) o;
 
-                synchronized (this) {
-                    enqueue(data);
+                        Object uKey = CacheObjectUtils.unwrapBinary(
+                            cctx.cacheObjectContext(), e.getKey(), qry.query().keepBinary(), true, null);
 
-                    if (onPage(nodeId, finished)) {
-                        onDone(/* data */);
+                        Object uVal = CacheObjectUtils.unwrapBinary(
+                            cctx.cacheObjectContext(), e.getValue(), qry.query().keepBinary(), true, null);
 
-                        clear();
+                        if (uKey != e.getKey() || uVal != e.getValue())
+                            unwrapped.add(new ScoredCacheEntry<>(uKey, uVal, e.score()));
+                        else
+                            unwrapped.add(o);
                     }
 
-                    notifyAll();
-                }
+                    data = unwrapped;
+
+                } else
+                    data = cctx.unwrapBinariesIfNeeded((Collection<Object>)data, qry.query().keepBinary());
+
+                onPage(nodeId, (Collection<R>) data, lastPage);
+
+                if (isDone())
+                    clear();
             }
         }
         catch (Throwable e) {
-            onPageError(nodeId, e);
+            onError(e);
 
             if (e instanceof Error)
                 throw (Error)e;
         }
     }
 
-    /**
-     * @param nodeId Sender node id.
-     * @param e Error.
-     */
-    private void onPageError(@Nullable UUID nodeId, Throwable e) {
-        synchronized (this) {
-            enqueue(Collections.emptyList());
+    /** Invoked when this future fails with an error. */
+    protected abstract void onError(Throwable err);
 
-            onPage(nodeId, true);
+    /** Handles new data page from query node. */
+    protected abstract void onPage(UUID nodeId, Collection<R> data, boolean lastPage);
 
-            onDone(e);
+    /** {@inheritDoc} */
+    @Override public boolean onDone(Collection<R> res, Throwable err) {
+        boolean done = super.onDone(res, err);
 
-            notifyAll();
-        }
+        cctx.time().removeTimeoutObject(this);
+
+        return done;
     }
 
     /**
@@ -423,49 +328,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
         return obj != NULL ? obj : null;
     }
 
-    /** {@inheritDoc} */
-    @Override public Collection<R> get() throws IgniteCheckedException {
-        if (!isDone())
-            loadAllPages();
-
-        return super.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException {
-        if (!isDone())
-            loadAllPages();
-
-        return super.get(timeout, unit);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
-        if (!isDone())
-            loadAllPages();
-
-        return super.getUninterruptibly();
-    }
-
-    /**
-     * @param nodeId Sender node id.
-     * @param last Whether page is last.
-     * @return Is query finished or not.
-     */
-    protected abstract boolean onPage(UUID nodeId, boolean last);
-
-    /**
-     * Loads next page.
-     */
-    protected abstract void loadPage();
-
-    /**
-     * Loads all left pages.
-     *
-     * @throws IgniteInterruptedCheckedException If thread is interrupted.
-     */
-    protected abstract void loadAllPages() throws IgniteInterruptedCheckedException;
-
     /**
      * Clears future.
      */
@@ -476,7 +338,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     /** {@inheritDoc} */
     @Override public boolean cancel() throws IgniteCheckedException {
         if (onCancelled()) {
-            cancelQuery();
+            cancelQuery(new IgniteCheckedException("Query was cancelled."));
 
             return true;
         }
@@ -485,9 +347,12 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     }
 
     /**
+     * Cancels query on remote nodes and cleans up owned resources.
+     *
+     * @param err If query was cancelled with error.
      * @throws IgniteCheckedException In case of error.
      */
-    protected abstract void cancelQuery() throws IgniteCheckedException;
+    protected abstract void cancelQuery(Throwable err) throws IgniteCheckedException;
 
     /** {@inheritDoc} */
     @Override public IgniteUuid timeoutId() {
@@ -501,14 +366,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        try {
-            cancelQuery();
-
-            onDone(new IgniteFutureTimeoutCheckedException("Query timed out."));
-        }
-        catch (IgniteCheckedException e) {
-            onDone(e);
-        }
+        onError(new IgniteFutureTimeoutCheckedException("Query timed out."));
     }
 
     /** {@inheritDoc} */
@@ -520,14 +378,4 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     @Override public String toString() {
         return S.toString(GridCacheQueryFutureAdapter.class, this);
     }
-
-    /**
-     *
-     */
-    public void printMemoryStats() {
-        X.println(">>> Query future memory statistics.");
-        X.println(">>>  queueSize: " + queue.size());
-        X.println(">>>  keysSize: " + keys.size());
-        X.println(">>>  cnt: " + cnt);
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 3190fd0..d02316b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -319,10 +319,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         cctx.events().removeListener(lsnr);
 
-        if (cancel)
-            onCancelAtStop();
-        else
-            onWaitAtStop();
+        onCancelAtStop();
     }
 
     /**
@@ -367,13 +364,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     * Wait flag handler at stop.
-     */
-    void onWaitAtStop() {
-        // No-op.
-    }
-
-    /**
      * Processes cache query request.
      *
      * @param sndId Sender node id.
@@ -515,16 +505,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         Collection<ClusterNode> nodes) throws IgniteCheckedException;
 
     /**
-     * Loads page.
-     *
-     * @param id Query ID.
-     * @param qry Query.
-     * @param nodes Nodes.
-     * @param all Whether to load all pages.
-     */
-    public abstract void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all);
-
-    /**
      * Executes distributed fields query.
      *
      * @param qry Query.
@@ -1224,7 +1204,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                         final K key = row.getKey();
 
-                        V val = row.getValue();
+                        final V val = row.getValue();
 
                         if (log.isDebugEnabled()) {
                             ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
@@ -1324,8 +1304,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             else
                                 continue;
                         }
-                        else
-                            data.add(new T2<>(key, val));
+                        else {
+                            if (type == TEXT)
+                                // (K, V, score). Value transfers as BinaryObject.
+                                data.add(row0);
+                            else
+                                data.add(new T2<>(key, val));
+                        }
                     }
 
                     if (!loc) {
@@ -2851,11 +2836,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param clsName Query class name.
      * @param search Search clause.
      * @param limit Limits response records count. If 0 or less, considered to be no limit.
+     * @param pageSize Query page size.
      * @param keepBinary Keep binary flag.
      * @return Created query.
      */
     public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
-        String search, int limit, boolean keepBinary) {
+        String search, int limit, int pageSize, boolean keepBinary) {
         A.notNull("clsName", clsName);
         A.notNull("search", search);
 
@@ -2867,7 +2853,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             null,
             false,
             keepBinary,
-            null).limit(limit);
+            null)
+            .limit(limit)
+            .pageSize(pageSize);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 3d779f4..0912db3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -162,6 +162,85 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
     }
 
     /**
+     * Creates the initial query request from the query info held by the given future.
+     * The request is only constructed here; this method does not send it.
+     *
+     * @param cctx Cache context.
+     * @param reqId Request (cache query) ID.
+     * @param fut Cache query future, contains query info.
+     * @return Created request.
+     */
+    public static GridCacheQueryRequest startQueryRequest(GridCacheContext<?, ?> cctx, long reqId,
+        GridCacheDistributedQueryFuture<?, ?, ?> fut) {
+        GridCacheQueryBean bean = fut.query();
+        GridCacheQueryAdapter<?> qry = bean.query();
+
+        // True if the query has a scan filter or a transformer and grid deployment is enabled.
+        boolean deployFilterOrTransformer = (qry.scanFilter() != null || qry.transform() != null)
+            && cctx.gridDeploy().enabled();
+
+        return new GridCacheQueryRequest(
+            cctx.cacheId(),
+            reqId,
+            cctx.name(),
+            qry.type(),
+            fut.fields(),
+            qry.clause(),
+            qry.idxQryDesc(),
+            qry.limit(),
+            qry.queryClassName(),
+            qry.scanFilter(),
+            qry.partition(),
+            bean.reducer(),
+            qry.transform(),
+            qry.pageSize(),
+            qry.includeBackups(),
+            bean.arguments(),
+            qry.includeMetadata(),
+            qry.keepBinary(),
+            qry.taskHash(),
+            cctx.startTopologyVersion(),
+            qry.mvccSnapshot(),
+            // Force deployment anyway if scan query is used.
+            cctx.deploymentEnabled() || deployFilterOrTransformer,
+            qry.isDataPageScanEnabled());
+    }
+
+    /**
+     * Creates a request for fetching query result pages.
+     * The request is only constructed here; this method does not send it.
+     *
+     * @param cctx Cache context.
+     * @param reqId Request (cache query) ID.
+     * @param qry Query the pages are requested for.
+     * @param fields Whether query is a fields query.
+     * @return Created request.
+     */
+    public static GridCacheQueryRequest pageRequest(GridCacheContext<?, ?> cctx, long reqId,
+        GridCacheQueryAdapter<?> qry, boolean fields) {
+
+        return new GridCacheQueryRequest(
+            cctx.cacheId(),
+            reqId,
+            cctx.name(),
+            qry.pageSize(),
+            qry.includeBackups(),
+            fields,
+            false,
+            qry.keepBinary(),
+            qry.taskHash(),
+            cctx.startTopologyVersion(),
+            // Force deployment anyway if scan query is used.
+            cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()),
+            qry.isDataPageScanEnabled());
+    }
+
+    /**
+     * Creates a cancel query request, so no new pages will be sent.
+     * The request is only constructed here; this method does not send it.
+     *
+     * @param cctx Cache context.
+     * @param reqId Query request ID.
+     * @param fieldsQry Whether query is a fields query.
+     * @return Created request.
+     */
+    public static GridCacheQueryRequest cancelRequest(GridCacheContext<?, ?> cctx, long reqId, boolean fieldsQry) {
+        return new GridCacheQueryRequest(cctx.cacheId(),
+            reqId,
+            fieldsQry,
+            cctx.startTopologyVersion(),
+            cctx.deploymentEnabled());
+    }
+
+    /**
      * Creates cancel query request.
      *
      * @param cacheId Cache ID.
@@ -170,7 +249,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
      * @param topVer Topology version.
      * @param addDepInfo Deployment info flag.
      */
-    public GridCacheQueryRequest(
+    private GridCacheQueryRequest(
         int cacheId,
         long id,
         boolean fields,
@@ -202,7 +281,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
      * @param addDepInfo Deployment info flag.
      * @param dataPageScanEnabled Flag to enable data page scan.
      */
-    public GridCacheQueryRequest(
+    private GridCacheQueryRequest(
         int cacheId,
         long id,
         String cacheName,
@@ -256,7 +335,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
      * @param mvccSnapshot Mvcc snapshot.
      * @param addDepInfo Deployment info flag.
      */
-    public GridCacheQueryRequest(
+    private GridCacheQueryRequest(
         int cacheId,
         long id,
         String cacheName,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScoredCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScoredCacheEntry.java
new file mode 100644
index 0000000..e498c92
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScoredCacheEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Represents cache key-value pair and score to compare cache entry by custom rule.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+public class ScoredCacheEntry<K, V> extends IgniteBiTuple<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Score of the entry. */
+    private float score;
+
+    /** No-arg constructor; the score is later restored by {@link #readExternal(ObjectInput)}. */
+    public ScoredCacheEntry() {}
+
+    /**
+     * @param key Cache key.
+     * @param val Cache value.
+     * @param score Score of the entry.
+     */
+    public ScoredCacheEntry(K key, V val, float score) {
+        super(key, val);
+
+        this.score = score;
+    }
+
+    /**
+     * @return Score of the entry.
+     */
+    public float score() {
+        return score;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        // The score is autoboxed and written as an object; readExternal() must mirror this format.
+        out.writeObject(score);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        // Reads the boxed value written by writeExternal() and unboxes it.
+        score = (float)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ScoredCacheEntry.class, this);
+    }
+}
+
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
new file mode 100644
index 0000000..abbe949
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/CacheQueryReducer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
+
+/**
+ * This abstract class is base class for cache query reducers. They are responsible for reducing results of cache query.
+ *
+ * @param <T> Type of cache query result item.
+ */
+public abstract class CacheQueryReducer<T> extends GridIteratorAdapter<T> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Page streams collection, keyed by node ID. */
+    protected final Map<UUID, NodePageStream<T>> pageStreams;
+
+    /**
+     * @param pageStreams Page streams collection.
+     */
+    protected CacheQueryReducer(final Map<UUID, NodePageStream<T>> pageStreams) {
+        this.pageStreams = pageStreams;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeX() throws IgniteCheckedException {
+        throw new UnsupportedOperationException("CacheQueryReducer doesn't support removing items.");
+    }
+
+    /**
+     * Blocks until the given page future is completed and returns its result.
+     *
+     * @param pageFut Future of a page; its result is cast to {@link NodePage} without a check.
+     * @return Page with query results data the future was completed with.
+     * @throws IgniteCheckedException If the waiting thread was interrupted or the future was completed
+     *      exceptionally. An {@link IgniteCheckedException} cause is rethrown as is, any other failure is wrapped.
+     */
+    public static <T> NodePage<T> get(CompletableFuture<?> pageFut) throws IgniteCheckedException {
+        try {
+            return (NodePage<T>) pageFut.get();
+        }
+        catch (InterruptedException e) {
+            // Restore the interruption flag for the callers.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteCheckedException("Query was interrupted.", e);
+        }
+        catch (ExecutionException e) {
+            Throwable t = e.getCause();
+
+            if (t instanceof IgniteCheckedException)
+                throw (IgniteCheckedException) t;
+
+            throw new IgniteCheckedException("Page future was completed with unexpected error.", e);
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
new file mode 100644
index 0000000..d2b3a3b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/MergeSortCacheQueryReducer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
+
+/**
+ * Reducer of cache query results that merges results of all nodes into a single sorted sequence. Note that it's
+ * assumed that every node returns pre-sorted collection of data.
+ *
+ * @param <R> Type of cache query result item.
+ */
+public class MergeSortCacheQueryReducer<R> extends CacheQueryReducer<R> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Queue of pages from all nodes. Order of pages is controlled by the order of their head items. */
+    private PriorityQueue<NodePage<R>> nodePages;
+
+    /**
+     * Pages are iterated within the {@link #nextX()} method. In case a page is done, the corresponding stream is
+     * asked for a new page. We have to wait for it in {@link #hasNextX()}. The field is reset to {@code null} as
+     * soon as the awaited page is handled.
+     */
+    private UUID pendingNodeId;
+
+    /**
+     * @param pageStreams Page streams collection.
+     */
+    public MergeSortCacheQueryReducer(final Map<UUID, NodePageStream<R>> pageStreams) {
+        super(pageStreams);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNextX() throws IgniteCheckedException {
+        // Initial sort.
+        if (nodePages == null) {
+            // Compares head pages from all nodes to get the item with the highest score at the moment.
+            Comparator<NodePage<R>> pageCmp = (o1, o2) -> textResultComparator.compare(
+                (ScoredCacheEntry<?, ?>)o1.head(), (ScoredCacheEntry<?, ?>)o2.head());
+
+            nodePages = new PriorityQueue<>(pageStreams.size(), pageCmp);
+
+            for (NodePageStream<R> s : pageStreams.values())
+                enqueueHeadPage(s);
+        }
+
+        if (pendingNodeId != null) {
+            enqueueHeadPage(pageStreams.get(pendingNodeId));
+
+            // The awaited page must be handled exactly once. Without the reset every subsequent call would block
+            // on (and enqueue) one more page of the same node, so the whole result set of the node would be
+            // buffered here. The reset is done only after success: if waiting fails, the next call fails too.
+            pendingNodeId = null;
+        }
+
+        return !nodePages.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public R nextX() throws IgniteCheckedException {
+        if (nodePages.isEmpty())
+            throw new NoSuchElementException("No next element. Please, be sure to invoke hasNext() before next().");
+
+        NodePage<R> page = nodePages.poll();
+
+        R o = page.next();
+
+        // Return the page to the queue with its new head, or remember the node to wait for its next page.
+        if (page.hasNext())
+            nodePages.offer(page);
+        else if (!pageStreams.get(page.nodeId()).closed())
+            pendingNodeId = page.nodeId();
+
+        return o;
+    }
+
+    /**
+     * Waits for the head page of the specified stream and puts it into the queue. Empty pages are skipped until
+     * a page with data is delivered or the stream is closed, so rows that follow an empty page are not lost.
+     *
+     * @param stream Stream to get a page from.
+     * @throws IgniteCheckedException If waiting for a page failed.
+     */
+    private void enqueueHeadPage(NodePageStream<R> stream) throws IgniteCheckedException {
+        while (!stream.closed()) {
+            NodePage<R> p = get(stream.headPage());
+
+            // A null page doesn't advance the stream, stop here instead of spinning.
+            if (p == null)
+                return;
+
+            // The first hasNext() call on a page switches the stream to its next page (see NodePageStream#addPage).
+            if (p.hasNext()) {
+                nodePages.add(p);
+
+                return;
+            }
+        }
+    }
+
+    /** Compares rows for {@code TextQuery} results for ordering results in MergeSort reducer (highest score first). */
+    private static final Comparator<ScoredCacheEntry<?, ?>> textResultComparator = (c1, c2) ->
+        Float.compare(c2.score(), c1.score());
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePage.java
new file mode 100644
index 0000000..8336fd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePage.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+/**
+ * Represents a single page with results of cache query from single node.
+ *
+ * @param <R> Type of query result item.
+ */
+public class NodePage<R> implements Iterator<R> {
+    /** Iterator over data in this page. */
+    private final Iterator<R> pageIter;
+
+    /** Node ID. */
+    private final UUID nodeId;
+
+    /** Head of page. Requires {@link #hasNext()} to be called on {@link #pageIter} before. */
+    private R head;
+
+    /**
+     * @param nodeId Node ID.
+     * @param data Page data.
+     */
+    public NodePage(UUID nodeId, Collection<R> data) {
+        pageIter = data.iterator();
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return Current head item, or {@code null} if no item was fetched by {@link #hasNext()} since the last
+     *      {@link #next()} call.
+     */
+    public R head() {
+        return head;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Fetches the next item of the page into the head if the head is empty.
+     * Note that a {@code null} item in the page data makes this method return {@code false}.
+     *
+     * @return {@code true} if the head item is available.
+     */
+    @Override public boolean hasNext() {
+        if (head != null)
+            return true;
+        else {
+            if (!pageIter.hasNext())
+                return false;
+            else
+                return (head = pageIter.next()) != null;
+        }
+    }
+
+    /**
+     * Returns the head item and clears the head. {@link #hasNext()} has to be called before to fetch the head.
+     *
+     * @return Head item.
+     * @throws NoSuchElementException If the head is empty.
+     */
+    @Override public R next() {
+        if (head == null)
+            throw new NoSuchElementException("Node page is empty.");
+
+        R o = head;
+
+        head = null;
+
+        return o;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
new file mode 100644
index 0000000..b6cfb08
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class provides an interface {@link #headPage()} that returns a future that will be completed with
+ * {@link NodePage} of cache query result from single node. A new page is requested when the previous page is
+ * checked for data by the class consumer for the first time.
+ *
+ * @param <R> Type of query result item.
+ */
+public class NodePageStream<R> {
+    /** Node ID to stream pages. */
+    private final UUID nodeId;
+
+    /** Request pages action. */
+    private final Runnable reqPages;
+
+    /** Cancel remote pages action. */
+    private final Runnable cancelPages;
+
+    /** Flag shows whether there are available pages on a query node. */
+    private boolean hasRemotePages = true;
+
+    /** Promise to notify the stream consumer about delivering new page. */
+    private CompletableFuture<NodePage<R>> head = new CompletableFuture<>();
+
+    /**
+     * @param nodeId Node ID to stream pages.
+     * @param reqPages Request pages action.
+     * @param cancelPages Cancel remote pages action.
+     */
+    public NodePageStream(UUID nodeId, Runnable reqPages, Runnable cancelPages) {
+        this.nodeId = nodeId;
+        this.reqPages = reqPages;
+        this.cancelPages = cancelPages;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns a future of the current head page of this stream.
+     *
+     * @return Future that will be completed with query result page, or {@code null} if the last page was already
+     *      handed over to the consumer.
+     */
+    public synchronized CompletableFuture<NodePage<R>> headPage() {
+        return head;
+    }
+
+    /**
+     * Add new query result page of data.
+     *
+     * @param data Collection of query result items.
+     * @param last Whether it is the last page from this node.
+     */
+    public synchronized void addPage(Collection<R> data, boolean last) {
+        head.complete(new NodePage<R>(nodeId, data) {
+            /** Flag shows whether the request for new page was triggered. */
+            private boolean reqNext;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                // On the first call switch the stream to the next page: request it if the node has more pages,
+                // otherwise clear the head so the stream is reported as closed.
+                if (!reqNext) {
+                    synchronized (NodePageStream.this) {
+                        if (hasRemotePages) {
+                            head = new CompletableFuture<>();
+
+                            reqPages.run();
+                        }
+                        else
+                            head = null;
+                    }
+
+                    reqNext = true;
+                }
+
+                return super.hasNext();
+            }
+        });
+
+        if (last)
+            hasRemotePages = false;
+    }
+
+    /**
+     * Cancels this stream: completes the head future exceptionally with the specified error and runs the cancel
+     * action. Does nothing if the stream is already closed.
+     *
+     * @param err Error to complete the head future with.
+     */
+    public synchronized void cancel(Throwable err) {
+        if (closed())
+            return;
+
+        head.completeExceptionally(err);
+
+        cancelPages.run();
+
+        hasRemotePages = false;
+    }
+
+    /**
+     * @return {@code true} if there are some undelivered page from the node, otherwise {@code false}.
+     */
+    public synchronized boolean hasRemotePages() {
+        return hasRemotePages;
+    }
+
+    /**
+     * @return {@code true} if this stream delivers all query results from the node to a consumer.
+     */
+    public synchronized boolean closed() {
+        return !hasRemotePages && (head == null);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
new file mode 100644
index 0000000..9c50b2c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/UnsortedCacheQueryReducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Reducer of cache query results, no ordering of results is provided.
+ *
+ * @param <R> Type of cache query result item.
+ */
+public class UnsortedCacheQueryReducer<R> extends CacheQueryReducer<R> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Current page to return data to user. */
+    private NodePage<R> page;
+
+    /** Pending futures for requested pages. The array is reused between {@link #hasNextX()} calls. */
+    private final CompletableFuture<NodePage<R>>[] futs;
+
+    /**
+     * @param pageStreams Page streams collection.
+     */
+    public UnsortedCacheQueryReducer(Map<UUID, NodePageStream<R>> pageStreams) {
+        super(pageStreams);
+
+        futs = new CompletableFuture[pageStreams.size()];
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNextX() throws IgniteCheckedException {
+        while (page == null || !page.hasNext()) {
+            int pendingNodesCnt = 0;
+
+            for (NodePageStream<R> s: pageStreams.values()) {
+                if (s.closed())
+                    continue;
+
+                CompletableFuture<NodePage<R>> f = s.headPage();
+
+                // Use an already delivered page right away, otherwise remember the future to wait for.
+                if (f.isDone()) {
+                    page = get(f);
+
+                    if (page.hasNext())
+                        return true;
+                } else
+                    futs[pendingNodesCnt++] = f;
+            }
+
+            // No open stream is waiting for a page, so there is no more data.
+            if (pendingNodesCnt == 0)
+                return false;
+
+            CompletableFuture[] pendingFuts = Arrays.copyOf(futs, pendingNodesCnt);
+
+            // Clear the used slots of the shared array.
+            Arrays.fill(futs, 0, pendingNodesCnt, null);
+
+            // Wait for a page from any of the pending nodes.
+            page = get(CompletableFuture.anyOf(pendingFuts));
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public R nextX() throws IgniteCheckedException {
+        return page.next();
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index a173785..4a183d5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.query.ScoredCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -358,7 +359,6 @@ public class GridLuceneIndex implements AutoCloseable {
          * @return Object.
          * @throws IgniteCheckedException If failed.
          */
-        @SuppressWarnings("unchecked")
         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
             if (coctx == null) // For tests.
                 return (Z)JdbcUtils.deserialize(bytes, null);
@@ -371,15 +371,18 @@ public class GridLuceneIndex implements AutoCloseable {
          *
          * @throws IgniteCheckedException If failed.
          */
-        @SuppressWarnings("unchecked")
         private void findNext() throws IgniteCheckedException {
             curr = null;
 
             while (idx < docs.length) {
                 Document doc;
+                float score;
 
                 try {
-                    doc = searcher.doc(docs[idx++].doc);
+                    doc = searcher.doc(docs[idx].doc);
+                    score = docs[idx].score;
+
+                    idx++;
                 }
                 catch (IOException e) {
                     throw new IgniteCheckedException(e);
@@ -401,7 +404,7 @@ public class GridLuceneIndex implements AutoCloseable {
 
                 assert v != null;
 
-                curr = new IgniteBiTuple<>(k, v);
+                curr = new ScoredCacheEntry(k, v, score);
 
                 break;
             }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryAbstractTest.java
new file mode 100644
index 0000000..3faba1b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryAbstractTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.annotations.QueryTextField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Base class for full text query tests. Configures the {@link #PERSON_CACHE} cache with {@link Integer} keys and
+ * {@link Person} values as indexed types.
+ */
+public class GridCacheFullTextQueryAbstractTest extends GridCommonAbstractTest {
+    /** Cache name */
+    protected static final String PERSON_CACHE = "Person";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration<Integer, Person> cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName(PERSON_CACHE)
+            .setIndexedTypes(Integer.class, Person.class);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /**
+     * @return {@link #PERSON_CACHE} cache obtained from the grid with index 0.
+     */
+    protected IgniteCache<Integer, Person> cache() {
+        return grid(0).cache(PERSON_CACHE);
+    }
+
+    /** Cache value type with a single text-indexed field. */
+    protected static class Person {
+        /** Name, annotated as a text query field. */
+        @QueryTextField
+        @GridToStringInclude
+        final String name;
+
+        /**
+         * @param name Name.
+         */
+        public Person(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryFailoverTest.java
new file mode 100644
index 0000000..ca8e366
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryFailoverTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Checks iteration over {@link TextQuery} results when a node is stopped or the cursor is closed in the middle of
+ * the iteration.
+ */
+public class GridCacheFullTextQueryFailoverTest extends GridCacheFullTextQueryAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteCache<Integer, Person> cache = startGrids(2).cache(PERSON_CACHE);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, new Person("str" + i));
+    }
+
+    /**
+     * Checks that {@code hasNext()} fails with {@link CacheException} after node 1 is stopped during iteration.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStopNodeDuringQuery() throws Exception {
+        TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, "str~")
+            .setPageSize(10);
+
+        Iterator<Cache.Entry<Integer, Person>> iter = cache().query(qry).iterator();
+
+        // Initialize internal structures.
+        iter.next();
+
+        stopGrid(1);
+
+        awaitPartitionMapExchange();
+
+        GridTestUtils.assertThrows(log, iter::hasNext, CacheException.class, "Remote node has left topology");
+    }
+
+    /**
+     * Checks that after the cursor is closed {@code hasNext()} returns {@code false} and {@code next()} throws
+     * {@link NoSuchElementException}.
+     */
+    @Test
+    public void testCancelQuery() {
+        TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, "str~")
+            .setPageSize(10);
+
+        QueryCursor<Cache.Entry<Integer, Person>> cursor = cache().query(qry);
+
+        Iterator<Cache.Entry<Integer, Person>> iter = cursor.iterator();
+
+        // Initialize internal structures.
+        iter.next();
+
+        cursor.close();
+
+        assertFalse(iter.hasNext());
+
+        GridTestUtils.assertThrows(log, iter::next, NoSuchElementException.class, null);
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryLimitTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryLimitTest.java
new file mode 100644
index 0000000..8e30364
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryLimitTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import javax.cache.Cache;
+import org.apache.ignite.cache.query.TextQuery;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** */
+@RunWith(Parameterized.class)
+public class GridCacheFullTextQueryLimitTest extends GridCacheFullTextQueryAbstractTest {
+    /** Number of cache entries put per node. */
+    private static final int MAX_ITEM_PER_NODE_COUNT = 100;
+
+    /** Number of nodes. */
+    @Parameterized.Parameter
+    public int nodesCnt;
+
+    /** @return Test parameters: node counts from 1 to 8, each wrapped into a single-element array. */
+    @Parameterized.Parameters(name = "nodesCnt={0}")
+    public static Iterable<Object[]> params() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (int i = 1; i <= 8; i++) {
+            Object[] p = new Object[1];
+            p[0] = i;
+
+            params.add(p);
+        }
+
+        return params;
+    }
+
+    /** {@inheritDoc} Additionally stops all grids, since every test run starts its own set of nodes. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** Checks that the exact match "hello" is the first result for every limit from 1 to the total item count. */
+    @Test
+    public void testResultOrderedByScore() throws Exception {
+        startGrids(nodesCnt);
+
+        int items = MAX_ITEM_PER_NODE_COUNT * nodesCnt;
+
+        int helloPivot = new Random().nextInt(items);
+
+        for (int i = 0; i < items; i++) {
+            String name = i == helloPivot ? "hello" : String.format("hello%02d", i);
+
+            cache().put(i, new Person(name));
+        }
+
+        // Extract all data that matches the clause even slightly.
+        for (int limit = 1; limit <= items; limit++) {
+            TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, "hello~")
+                .setLimit(limit);
+
+            List<Cache.Entry<Integer, Person>> result = cache().query(qry).getAll();
+
+            // Lucene returns the top 50 results per query from a single node even if the limit is higher.
+            // The number of docs depends on the amount of data on every node.
+            if (limit <= 50)
+                assertEquals(limit, result.size());
+            else
+                assertTrue(limit >= result.size());
+
+            // The exact match "hello" has to be on top.
+            assertEquals("Limit=" + limit, "hello", result.get(0).getValue().name);
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
index 862a87e..0b83ef2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.annotations.QueryTextField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -96,7 +97,7 @@ public class GridCacheFullTextQueryMultithreadedSelfTest extends GridCacheAbstra
 
         // Create query.
         final CacheQuery<Map.Entry<Integer, H2TextValue>> qry = c.context().queries().createFullTextQuery(
-            H2TextValue.class.getSimpleName(), txt, limit, false);
+            H2TextValue.class.getSimpleName(), txt, limit, Query.DFLT_PAGE_SIZE, false);
 
         qry.enableDedup(false);
         qry.includeBackups(false);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryPagesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryPagesTest.java
new file mode 100644
index 0000000..ad0354d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryPagesTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.junit.Test;
+
+/**
+ * Test pages loading for text queries tests.
+ */
+public class GridCacheFullTextQueryPagesTest extends GridCacheFullTextQueryAbstractTest {
+    /** Cache size. */
+    private static final int MAX_ITEM_COUNT = 10_000;
+
+    /** Nodes count. */
+    private static final int NODES = 4;
+
+    /** Query page size. */
+    private static final int PAGE_SIZE = 100;
+
+    /** Limit on the query response size. */
+    private static final int QUERY_LIMIT = 100;
+
+    /** Client node to start query. */
+    private static IgniteEx client;
+
+    /** Recording communication SPI of the client node. */
+    private static TestRecordingCommunicationSpi spi;
+
+    /** Number of populated entries that pass the expected entry filter. */
+    private static int dataCnt;
+
+    /** {@inheritDoc} Installs a new {@link TestRecordingCommunicationSpi} as the communication SPI. */
+    @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Starts {@code NODES} grids and a client grid, then populates the cache through the client. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(NODES);
+
+        client = startClientGrid();
+
+        populateCache(client, MAX_ITEM_COUNT, (IgnitePredicate<Integer>)x -> String.valueOf(x).startsWith("1"));
+    }
+
+    /** {@inheritDoc} Starts recording {@link GridCacheQueryRequest} messages on the client's communication SPI. */
+    @Override protected void beforeTest() throws Exception {
+        spi = TestRecordingCommunicationSpi.spi(client);
+
+        spi.record((n, m) -> m instanceof GridCacheQueryRequest);
+    }
+
+    /** Tests that no cancel requests are sent and the number of requests matches the data row count on remote nodes. */
+    @Test
+    public void testTextQueryMultiplePagesNoLimit() {
+        checkTextQuery("1*", 0, PAGE_SIZE);
+
+        checkPages(4, 8, 0);
+    }
+
+    /** Tests that cache page requests are not sent after the limit is exceeded. */
+    @Test
+    public void testTextQueryLimitedMultiplePages() {
+        checkTextQuery("1*", QUERY_LIMIT, 30);
+
+        checkPages(4, 7, 3);
+    }
+
+    /** Tests that some pages are re-requested, but a cancel request is sent after the limit is exceeded. */
+    @Test
+    public void testTextQueryHighLimitedMultiplePages() {
+        checkTextQuery("1*", QUERY_LIMIT, 20);
+
+        checkPages(4, 8, 3);
+    }
+
+    /** Checks the numbers of recorded initial (clause is set), page load and cancel requests against the expected. */
+    private void checkPages(int expInitCnt, int expLoadCnt, int expCancelCnt) {
+        List<Object> messages = spi.recordedMessages(true);
+
+        for (Object msg: messages) {
+            GridCacheQueryRequest req = (GridCacheQueryRequest) msg;
+
+            if (req.cancel())
+                expCancelCnt--;
+            else if (req.clause() != null)
+                expInitCnt--;
+            else
+                expLoadCnt--;
+        }
+
+        assertEquals(0, expInitCnt);
+        assertEquals(0, expLoadCnt);
+        assertEquals(0, expCancelCnt);
+    }
+
+    /**
+     * Fills the cache with {@code cnt} entries and counts in {@code dataCnt} the keys that pass the filter.
+     *
+     * @throws IgniteCheckedException if failed.
+     */
+    private void populateCache(IgniteEx ignite, int cnt, IgnitePredicate<Integer> expectedEntryFilter) throws IgniteCheckedException {
+        IgniteInternalCache<Integer, Person> cache = ignite.cachex(PERSON_CACHE);
+
+        Affinity<Integer> aff = cache.affinity();
+
+        Map<UUID, Integer> nodeToCnt = new HashMap<>(); // NOTE(review): filled below, but its content is never used.
+
+        Set<String> vals = new HashSet<>();
+
+        for (int i = 0; i < cnt; i++) {
+            cache.put(i, new Person(String.valueOf(i)));
+
+            if (expectedEntryFilter.apply(i)) {
+                vals.add(String.valueOf(i));
+
+                UUID nodeId = aff.mapKeyToNode(i).id();
+
+                if (nodeId.equals(ignite.localNode().id()))
+                    continue;
+
+                nodeToCnt.putIfAbsent(nodeId, 0);
+
+                nodeToCnt.compute(nodeId, (k, v) -> v + 1);
+            }
+        }
+
+        dataCnt = vals.size();
+    }
+
+    /**
+     * @param clause Query clause.
+     * @param limit Limit on the response size; {@code 0} means all {@code dataCnt} matching entries are expected.
+     * @param pageSize Query page size. */
+    private void checkTextQuery(String clause, int limit, int pageSize) {
+        TextQuery<Integer, Person> qry = new TextQuery<Integer, Person>(Person.class, clause)
+            .setLimit(limit).setPageSize(pageSize);
+
+        IgniteCache<Integer, Person> cache = client.cache(PERSON_CACHE);
+
+        List<Cache.Entry<Integer, Person>> result = cache.query(qry).getAll();
+
+        int expRes = qry.getLimit() == 0 ? dataCnt : qry.getLimit();
+
+        assertEquals(expRes, result.size());
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 3ee65f3..2518d8a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -48,7 +48,10 @@ import org.apache.ignite.internal.processors.cache.DdlTransactionSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheCrossCacheQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheDynamicLoadOnClientPersistentTest;
 import org.apache.ignite.internal.processors.cache.GridCacheDynamicLoadOnClientTest;
+import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryFailoverTest;
+import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryLimitTest;
 import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryMultithreadedSelfTest;
+import org.apache.ignite.internal.processors.cache.GridCacheFullTextQueryPagesTest;
 import org.apache.ignite.internal.processors.cache.GridCacheFullTextQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheLazyQueryPartitionsReleaseTest;
 import org.apache.ignite.internal.processors.cache.GridCacheQueryIndexDisabledSelfTest;
@@ -495,8 +498,11 @@ import org.junit.runners.Suite;
     CacheQueryEvictDataLostTest.class,
 
     // Full text queries.
+    GridCacheFullTextQueryFailoverTest.class,
     GridCacheFullTextQuerySelfTest.class,
     GridCacheFullTextQueryMultithreadedSelfTest.class,
+    GridCacheFullTextQueryPagesTest.class,
+    GridCacheFullTextQueryLimitTest.class,
     IgniteCacheFullTextQueryNodeJoiningSelfTest.class,
 
     // Ignite cache and H2 comparison.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
index 7e5450d..ccaf3f8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
@@ -143,15 +143,16 @@ class KillCommandsTests {
         // Cancel first query.
         qryCanceler.accept(qryInfo);
 
-        // Fetch all cached entries. It's size equal to the {@code PAGE_SZ * NODES_CNT}.
-        for (int i = 0; i < PAGE_SZ * srvs.size() - 1; i++)
-            assertNotNull(iter1.next());
+        // Fetch of the next page should throw the exception. New page is delivered in parallel to iterating.
+        assertThrowsWithCause(() -> {
+            for (int i = 0; i < PAGE_SZ * PAGES_CNT - 1; i++)
+                assertNotNull(iter1.next());
 
-        // Fetch of the next page should throw the exception.
-        assertThrowsWithCause(iter1::next, IgniteCheckedException.class);
+            return null;
+        }, IgniteCheckedException.class);
 
         // Checking that second query works fine after canceling first.
-        for (int i = 0; i < PAGE_SZ * PAGE_SZ - 1; i++)
+        for (int i = 0; i < PAGE_SZ * PAGES_CNT - 1; i++)
             assertNotNull(iter2.next());
 
         checkScanQueryResources(cli, srvs, qryInfo.get3());