You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2019/01/22 11:11:07 UTC

[ignite] branch master updated: IGNITE-10993 Web Console: Implemented queries cancellation. This closes #5869.

This is an automated email from the ASF dual-hosted git repository.

akuznetsov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5861a92  IGNITE-10993 Web Console: Implemented queries cancellation. This closes #5869.
5861a92 is described below

commit 5861a9248fc66013cba960e96de3bbaac4106776
Author: Vasiliy Sisko <vs...@gridgain.com>
AuthorDate: Tue Jan 22 18:09:53 2019 +0700

    IGNITE-10993 Web Console: Implemented queries cancellation. This closes #5869.
---
 .../visor/node/VisorNodeBaselineStatus.java        |   2 +
 .../visor/query/VisorQueryCleanupTask.java         |  22 +-
 .../internal/visor/query/VisorQueryCursor.java     |  95 -------
 .../visor/query/VisorQueryFetchFirstPageTask.java  | 107 ++++++++
 .../internal/visor/query/VisorQueryHolder.java     | 171 ++++++++++++
 .../visor/query/VisorQueryNextPageTask.java        |  73 +----
 .../internal/visor/query/VisorQueryTask.java       |  83 +-----
 .../internal/visor/query/VisorQueryUtils.java      | 301 +++++++++++++++++++--
 .../internal/visor/query/VisorScanQueryTask.java   | 112 +-------
 .../commands/cache/VisorCacheScanCommand.scala     |  24 +-
 modules/web-console/backend/app/browsersHandler.js |   2 +
 .../components/queries-notebook/controller.ts      | 229 ++++++++++++++--
 .../components/queries-notebook/template.tpl.pug   |   6 +
 .../app/modules/agent/AgentManager.service.js      | 107 ++++----
 14 files changed, 892 insertions(+), 442 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
index ea90be3..a273ca9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
@@ -25,8 +25,10 @@ import org.jetbrains.annotations.Nullable;
 public enum VisorNodeBaselineStatus {
     /** */
     NODE_IN_BASELINE,
+
     /** */
     NODE_NOT_IN_BASELINE,
+
     /** */
     BASELINE_NOT_AVAILABLE;
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java
index 9dfa0cf..e62393a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.internal.processors.task.GridInternal;
@@ -34,6 +33,8 @@ import org.apache.ignite.internal.visor.VisorTaskArgument;
 import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.removeQueryHolder;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
 import static org.apache.ignite.internal.visor.util.VisorTaskUtils.logMapped;
 
 /**
@@ -106,14 +107,21 @@ public class VisorQueryCleanupTask extends VisorMultiNodeTask<VisorQueryCleanupT
 
         /** {@inheritDoc} */
         @Override protected Void run(Collection<String> qryIds) {
-            ConcurrentMap<String, VisorQueryCursor> storage = ignite.cluster().nodeLocalMap();
+            long start = U.currentTimeMillis();
+
+            if (debug) {
+                start = log(
+                    ignite.log(),
+                    "Queries cancellation started: [" + String.join(", ", qryIds) + "]",
+                    getClass(),
+                    start);
+            }
 
-            for (String qryId : qryIds) {
-                VisorQueryCursor cur = storage.remove(qryId);
+            for (String qryId : qryIds)
+                removeQueryHolder(ignite, qryId);
 
-                if (cur != null)
-                    cur.close();
-            }
+            if (debug)
+                log(ignite.log(), "Queries cancellation finished", getClass(), start);
 
             return null;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java
deleted file mode 100644
index 3d5c58d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.query;
-
-import java.util.Collection;
-import java.util.Iterator;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Wrapper for query cursor.
- */
-public class VisorQueryCursor<T> implements Iterator<T>, AutoCloseable {
-    /** */
-    private final QueryCursor<T> cur;
-
-    /** */
-    private final Iterator<T> itr;
-
-    /** Flag indicating that this cursor was read from last check. */
-    private volatile boolean accessed;
-
-    /**
-     * @param cur Cursor.
-     */
-    public VisorQueryCursor(QueryCursor<T> cur) {
-        this.cur = cur;
-
-        itr = cur.iterator();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasNext() {
-        return itr.hasNext();
-    }
-
-    /** {@inheritDoc} */
-    @Override public T next() {
-        return itr.next();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        cur.close();
-    }
-
-    /**
-     * @return SQL Fields query result metadata.
-     */
-    @SuppressWarnings("unchecked")
-    public Collection<GridQueryFieldMetadata> fieldsMeta() {
-        return ((QueryCursorImpl)cur).fieldsMeta();
-    }
-
-    /**
-     * @return Flag indicating that this future was read from last check..
-     */
-    public boolean accessed() {
-        return accessed;
-    }
-
-    /**
-     * @param accessed New accessed.
-     */
-    public void accessed(boolean accessed) {
-        this.accessed = accessed;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(VisorQueryCursor.class, this);
-    }
-}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryFetchFirstPageTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryFetchFirstPageTask.java
new file mode 100644
index 0000000..98d4801
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryFetchFirstPageTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorEither;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorOneNodeTask;
+import org.apache.ignite.internal.visor.util.VisorExceptionWrapper;
+
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchQueryRows;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.getQueryHolder;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.removeQueryHolder;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
+
+/**
+ * Task for checking a query execution and receiving the first page of the query result.
+ */
+@GridInternal
+public class VisorQueryFetchFirstPageTask extends VisorOneNodeTask<VisorQueryNextPageTaskArg, VisorEither<VisorQueryResult>> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorQueryFetchFirstPageJob job(VisorQueryNextPageTaskArg arg) {
+        return new VisorQueryFetchFirstPageJob(arg, debug);
+    }
+
+    /**
+     * Job for collecting the first page of a previously executed SQL or SCAN query.
+     */
+    private static class VisorQueryFetchFirstPageJob extends VisorJob<VisorQueryNextPageTaskArg, VisorEither<VisorQueryResult>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Create job with specified argument.
+         *
+         * @param arg Job argument.
+         * @param debug Debug flag.
+         */
+        private VisorQueryFetchFirstPageJob(VisorQueryNextPageTaskArg arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected VisorEither<VisorQueryResult> run(VisorQueryNextPageTaskArg arg) {
+            String qryId = arg.getQueryId();
+
+            long start = U.currentTimeMillis();
+
+            if (debug)
+                start = log(ignite.log(), "Fetch query first page started: " + qryId, getClass(), start);
+
+            VisorQueryHolder holder = getQueryHolder(ignite, qryId);
+
+            if (holder.getErr() != null)
+                return new VisorEither<>(new VisorExceptionWrapper(holder.getErr()));
+
+            List<Object[]> rows = null;
+            List<VisorQueryField> cols = holder.getColumns();
+
+            boolean hasMore = cols == null;
+
+            if (cols != null) {
+                Iterator itr = holder.getIterator();
+                rows = fetchQueryRows(itr, qryId, arg.getPageSize());
+                hasMore = itr.hasNext();
+            }
+
+            if (hasMore)
+                holder.setAccessed(true);
+            else
+                removeQueryHolder(ignite, qryId);
+
+            if (debug)
+                log(ignite.log(), "Fetch query first page finished: " + qryId, getClass(), start);
+
+            return new VisorEither<>(
+                new VisorQueryResult(ignite.localNode().id(), qryId, cols, rows, hasMore, holder.duration()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(VisorQueryFetchFirstPageJob.class, this);
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryHolder.java
new file mode 100644
index 0000000..af54f27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryHolder.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+
+/**
+ * Holds identifying information of an executing query and its result.
+ */
+public class VisorQueryHolder implements AutoCloseable {
+    /** Prefix for node local key for SQL queries. */
+    private static final String SQL_QRY_PREFIX = "VISOR_SQL_QUERY";
+
+    /** Prefix for node local key for SCAN queries. */
+    private static final String SCAN_QRY_PREFIX = "VISOR_SCAN_QUERY";
+
+    /** Query ID for extracting query result data. */
+    private final String qryId;
+
+    /** Cancel query object. */
+    private final GridQueryCancel cancel;
+
+    /** Query column descriptors. */
+    private volatile List<VisorQueryField> cols;
+
+    /** Error in process of query result receiving. */
+    private volatile Throwable err;
+
+    /** Query duration in ms. */
+    private volatile long duration;
+
+    /** Flag indicating that this cursor has been read since the last check. */
+    private volatile boolean accessed;
+
+    /** Query cursor. */
+    private volatile QueryCursor cur;
+
+    /** Result set iterator. */
+    private volatile Iterator itr;
+
+    /**
+     * @param qryId Query ID.
+     * @return {@code true} if holder contains SQL query.
+     */
+    public static boolean isSqlQuery(String qryId) {
+        return qryId.startsWith(SQL_QRY_PREFIX);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param sqlQry {@code true} if the holder contains an SQL query, {@code false} if it contains a SCAN query.
+     * @param cur Query cursor.
+     * @param cancel Cancel object.
+     */
+    VisorQueryHolder(boolean sqlQry, QueryCursor cur, GridQueryCancel cancel) {
+        this.cur = cur;
+        this.cancel = cancel;
+
+        // Generate query ID to store query cursor in node local storage.
+        qryId = (sqlQry ? SQL_QRY_PREFIX : SCAN_QRY_PREFIX) + "-" + UUID.randomUUID();
+    }
+
+    /**
+     * @return Query ID for extracting query result data.
+     */
+    public String getQueryID() {
+        return qryId;
+    }
+
+    /**
+     * @return Result set iterator.
+     */
+    public synchronized Iterator getIterator() {
+        assert cur != null;
+
+        if (itr == null)
+            itr = cur.iterator();
+
+        return itr;
+    }
+
+    /**
+     * @return Query column descriptors.
+     */
+    public List<VisorQueryField> getColumns() {
+        return cols;
+    }
+
+    /**
+     * Complete query execution.
+     *
+     * @param cur Query cursor.
+     * @param duration Duration of query execution.
+     * @param cols Query column descriptors.
+     */
+    public void complete(QueryCursor cur, long duration, List<VisorQueryField> cols) {
+        this.cur = cur;
+        this.duration = duration;
+        this.cols = cols;
+        this.accessed = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (cur != null)
+            cur.close();
+
+        if (cancel != null)
+            cancel.cancel();
+    }
+
+    /**
+     * @return Error in process of query result receiving.
+     */
+    public Throwable getErr() {
+        return err;
+    }
+
+    /**
+     * Set error caught during query execution.
+     *
+     * @param err Error caught during query execution.
+     */
+    public void setError(Throwable err) {
+        this.err = err;
+
+        if (cur != null)
+            cur.close();
+    }
+
+    /**
+     * @return Flag indicating that this cursor has been read since the last check.
+     */
+    public boolean isAccessed() {
+        return accessed;
+    }
+
+    /**
+     * @param accessed New accessed.
+     */
+    public void setAccessed(boolean accessed) {
+        this.accessed = accessed;
+    }
+
+    /**
+     * @return Duration of query execution.
+     */
+    public long duration() {
+        return duration;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java
index 4684c49..dc7b09f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java
@@ -17,16 +17,17 @@
 
 package org.apache.ignite.internal.visor.query;
 
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorOneNodeTask;
 
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.getQueryHolder;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.removeQueryHolder;
+
 /**
  * Task for collecting next page previously executed SQL or SCAN query.
  */
@@ -59,77 +60,29 @@ public class VisorQueryNextPageTask extends VisorOneNodeTask<VisorQueryNextPageT
 
         /** {@inheritDoc} */
         @Override protected VisorQueryResult run(VisorQueryNextPageTaskArg arg) {
-            return arg.getQueryId().startsWith(VisorQueryUtils.SCAN_QRY_NAME) ? nextScanPage(arg) : nextSqlPage(arg);
-        }
-
-        /**
-         * Collect data from SQL query.
-         *
-         * @param arg Query name and page size.
-         * @return Query result with next page.
-         */
-        private VisorQueryResult nextSqlPage(VisorQueryNextPageTaskArg arg) {
             long start = U.currentTimeMillis();
 
-            ConcurrentMap<String, VisorQueryCursor<List<?>>> storage = ignite.cluster().nodeLocalMap();
-
             String qryId = arg.getQueryId();
 
-            VisorQueryCursor<List<?>> cur = storage.get(qryId);
+            VisorQueryHolder holder = getQueryHolder(ignite, qryId);
 
-            if (cur == null)
-                throw new IgniteException("SQL query results are expired.");
+            Iterator itr = holder.getIterator();
 
-            List<Object[]> nextRows = VisorQueryUtils.fetchSqlQueryRows(cur, arg.getPageSize());
+            List<Object[]> nextRows = VisorQueryHolder.isSqlQuery(qryId)
+                ? VisorQueryUtils.fetchSqlQueryRows(itr, arg.getPageSize())
+                : VisorQueryUtils.fetchScanQueryRows(itr, arg.getPageSize());
 
-            boolean hasMore = cur.hasNext();
+            boolean hasMore = itr.hasNext();
 
             if (hasMore)
-                cur.accessed(true);
-            else {
-                storage.remove(qryId);
-
-                cur.close();
-            }
+                holder.setAccessed(true);
+            else
+                removeQueryHolder(ignite, qryId);
 
             return new VisorQueryResult(ignite.localNode().id(), qryId, null, nextRows, hasMore,
                 U.currentTimeMillis() - start);
         }
 
-        /**
-         * Collect data from SCAN query
-         *
-         * @param arg Query name and page size.
-         * @return Next page with data.
-         */
-        private VisorQueryResult nextScanPage(VisorQueryNextPageTaskArg arg) {
-            long start = U.currentTimeMillis();
-
-            ConcurrentMap<String, VisorQueryCursor<Cache.Entry<Object, Object>>> storage = ignite.cluster().nodeLocalMap();
-
-            String qryId = arg.getQueryId();
-
-            VisorQueryCursor<Cache.Entry<Object, Object>> cur = storage.get(qryId);
-
-            if (cur == null)
-                throw new IgniteException("Scan query results are expired.");
-
-            List<Object[]> rows = VisorQueryUtils.fetchScanQueryRows(cur, arg.getPageSize());
-
-            boolean hasMore = cur.hasNext();
-
-            if (hasMore)
-                cur.accessed(true);
-            else {
-                storage.remove(qryId);
-
-                cur.close();
-            }
-
-            return new VisorQueryResult(ignite.localNode().id(), qryId, null, rows, hasMore,
-                U.currentTimeMillis() - start);
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(VisorQueryNextPageJob.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
index 97ee83e..88a1c8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
@@ -17,29 +17,18 @@
 
 package org.apache.ignite.internal.visor.query;
 
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.task.GridVisorManagementTask;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorEither;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorOneNodeTask;
 import org.apache.ignite.internal.visor.util.VisorExceptionWrapper;
 
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SQL_QRY_NAME;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchSqlQueryRows;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleResultSetHolderRemoval;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleQueryStart;
 
 /**
  * Task for execute SQL fields query and get first page of results.
@@ -77,71 +66,17 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryTaskArg, VisorEit
             try {
                 UUID nid = ignite.localNode().id();
 
-                SqlFieldsQuery qry = new SqlFieldsQuery(arg.getQueryText());
-                qry.setPageSize(arg.getPageSize());
-                qry.setLocal(arg.isLocal());
-                qry.setDistributedJoins(arg.isDistributedJoins());
-                qry.setCollocated(arg.isCollocated());
-                qry.setEnforceJoinOrder(arg.isEnforceJoinOrder());
-                qry.setReplicatedOnly(arg.isReplicatedOnly());
-                qry.setLazy(arg.getLazy());
+                GridQueryCancel cancel = new GridQueryCancel();
 
-                long start = U.currentTimeMillis();
+                Map<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
 
-                List<FieldsQueryCursor<List<?>>> qryCursors;
+                VisorQueryHolder holder = new VisorQueryHolder(true, null, cancel);
 
-                String cacheName = arg.getCacheName();
+                storage.put(holder.getQueryID(), holder);
 
-                if (F.isEmpty(cacheName))
-                    qryCursors = ignite.context().query().querySqlFields(qry, true, false);
-                else {
-                    IgniteCache<Object, Object> c = ignite.cache(cacheName);
+                scheduleQueryStart(ignite, holder, arg, cancel);
 
-                    if (c == null)
-                        throw new SQLException("Fail to execute query. Cache not found: " + cacheName);
-
-                    qryCursors = ((IgniteCacheProxy)c.withKeepBinary()).queryMultipleStatements(qry);
-                }
-
-                // In case of multiple statements leave opened only last cursor.
-                for (int i = 0; i < qryCursors.size() - 1; i++)
-                    U.closeQuiet(qryCursors.get(i));
-
-                // In case of multiple statements return last cursor as result.
-                VisorQueryCursor<List<?>> cur = new VisorQueryCursor<>(F.last(qryCursors));
-
-                Collection<GridQueryFieldMetadata> meta = cur.fieldsMeta();
-
-                if (meta == null)
-                    return new VisorEither<>(
-                        new VisorExceptionWrapper(new SQLException("Fail to execute query. No metadata available.")));
-                else {
-                    List<VisorQueryField> names = new ArrayList<>(meta.size());
-
-                    for (GridQueryFieldMetadata col : meta)
-                        names.add(new VisorQueryField(col.schemaName(), col.typeName(),
-                            col.fieldName(), col.fieldTypeName()));
-
-                    List<Object[]> rows = fetchSqlQueryRows(cur, arg.getPageSize());
-
-                    // Query duration + fetch duration.
-                    long duration = U.currentTimeMillis() - start;
-
-                    boolean hasNext = cur.hasNext();
-
-                    // Generate query ID to store query cursor in node local storage.
-                    String qryId = SQL_QRY_NAME + "-" + UUID.randomUUID();
-
-                    if (hasNext) {
-                        ignite.cluster().<String, VisorQueryCursor<List<?>>>nodeLocalMap().put(qryId, cur);
-
-                        scheduleResultSetHolderRemoval(qryId, ignite);
-                    }
-                    else
-                        cur.close();
-
-                    return new VisorEither<>(new VisorQueryResult(nid, qryId, names, rows, hasNext, duration));
-                }
+                return new VisorEither<>(new VisorQueryResult(nid, holder.getQueryID(), null, null, false, 0));
             }
             catch (Throwable e) {
                 return new VisorEither<>(new VisorExceptionWrapper(e));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
index a47acf6..6143e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
@@ -19,21 +19,39 @@ package org.apache.ignite.internal.visor.query;
 
 import java.math.BigDecimal;
 import java.net.URL;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.binary.BinaryObjectEx;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 
 /**
  * Contains utility methods for Visor query tasks and jobs.
@@ -42,11 +60,11 @@ public class VisorQueryUtils {
     /** How long to store future with query in node local map: 5 minutes. */
     public static final Integer RMV_DELAY = 5 * 60 * 1000;
 
-    /** Prefix for node local key for SQL queries. */
-    public static final String SQL_QRY_NAME = "VISOR_SQL_QUERY";
+    /** Message for query result expired error. */
+    private static final String SQL_QRY_RESULTS_EXPIRED_ERR = "SQL query results are expired.";
 
-    /** Prefix for node local key for SCAN queries. */
-    public static final String SCAN_QRY_NAME = "VISOR_SCAN_QUERY";
+    /** Message for scan result expired error. */
+    private static final String SCAN_QRY_RESULTS_EXPIRED_ERR = "Scan query results are expired.";
 
     /** Columns for SCAN queries. */
     public static final List<VisorQueryField> SCAN_COL_NAMES = Arrays.asList(
@@ -55,7 +73,7 @@ public class VisorQueryUtils {
     );
 
     /**
-     * @param o - Object.
+     * @param o Source object.
      * @return String representation of object class.
      */
     private static String typeOf(Object o) {
@@ -130,17 +148,19 @@ public class VisorQueryUtils {
     /**
      * Fetch rows from SCAN query future.
      *
-     * @param cur Query future to fetch rows from.
+     * @param itr Result set iterator.
      * @param pageSize Number of rows to fetch.
      * @return Fetched rows.
      */
-    public static List<Object[]> fetchScanQueryRows(VisorQueryCursor<Cache.Entry<Object, Object>> cur, int pageSize) {
+    public static List<Object[]> fetchScanQueryRows(Iterator itr, int pageSize) {
         List<Object[]> rows = new ArrayList<>();
 
         int cnt = 0;
 
-        while (cur.hasNext() && cnt < pageSize) {
-            Cache.Entry<Object, Object> next = cur.next();
+        Iterator<Cache.Entry<Object, Object>> scanItr = (Iterator<Cache.Entry<Object, Object>>)itr;
+
+        while (scanItr.hasNext() && cnt < pageSize) {
+            Cache.Entry<Object, Object> next = scanItr.next();
 
             Object k = next.getKey();
             Object v = next.getValue();
@@ -227,6 +247,12 @@ public class VisorQueryUtils {
             "typeId", obj.type().typeId(), true);
     }
 
+    /**
+     * Convert object to value that can be passed to client.
+     *
+     * @param original Source object.
+     * @return Converted value.
+     */
     public static Object convertValue(Object original) {
         if (original == null)
             return null;
@@ -241,17 +267,19 @@ public class VisorQueryUtils {
     /**
      * Collects rows from sql query future, first time creates meta and column names arrays.
      *
-     * @param cur Query cursor to fetch rows from.
+     * @param itr Result set iterator.
      * @param pageSize Number of rows to fetch.
      * @return Fetched rows.
      */
-    public static List<Object[]> fetchSqlQueryRows(VisorQueryCursor<List<?>> cur, int pageSize) {
+    public static List<Object[]> fetchSqlQueryRows(Iterator itr, int pageSize) {
         List<Object[]> rows = new ArrayList<>();
 
         int cnt = 0;
 
-        while (cur.hasNext() && cnt < pageSize) {
-            List<?> next = cur.next();
+        Iterator<List<?>> sqlItr = (Iterator<List<?>>)itr;
+
+        while (sqlItr.hasNext() && cnt < pageSize) {
+            List<?> next = sqlItr.next();
 
             int sz = next.size();
 
@@ -269,27 +297,252 @@ public class VisorQueryUtils {
     }
 
     /**
+     * Get query holder from node local storage or throw exception if it is not found.
+     *
+     * @param ignite IgniteEx instance.
+     * @param qryId Query ID to look up holder by.
+     * @return Query holder for specified query ID.
+     * @throws IgniteException When holder is not found; message depends on query type (SQL or scan).
+     */
+    public static VisorQueryHolder getQueryHolder(final IgniteEx ignite, final String qryId) throws IgniteException {
+        ConcurrentMap<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
+
+        VisorQueryHolder holder = storage.get(qryId);
+
+        if (holder == null)
+            throw new IgniteException(VisorQueryHolder.isSqlQuery(qryId)
+                ? SQL_QRY_RESULTS_EXPIRED_ERR
+                : SCAN_QRY_RESULTS_EXPIRED_ERR);
+
+        return holder;
+    }
+
+    /**
+     * Remove query holder from local storage for query with specified ID and cancel query if it is in progress.
+     *
+     * @param ignite IgniteEx instance.
+     * @param qryId ID of query whose holder should be removed.
+     */
+    public static void removeQueryHolder(final IgniteEx ignite, final String qryId) {
+        ConcurrentMap<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
+        VisorQueryHolder holder = storage.remove(qryId);
+
+        if (holder != null) // Nothing to close if holder is not in storage.
+            holder.close();
+    }
+
+    /**
+     * Fetch page of rows from query iterator or return empty list if iterator has no more elements.
+     *
+     * @param itr Result set iterator.
+     * @param qryId Query ID, used to choose between SQL and scan row fetching.
+     * @param pageSize Number of rows to fetch.
+     */
+    public static List<Object[]> fetchQueryRows(Iterator itr, String qryId, int pageSize) {
+        return itr.hasNext()
+            ? (VisorQueryHolder.isSqlQuery(qryId)
+                ? fetchSqlQueryRows(itr, pageSize)
+                : fetchScanQueryRows(itr, pageSize))
+            : Collections.emptyList();
+    }
+
+    /**
+     * Schedule start of SQL query execution. Query result or error is stored in query holder.
+     *
+     * @param ignite IgniteEx instance.
+     * @param holder Query holder object that receives query result or error.
+     * @param arg Query task argument with query properties.
+     * @param cancel Object to cancel query.
+     */
+    public static void scheduleQueryStart(
+        final IgniteEx ignite,
+        final VisorQueryHolder holder,
+        final VisorQueryTaskArg arg,
+        final GridQueryCancel cancel
+    ) {
+        ignite.context().closure().runLocalSafe(() -> {
+            try {
+                SqlFieldsQuery qry = new SqlFieldsQuery(arg.getQueryText());
+
+                qry.setPageSize(arg.getPageSize());
+                qry.setLocal(arg.isLocal());
+                qry.setDistributedJoins(arg.isDistributedJoins());
+                qry.setCollocated(arg.isCollocated());
+                qry.setEnforceJoinOrder(arg.isEnforceJoinOrder());
+                qry.setReplicatedOnly(arg.isReplicatedOnly());
+                qry.setLazy(arg.getLazy());
+
+                String cacheName = arg.getCacheName();
+
+                if (!F.isEmpty(cacheName))
+                    qry.setSchema(cacheName);
+
+                long start = U.currentTimeMillis();
+
+                List<FieldsQueryCursor<List<?>>> qryCursors = ignite
+                    .context()
+                    .query()
+                    .querySqlFields(null, qry, null, true, false, cancel);
+
+                // In case of multiple statements leave opened only last cursor.
+                for (int i = 0; i < qryCursors.size() - 1; i++)
+                    U.closeQuiet(qryCursors.get(i));
+
+                // In case of multiple statements return last cursor as result.
+                FieldsQueryCursor<List<?>> cur = F.last(qryCursors);
+
+                try {
+                    // Ensure holder was not removed from node local storage by separate thread if user cancels query.
+                    VisorQueryHolder actualHolder = getQueryHolder(ignite, holder.getQueryID());
+
+                    List<GridQueryFieldMetadata> meta = ((QueryCursorEx)cur).fieldsMeta();
+
+                    if (meta == null) // Throw to reuse cleanup below: cursor is closed, error is stored in holder.
+                        throw new SQLException("Fail to execute query. No metadata available.");
+                    else {
+                        List<VisorQueryField> cols = new ArrayList<>(meta.size());
+
+                        for (GridQueryFieldMetadata col : meta) {
+                            cols.add(new VisorQueryField(
+                                col.schemaName(),
+                                col.typeName(),
+                                col.fieldName(),
+                                col.fieldTypeName())
+                            );
+                        }
+
+                        actualHolder.complete(cur, U.currentTimeMillis() - start, cols);
+
+                        scheduleQueryHolderRemoval(ignite, actualHolder.getQueryID());
+                    }
+                }
+                catch (Throwable e) {
+                    U.closeQuiet(cur);
+
+                    throw e;
+                }
+            }
+            catch (Throwable e) {
+                holder.setError(e);
+            }
+        }, MANAGEMENT_POOL);
+    }
+
+    /**
+     * Schedule start of SCAN query execution. Query result or error is stored in query holder.
+     *
+     * @param ignite IgniteEx instance.
+     * @param holder Query holder object that receives query result or error.
+     * @param arg Query task argument with query properties.
+     */
+    public static void scheduleScanStart(
+        final IgniteEx ignite,
+        final VisorQueryHolder holder,
+        final VisorScanQueryTaskArg arg
+    ) {
+        ignite.context().closure().runLocalSafe(() -> {
+            try {
+                IgniteCache<Object, Object> c = ignite.cache(arg.getCacheName());
+                String filterText = arg.getFilter();
+                IgniteBiPredicate<Object, Object> filter = null;
+
+                if (!F.isEmpty(filterText))
+                    filter = new VisorQueryScanRegexFilter(arg.isCaseSensitive(), arg.isRegEx(), filterText);
+
+                QueryCursor<Cache.Entry<Object, Object>> cur;
+
+                long start = U.currentTimeMillis();
+
+                if (arg.isNear()) // NOTE(review): filter is not applied in near mode - confirm this is intended.
+                    cur = new VisorNearCacheCursor<>(c.localEntries(CachePeekMode.NEAR).iterator());
+                else {
+                    ScanQuery<Object, Object> qry = new ScanQuery<>(filter);
+                    qry.setPageSize(arg.getPageSize());
+                    qry.setLocal(arg.isLocal());
+
+                    cur = c.withKeepBinary().query(qry);
+                }
+
+                try {
+                    // Ensure holder was not removed from node local storage by separate thread if user cancels query.
+                    VisorQueryHolder actualHolder = getQueryHolder(ignite, holder.getQueryID());
+
+                    actualHolder.complete(cur, U.currentTimeMillis() - start, SCAN_COL_NAMES);
+
+                    scheduleQueryHolderRemoval(ignite, actualHolder.getQueryID());
+                }
+                catch (Throwable e) {
+                    U.closeQuiet(cur);
+
+                    throw e;
+                }
+            }
+            catch (Throwable e) {
+                holder.setError(e);
+            }
+        }, MANAGEMENT_POOL);
+    }
+
+    /**
+     * Wrapper that exposes a cache entries iterator as {@link QueryCursor}.
+     */
+    private static class VisorNearCacheCursor<T> implements QueryCursor<T> {
+        /** Wrapped iterator. */
+        private final Iterator<T> it;
+
+        /**
+         * Wrapping constructor.
+         *
+         * @param it Near cache iterator to wrap.
+         */
+        private VisorNearCacheCursor(Iterator<T> it) {
+            this.it = it;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<T> getAll() {
+            List<T> all = new ArrayList<>();
+
+            while(it.hasNext()) // Drains the same iterator instance that iterator() returns.
+                all.add(it.next());
+
+            return all;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            // Nothing to close.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<T> iterator() {
+            return it; // Always the wrapped instance: already consumed elements are not replayed.
+        }
+    }
+
+    /**
+     * Schedule clearing of query context by timeout.
+     *
      * @param qryId Unique query result id.
+     * @param ignite IgniteEx instance.
      */
-    public static void scheduleResultSetHolderRemoval(final String qryId, final IgniteEx ignite) {
+    public static void scheduleQueryHolderRemoval(final IgniteEx ignite, final String qryId) {
         ignite.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(RMV_DELAY) {
             @Override public void onTimeout() {
-                ConcurrentMap<String, VisorQueryCursor> storage = ignite.cluster().nodeLocalMap();
+                ConcurrentMap<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
 
-                VisorQueryCursor cur = storage.get(qryId);
+                VisorQueryHolder holder = storage.get(qryId);
 
-                if (cur != null) {
-                    // If cursor was accessed since last scheduling, set access flag to false and reschedule.
-                    if (cur.accessed()) {
-                        cur.accessed(false);
+                if (holder != null) {
+                    if (holder.isAccessed()) {
+                        holder.setAccessed(false);
 
-                        scheduleResultSetHolderRemoval(qryId, ignite);
+                        // Holder was accessed, we need to keep it for one more period.
+                        scheduleQueryHolderRemoval(ignite, qryId);
                     }
                     else {
                         // Remove stored cursor otherwise.
-                        storage.remove(qryId);
-
-                        cur.close();
+                        removeQueryHolder(ignite, qryId);
                     }
                 }
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java
index 0c1c418..7c0fac9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java
@@ -17,29 +17,15 @@
 
 package org.apache.ignite.internal.visor.query;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 import java.util.UUID;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorEither;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorOneNodeTask;
 import org.apache.ignite.internal.visor.util.VisorExceptionWrapper;
-import org.apache.ignite.lang.IgniteBiPredicate;
 
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SCAN_COL_NAMES;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SCAN_QRY_NAME;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchScanQueryRows;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleResultSetHolderRemoval;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleScanStart;
 
 /**
  * Task for execute SCAN query and get first page of results.
@@ -71,69 +57,18 @@ public class VisorScanQueryTask extends VisorOneNodeTask<VisorScanQueryTaskArg,
             super(arg, debug);
         }
 
-        /**
-         * Execute scan query.
-         *
-         * @param c Cache to scan.
-         * @param arg Job argument with query parameters.
-         * @return Query cursor.
-         */
-        private QueryCursor<Cache.Entry<Object, Object>> scan(IgniteCache<Object, Object> c, VisorScanQueryTaskArg arg,
-            IgniteBiPredicate<Object, Object> filter) {
-            ScanQuery<Object, Object> qry = new ScanQuery<>(filter);
-            qry.setPageSize(arg.getPageSize());
-            qry.setLocal(arg.isLocal());
-
-            return c.withKeepBinary().query(qry);
-        }
-
-        /**
-         * Scan near cache.
-         *
-         * @param c Cache to scan near entries.
-         * @return Cache entries iterator wrapped with query cursor.
-         */
-        private QueryCursor<Cache.Entry<Object, Object>> near(IgniteCache<Object, Object> c) {
-            return new VisorNearCacheCursor<>(c.localEntries(CachePeekMode.NEAR).iterator());
-        }
-
         /** {@inheritDoc} */
         @Override protected VisorEither<VisorQueryResult> run(final VisorScanQueryTaskArg arg) {
             try {
-                IgniteCache<Object, Object> c = ignite.cache(arg.getCacheName());
                 UUID nid = ignite.localNode().id();
 
-                String filterText = arg.getFilter();
-
-                long start = U.currentTimeMillis();
-
-                IgniteBiPredicate<Object, Object> filter = null;
-
-                if (!F.isEmpty(filterText))
-                    filter = new VisorQueryScanRegexFilter(arg.isCaseSensitive(), arg.isRegEx(), filterText);
-
-                VisorQueryCursor<Cache.Entry<Object, Object>> cur =
-                    new VisorQueryCursor<>(arg.isNear() ? near(c) : scan(c, arg, filter));
-
-                List<Object[]> rows = fetchScanQueryRows(cur, arg.getPageSize());
-
-                long duration = U.currentTimeMillis() - start; // Scan duration + fetch duration.
+                VisorQueryHolder holder = new VisorQueryHolder(false, null, null);
 
-                boolean hasNext = cur.hasNext();
+                ignite.cluster().<String, VisorQueryHolder>nodeLocalMap().put(holder.getQueryID(), holder);
 
-                // Generate query ID to store query cursor in node local storage.
-                String qryId = SCAN_QRY_NAME + "-" + UUID.randomUUID();
+                scheduleScanStart(ignite, holder, arg);
 
-                if (hasNext) {
-                    ignite.cluster().<String, VisorQueryCursor>nodeLocalMap().put(qryId, cur);
-
-                    scheduleResultSetHolderRemoval(qryId, ignite);
-                }
-                else
-                    cur.close();
-
-                return new VisorEither<>(new VisorQueryResult(nid, qryId, SCAN_COL_NAMES, rows, hasNext,
-                    duration));
+                return new VisorEither<>(new VisorQueryResult(nid, holder.getQueryID(), null, null, false, 0));
             }
             catch (Throwable e) {
                 return new VisorEither<>(new VisorExceptionWrapper(e));
@@ -144,42 +79,5 @@ public class VisorScanQueryTask extends VisorOneNodeTask<VisorScanQueryTaskArg,
         @Override public String toString() {
             return S.toString(VisorScanQueryJob.class, this);
         }
-
-        /**
-         * Wrapper for cache iterator to behave like {@link QueryCursor}.
-         */
-        private static class VisorNearCacheCursor<T> implements QueryCursor<T> {
-            /** Wrapped iterator.  */
-            private final Iterator<T> it;
-
-            /**
-             * Wrapping constructor.
-             *
-             * @param it Near cache iterator to wrap.
-             */
-            private VisorNearCacheCursor(Iterator<T> it) {
-                this.it = it;
-            }
-
-            /** {@inheritDoc} */
-            @Override public List<T> getAll() {
-                List<T> all = new ArrayList<>();
-
-                while(it.hasNext())
-                    all.add(it.next());
-
-                return all;
-            }
-
-            /** {@inheritDoc} */
-            @Override public void close() {
-                // Nothing to close.
-            }
-
-            /** {@inheritDoc} */
-            @Override public Iterator<T> iterator() {
-                return it;
-            }
-        }
     }
 }
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
index 72d4cab..8d19839 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
@@ -148,7 +148,29 @@ class VisorCacheScanCommand {
                         error(x.getError)
 
                         return
-                    case x => x.getResult
+                    case x if x.getResult.getRows != null =>
+                        x.getResult
+
+                    case x =>
+                        var res = x.getResult
+
+                        Thread.sleep(100)
+
+                        while (res.getRows == null) {
+                            res = executeOne(res.getResponseNodeId, classOf[VisorQueryFetchFirstPageTask],
+                                new VisorQueryNextPageTaskArg(res.getQueryId, pageSize)) match {
+                                case x if x.getError != null =>
+                                    error(x.getError)
+
+                                    return
+                                case x => x.getResult
+                            }
+
+                            if (res.getRows == null)
+                                Thread.sleep(500)
+                        }
+
+                        res
                 }
             catch {
                 case e: ClusterGroupEmptyException =>
diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js
index 820c3d4..680e340 100644
--- a/modules/web-console/backend/app/browsersHandler.js
+++ b/modules/web-console/backend/app/browsersHandler.js
@@ -239,6 +239,8 @@ module.exports = {
                 this.registerVisorTask('queryFetch', internalVisor('query.VisorQueryNextPageTask'), 'org.apache.ignite.lang.IgniteBiTuple', 'java.lang.String', 'java.lang.Integer');
                 this.registerVisorTask('queryFetchX2', internalVisor('query.VisorQueryNextPageTask'), internalVisor('query.VisorQueryNextPageTaskArg'));
 
+                this.registerVisorTask('queryFetchFirstPage', internalVisor('query.VisorQueryFetchFirstPageTask'), internalVisor('query.VisorQueryNextPageTaskArg'));
+
                 this.registerVisorTask('queryClose', internalVisor('query.VisorQueryCleanupTask'), 'java.util.Map', 'java.util.UUID', 'java.util.Set');
                 this.registerVisorTask('queryCloseX2', internalVisor('query.VisorQueryCleanupTask'), internalVisor('query.VisorQueryCleanupTaskArg'));
 
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
index d841cb5..76aae1c 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
@@ -18,8 +18,8 @@
 import _ from 'lodash';
 import {nonEmpty, nonNil} from 'app/utils/lodashMixins';
 import id8 from 'app/utils/id8';
-import {timer, merge, defer, from} from 'rxjs';
-import {mergeMap, tap, switchMap, exhaustMap, take} from 'rxjs/operators';
+import {timer, merge, defer, from, of} from 'rxjs';
+import {mergeMap, tap, switchMap, exhaustMap, take, filter, map, catchError} from 'rxjs/operators';
 
 import {CSV} from 'app/services/CSV';
 
@@ -255,16 +255,16 @@ class Paragraph {
 
 // Controller for SQL notebook screen.
 export class NotebookCtrl {
-    static $inject = ['IgniteInput', '$rootScope', '$scope', '$http', '$q', '$timeout', '$interval', '$animate', '$location', '$anchorScroll', '$state', '$filter', '$modal', '$popover', 'IgniteLoading', 'IgniteLegacyUtils', 'IgniteMessages', 'IgniteConfirm', 'AgentManager', 'IgniteChartColors', 'IgniteNotebook', 'IgniteNodes', 'uiGridExporterConstants', 'IgniteVersion', 'IgniteActivitiesData', 'JavaTypes', 'IgniteCopyToClipboard', 'CSV', 'IgniteErrorParser', 'DemoInfo'];
+    static $inject = ['IgniteInput', '$rootScope', '$scope', '$http', '$q', '$timeout', '$interval', '$animate', '$location', '$anchorScroll', '$state', '$filter', '$modal', '$popover', '$window', 'IgniteLoading', 'IgniteLegacyUtils', 'IgniteMessages', 'IgniteConfirm', 'AgentManager', 'IgniteChartColors', 'IgniteNotebook', 'IgniteNodes', 'uiGridExporterConstants', 'IgniteVersion', 'IgniteActivitiesData', 'JavaTypes', 'IgniteCopyToClipboard', 'CSV', 'IgniteErrorParser', 'DemoInfo'];
 
     /**
      * @param {CSV} CSV
      */
-    constructor(private IgniteInput: InputDialog, $root, private $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, Loading, LegacyUtils, private Messages: ReturnType<typeof MessagesServiceFactory>, private Confirm: ReturnType<typeof LegacyConfirmServiceFactory>, agentMgr, IgniteChartColors, private Notebook: Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, IgniteCopyToClipboard, CSV, errorParser, De [...]
+    constructor(private IgniteInput: InputDialog, $root, private $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, $window, Loading, LegacyUtils, private Messages: ReturnType<typeof MessagesServiceFactory>, private Confirm: ReturnType<typeof LegacyConfirmServiceFactory>, agentMgr, IgniteChartColors, private Notebook: Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, IgniteCopyToClipboard, CSV, errorP [...]
         const $ctrl = this;
 
         this.CSV = CSV;
-        Object.assign(this, { $root, $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, Loading, LegacyUtils, Messages, Confirm, agentMgr, IgniteChartColors, Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, errorParser, DemoInfo });
+        Object.assign(this, { $root, $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, $window, Loading, LegacyUtils, Messages, Confirm, agentMgr, IgniteChartColors, Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, errorParser, DemoInfo });
 
         // Define template urls.
         $ctrl.paragraphRateTemplateUrl = paragraphRateTemplateUrl;
@@ -1218,7 +1218,47 @@ export class NotebookCtrl {
             _rebuildColumns(paragraph);
         };
 
-        const _showLoading = (paragraph, enable) => paragraph.loading = enable;
+        const _showLoading = (paragraph, enable) => { // Toggle loading state of paragraph.
+            if (paragraph.qryType === 'scan') // Scan paragraphs additionally track scanning state.
+                paragraph.scanningInProgress = enable;
+
+            paragraph.loading = enable;
+        };
+
+        const _fetchQueryResult = (paragraph, clearChart, res, qryArg) => { // Show result or poll until first page is ready.
+            if (!_.isNil(res.rows)) { // Rows are already available: process them and stop.
+                _processQueryResult(paragraph, clearChart, res);
+                _tryStartRefresh(paragraph);
+
+                $scope.$applyAsync();
+
+                return;
+            }
+
+            const subscription = timer(100, 500).pipe( // First attempt after 100 ms, then every 500 ms.
+                exhaustMap(() => agentMgr.queryFetchFistsPage(qryArg.nid, res.queryId, qryArg.pageSize)),
+                filter((res) => !_.isNil(res.rows)), // Skip responses that have no rows yet.
+                take(1), // Stop polling after first response with rows.
+                map((res) => _fetchQueryResult(paragraph, clearChart, res, qryArg)), // Forward caller's clearChart flag.
+                catchError((err) => {
+                    if (paragraph.subscription) { // Report error only if query was not cancelled meanwhile.
+                        paragraph.subscription.unsubscribe();
+                        paragraph.setError(err);
+
+                        _showLoading(paragraph, false);
+                        delete paragraph.subscription;
+
+                        $scope.$applyAsync();
+                    }
+
+                    return of(err);
+                })
+            ).subscribe();
+
+            Object.defineProperty(paragraph, 'subscription', {value: subscription, configurable: true}); // Deletable later.
+
+            return subscription;
+        };
 
         /**
          * @param {Object} paragraph Query
@@ -1230,6 +1270,11 @@ export class NotebookCtrl {
             const prevKeyCols = paragraph.chartKeyCols;
             const prevValCols = paragraph.chartValCols;
 
+            if (paragraph.subscription) {
+                paragraph.subscription.unsubscribe();
+                delete paragraph.subscription;
+            }
+
             if (!_.eq(paragraph.meta, res.columns)) {
                 paragraph.meta = [];
 
@@ -1323,12 +1368,37 @@ export class NotebookCtrl {
         const _closeOldQuery = (paragraph) => {
             const nid = paragraph.resNodeId;
 
-            if (paragraph.queryId && _.find($scope.caches, ({nodes}) => _.find(nodes, {nid: nid.toUpperCase()})))
-                return agentMgr.queryClose(nid, paragraph.queryId);
+            if (paragraph.queryId) {
+                const qryId = paragraph.queryId;
+                delete paragraph.queryId;
+
+                return agentMgr.queryClose(nid, qryId);
+            }
 
             return $q.when();
         };
 
+        $scope.cancelQuery = (paragraph) => { // Cancel query of paragraph: stop polling and refresh, then close query.
+            if (paragraph.subscription) { // Stop polling for first page, if it is in progress.
+                paragraph.subscription.unsubscribe();
+                delete paragraph.subscription;
+            }
+
+            _showLoading(paragraph, false);
+            this.$scope.stopRefresh(paragraph);
+
+            _closeOldQuery(paragraph) // Closes query by stored query ID, if any.
+                .then(() => _showLoading(paragraph, false))
+                .catch((err) => {
+                    _showLoading(paragraph, false);
+                    paragraph.setError(err);
+                });
+        };
+
+        $scope.cancelQueryAvailable = (paragraph) => { // Cancel is available while polling subscription exists.
+            return !!paragraph.subscription;
+        };
+
         /**
          * @param {String} name Cache name.
          * @param {Array.<String>} nids Cache name.
@@ -1381,9 +1451,23 @@ export class NotebookCtrl {
             agentMgr.awaitCluster()
                 .then(() => _closeOldQuery(paragraph))
                 .then(() => args.localNid || _chooseNode(args.cacheName, false))
-                .then((nid) => agentMgr.querySql(nid, args.cacheName, args.query, args.nonCollocatedJoins,
-                    args.enforceJoinOrder, false, !!args.localNid, args.pageSize, args.lazy, args.collocated))
-                .then((res) => _processQueryResult(paragraph, false, res))
+                .then((nid) => {
+                    const qryArg = {
+                        nid,
+                        cacheName: args.cacheName,
+                        query: args.query,
+                        nonCollocatedJoins: args.nonCollocatedJoins,
+                        enforceJoinOrder: args.enforceJoinOrder,
+                        replicatedOnly: false,
+                        local: !!args.localNid,
+                        pageSize: args.pageSize,
+                        lazy: args.lazy,
+                        collocated: args.collocated
+                    };
+
+                    return agentMgr.querySql(qryArg)
+                        .then((res) => _fetchQueryResult(paragraph, false, res, qryArg));
+                })
                 .catch((err) => paragraph.setError(err));
         };
 
@@ -1393,8 +1477,6 @@ export class NotebookCtrl {
             if (_.get(paragraph, 'rate.installed') && paragraph.queryExecuted()) {
                 $scope.chartAcceptKeyColumn(paragraph, TIME_LINE);
 
-                _executeRefresh(paragraph);
-
                 const delay = paragraph.rate.value * paragraph.rate.unit;
 
                 paragraph.rate.stopTime = $interval(_executeRefresh, delay, 0, false, paragraph);
@@ -1430,6 +1512,19 @@ export class NotebookCtrl {
             return $scope.ddlAvailable() && !paragraph.useAsDefaultSchema ? null : paragraph.cacheName;
         };
 
+        const _initQueryResult = (paragraph, res) => { // Reset paragraph result state from initial query response.
+            paragraph.resNodeId = res.responseNodeId; // Node and query ID are read later by _closeOldQuery.
+            paragraph.queryId = res.queryId;
+
+            paragraph.rows = [];
+            paragraph.gridOptions.adjustHeight(paragraph.rows.length);
+
+            paragraph.meta = [];
+            paragraph.setError({message: ''}); // Set error with empty message.
+
+            paragraph.hasNext = false;
+        };
+
         $scope.execute = (paragraph, local = false) => {
             const nonCollocatedJoins = !!paragraph.nonCollocatedJoins;
             const enforceJoinOrder = !!paragraph.enforceJoinOrder;
@@ -1467,13 +1562,25 @@ export class NotebookCtrl {
                             ActivitiesData.post({ group: 'sql', action: '/queries/execute' });
 
                             const qry = args.maxPages ? addLimit(args.query, args.pageSize * args.maxPages) : query;
+                            const qryArg = {
+                                nid,
+                                cacheName: args.cacheName,
+                                query: qry,
+                                nonCollocatedJoins,
+                                enforceJoinOrder,
+                                replicatedOnly: false,
+                                local,
+                                pageSize: args.pageSize,
+                                lazy,
+                                collocated
+                            };
 
-                            return agentMgr.querySql(nid, args.cacheName, qry, nonCollocatedJoins, enforceJoinOrder, false, local, args.pageSize, lazy, collocated);
-                        })
-                        .then((res) => {
-                            _processQueryResult(paragraph, true, res);
+                            return agentMgr.querySql(qryArg)
+                                .then((res) => {
+                                    _initQueryResult(paragraph, res);
 
-                            _tryStartRefresh(paragraph);
+                                    return _fetchQueryResult(paragraph, true, res, qryArg);
+                                });
                         })
                         .catch((err) => {
                             paragraph.setError(err);
@@ -1525,9 +1632,25 @@ export class NotebookCtrl {
 
                     ActivitiesData.post({ group: 'sql', action: '/queries/explain' });
 
-                    return agentMgr.querySql(nid, args.cacheName, args.query, nonCollocatedJoins, enforceJoinOrder, false, false, args.pageSize, false, collocated);
+                    const qryArg = {
+                        nid,
+                        cacheName: args.cacheName,
+                        query: args.query,
+                        nonCollocatedJoins,
+                        enforceJoinOrder,
+                        replicatedOnly: false,
+                        local: false,
+                        pageSize: args.pageSize,
+                        lazy: false, collocated
+                    };
+
+                    return agentMgr.querySql(qryArg)
+                        .then((res) => {
+                            _initQueryResult(paragraph, res);
+
+                            return _fetchQueryResult(paragraph, true, res, qryArg);
+                        });
                 })
-                .then((res) => _processQueryResult(paragraph, true, res))
                 .catch((err) => {
                     paragraph.setError(err);
 
@@ -1545,7 +1668,6 @@ export class NotebookCtrl {
             $scope.scanAvailable(paragraph) && _chooseNode(cacheName, local)
                 .then((nid) => {
                     paragraph.localQueryMode = local;
-                    paragraph.scanningInProgress = true;
 
                     Notebook.save($scope.notebook)
                         .catch(Messages.showError);
@@ -1569,15 +1691,28 @@ export class NotebookCtrl {
 
                             ActivitiesData.post({ group: 'sql', action: '/queries/scan' });
 
-                            return agentMgr.queryScan(nid, cacheName, filter, false, caseSensitive, false, local, pageSize);
+                            const qryArg = {
+                                nid,
+                                cacheName,
+                                filter,
+                                regEx: false,
+                                caseSensitive,
+                                near: false,
+                                local,
+                                pageSize
+                            };
+
+                            return agentMgr.queryScan(qryArg).then((res) => {
+                                _initQueryResult(paragraph, res);
+
+                                return _fetchQueryResult(paragraph, true, res, qryArg);
+                            });
                         })
-                        .then((res) => _processQueryResult(paragraph, true, res))
                         .catch((err) => {
                             paragraph.setError(err);
 
                             _showLoading(paragraph, false);
-                        })
-                        .then(() => paragraph.scanningInProgress = false);
+                        });
                 });
         };
 
@@ -1718,8 +1853,26 @@ export class NotebookCtrl {
 
             return Promise.resolve(args.localNid || _chooseNode(args.cacheName, false))
                 .then((nid) => args.type === 'SCAN'
-                    ? agentMgr.queryScanGetAll(nid, args.cacheName, args.query, !!args.regEx, !!args.caseSensitive, !!args.near, !!args.localNid)
-                    : agentMgr.querySqlGetAll(nid, args.cacheName, args.query, !!args.nonCollocatedJoins, !!args.enforceJoinOrder, false, !!args.localNid, !!args.lazy, !!args.collocated))
+                    ? agentMgr.queryScanGetAll({
+                        nid,
+                        cacheName: args.cacheName,
+                        filter: args.query,
+                        regEx: !!args.regEx,
+                        caseSensitive: !!args.caseSensitive,
+                        near: !!args.near,
+                        local: !!args.localNid
+                    })
+                    : agentMgr.querySqlGetAll({
+                        nid,
+                        cacheName: args.cacheName,
+                        query: args.query,
+                        nonCollocatedJoins: !!args.nonCollocatedJoins,
+                        enforceJoinOrder: !!args.enforceJoinOrder,
+                        replicatedOnly: false,
+                        local: !!args.localNid,
+                        lazy: !!args.lazy,
+                        collocated: !!args.collocated
+                    }))
                 .then((res) => _export(exportFileName(paragraph, true), paragraph.gridOptions.columnDefs, res.columns, res.rows))
                 .catch(Messages.showError)
                 .then(() => {
@@ -1758,7 +1911,7 @@ export class NotebookCtrl {
             paragraph.rate.installed = true;
 
             if (paragraph.queryExecuted() && !paragraph.scanExplain())
-                _tryStartRefresh(paragraph);
+                _executeRefresh(paragraph);
         };
 
         $scope.stopRefresh = function(paragraph) {
@@ -1904,6 +2057,22 @@ export class NotebookCtrl {
                 $modal({scope, templateUrl: messageTemplateUrl, show: true});
             }
         };
+
+        $window.addEventListener('beforeunload', () => {
+            this._closeOpenedQueries(this.$scope.notebook.paragraphs);
+        });
+    }
+
+    /**
+     * Releases per-paragraph query resources: unsubscribes the paragraph subscription (when present)
+     * and asks the agent manager to close the query identified by the paragraph's `queryId`.
+     *
+     * Failures of the close request are deliberately ignored (best-effort cleanup).
+     *
+     * @param {Array} paragraphs Paragraphs to clean up.
+     */
+    _closeOpenedQueries(paragraphs) {
+        const ignoreFailure = () => { /* No-op. */ };
+
+        _.forEach(paragraphs, (paragraph) => {
+            const {queryId, subscription, resNodeId} = paragraph;
+
+            if (subscription)
+                subscription.unsubscribe();
+
+            // Nothing to close when the paragraph has no query ID.
+            if (!queryId)
+                return;
+
+            this.agentMgr.queryClose(resNodeId, queryId)
+                .catch(ignoreFailure);
+        });
     }
 
     scanActions: QueryActions<Paragraph & {type: 'scan'}> = [
@@ -1964,6 +2133,8 @@ export class NotebookCtrl {
             await this.Confirm.confirm('Are you sure you want to remove query: "' + paragraph.name + '"?');
             this.$scope.stopRefresh(paragraph);
 
+            this._closeOpenedQueries([paragraph]);
+
             const paragraph_idx = _.findIndex(this.$scope.notebook.paragraphs, (item) => paragraph === item);
             const panel_idx = _.findIndex(this.$scope.expandedParagraphs, (item) => paragraph_idx === item);
 
@@ -1995,6 +2166,8 @@ export class NotebookCtrl {
     }
 
     $onDestroy() {
+        this._closeOpenedQueries(this.$scope.notebook.paragraphs); // Unsubscribe and close queries of every paragraph in the notebook.
+
         if (this.refresh$)
             this.refresh$.unsubscribe();
     }
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
index e2c4408..c4033e6 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
@@ -187,6 +187,9 @@ mixin query-actions
     button.btn-ignite.btn-ignite--secondary(ng-disabled='!queryAvailable(paragraph)' ng-click='explain(paragraph)' data-placement='bottom' bs-tooltip='' data-title='{{queryTooltip(paragraph, "explain query")}}')
         | Explain
 
+    button.btn-ignite.btn-ignite--secondary(ng-if='cancelQueryAvailable(paragraph)' ng-click='cancelQuery(paragraph)' data-placement='bottom' bs-tooltip='' data-title='{{"Cancel query execution"}}')
+        | Cancel
+
 mixin table-result-heading-query
     .total.row
         .col-xs-7
@@ -339,6 +342,9 @@ mixin paragraph-scan
                     span.icon-left.fa.fa-fw.fa-play(ng-hide='paragraph.checkScanInProgress(true)')
                     span.icon-left.fa.fa-fw.fa-refresh.fa-spin(ng-show='paragraph.checkScanInProgress(true)')
                     | Scan on selected node
+
+                button.btn-ignite.btn-ignite--secondary(ng-if='cancelQueryAvailable(paragraph)' ng-click='cancelQuery(paragraph)' data-placement='bottom' bs-tooltip='' data-title='{{"Cancel query execution"}}')
+                    | Cancel
             div
 
         .col-sm-12.sql-result(ng-if='paragraph.queryExecuted() && !paragraph.scanningInProgress' ng-switch='paragraph.resultType()')
diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
index faf0672..8e4dd2d 100644
--- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
+++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
@@ -18,8 +18,8 @@
 import _ from 'lodash';
 import {nonEmpty, nonNil} from 'app/utils/lodashMixins';
 
-import {BehaviorSubject} from 'rxjs';
-import {first, pluck, tap, distinctUntilChanged, map, filter} from 'rxjs/operators';
+import {timer, BehaviorSubject, of, from} from 'rxjs';
+import {exhaustMap, first, pluck, tap, distinctUntilChanged, map, filter, expand, takeWhile, last} from 'rxjs/operators';
 
 import io from 'socket.io-client';
 
@@ -720,14 +720,14 @@ export default class AgentManager {
      * @param {Boolean} enforceJoinOrder Flag whether enforce join order is enabled.
      * @param {Boolean} replicatedOnly Flag whether query contains only replicated tables.
      * @param {Boolean} local Flag whether to execute query locally.
-     * @param {Number} pageSz
+     * @param {Number} pageSize
      * @param {Boolean} [lazy] query flag.
      * @param {Boolean} [collocated] Collocated query.
      * @returns {Promise}
      */
-    querySql(nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz, lazy = false, collocated = false) {
+    querySql({nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSize, lazy = false, collocated = false}) {
         if (this.available(IGNITE_2_0)) {
-            let args = [cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz];
+            let args = [cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSize];
 
             if (this.available(...COLLOCATED_QUERY_SINCE))
                 args = [...args, lazy, collocated];
@@ -747,11 +747,11 @@ export default class AgentManager {
         let queryPromise;
 
         if (enforceJoinOrder)
-            queryPromise = this.visorTask('querySqlV3', nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, local, pageSz);
+            queryPromise = this.visorTask('querySqlV3', nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, local, pageSize);
         else if (nonCollocatedJoins)
-            queryPromise = this.visorTask('querySqlV2', nid, cacheName, query, nonCollocatedJoins, local, pageSz);
+            queryPromise = this.visorTask('querySqlV2', nid, cacheName, query, nonCollocatedJoins, local, pageSize);
         else
-            queryPromise = this.visorTask('querySql', nid, cacheName, query, local, pageSz);
+            queryPromise = this.visorTask('querySql', nid, cacheName, query, local, pageSize);
 
         return queryPromise
             .then(({key, value}) => {
@@ -764,6 +764,21 @@ export default class AgentManager {
 
     /**
      * @param {String} nid Node id.
+     * @param {String} queryId Query ID.
+     * @param {Number} pageSize
+     * @returns {Promise}
+     */
+    queryFetchFistsPage(nid, queryId, pageSize) {
+        return this.visorTask('queryFetchFirstPage', nid, queryId, pageSize).then(({error, result}) => {
+            if (_.isEmpty(error))
+                return result;
+
+            return Promise.reject(error);
+        });
+    }
+
+    /**
+     * @param {String} nid Node id.
      * @param {Number} queryId
      * @param {Number} pageSize
      * @returns {Promise}
@@ -775,6 +790,34 @@ export default class AgentManager {
         return this.visorTask('queryFetch', nid, queryId, pageSize);
     }
 
+    _fetchQueryAllResults(acc, pageSize) {
+        if (!_.isNil(acc.rows)) {
+            if (!acc.hasMore)
+                return acc;
+
+            return of(acc).pipe(
+                expand((acc) => {
+                    return from(this.queryNextPage(acc.responseNodeId, acc.queryId, pageSize)
+                        .then((res) => {
+                            acc.rows = acc.rows.concat(res.rows);
+                            acc.hasMore = res.hasMore;
+
+                            return acc;
+                        }));
+                }),
+                takeWhile((acc) => acc.hasMore),
+                last()
+            ).toPromise();
+        }
+
+        return timer(100, 500).pipe(
+            exhaustMap(() => this.queryFetchFistsPage(acc.responseNodeId, acc.queryId, pageSize)),
+            filter((res) => !_.isNil(res.rows)),
+            first(),
+            map((res) => this._fetchQueryAllResults(res, 1024))
+        ).toPromise();
+    }
+
     /**
      * @param {String} nid Node id.
      * @param {String} cacheName Cache name.
@@ -787,26 +830,12 @@ export default class AgentManager {
      * @param {Boolean} collocated Collocated query.
      * @returns {Promise}
      */
-    querySqlGetAll(nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, lazy, collocated) {
+    querySqlGetAll({nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, lazy, collocated}) {
         // Page size for query.
-        const pageSz = 1024;
+        const pageSize = 1024;
 
-        const fetchResult = (acc) => {
-            if (!acc.hasMore)
-                return acc;
-
-            return this.queryNextPage(acc.responseNodeId, acc.queryId, pageSz)
-                .then((res) => {
-                    acc.rows = acc.rows.concat(res.rows);
-
-                    acc.hasMore = res.hasMore;
-
-                    return fetchResult(acc);
-                });
-        };
-
-        return this.querySql(nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz, lazy, collocated)
-            .then(fetchResult);
+        return this.querySql({nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSize, lazy, collocated})
+            .then((res) => this._fetchQueryAllResults(res, pageSize));
     }
 
     /**
@@ -834,7 +863,7 @@ export default class AgentManager {
      * @param {Number} pageSize Page size.
      * @returns {Promise}
      */
-    queryScan(nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize) {
+    queryScan({nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize}) {
         if (this.available(IGNITE_2_0)) {
             return this.visorTask('queryScanX2', nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize)
                 .then(({error, result}) => {
@@ -854,7 +883,7 @@ export default class AgentManager {
         const prefix = caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE : SCAN_CACHE_WITH_FILTER;
         const query = `${prefix}${filter}`;
 
-        return this.querySql(nid, cacheName, query, false, false, false, local, pageSize);
+        return this.querySql({nid, cacheName, query, nonCollocatedJoins: false, enforceJoinOrder: false, replicatedOnly: false, local, pageSize});
     }
 
     /**
@@ -867,26 +896,12 @@ export default class AgentManager {
      * @param {Boolean} local Flag whether to execute query locally.
      * @returns {Promise}
      */
-    queryScanGetAll(nid, cacheName, filter, regEx, caseSensitive, near, local) {
+    queryScanGetAll({nid, cacheName, filter, regEx, caseSensitive, near, local}) {
         // Page size for query.
-        const pageSz = 1024;
-
-        const fetchResult = (acc) => {
-            if (!acc.hasMore)
-                return acc;
-
-            return this.queryNextPage(acc.responseNodeId, acc.queryId, pageSz)
-                .then((res) => {
-                    acc.rows = acc.rows.concat(res.rows);
-
-                    acc.hasMore = res.hasMore;
-
-                    return fetchResult(acc);
-                });
-        };
+        const pageSize = 1024;
 
-        return this.queryScan(nid, cacheName, filter, regEx, caseSensitive, near, local, pageSz)
-            .then(fetchResult);
+        return this.queryScan({nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize})
+            .then((res) => this._fetchQueryAllResults(res, pageSize));
     }
 
     /**