You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2019/01/22 11:11:07 UTC
[ignite] branch master updated: IGNITE-10993 Web Console:
Implemented queries cancellation. This closes #5869.
This is an automated email from the ASF dual-hosted git repository.
akuznetsov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5861a92 IGNITE-10993 Web Console: Implemented queries cancellation. This closes #5869.
5861a92 is described below
commit 5861a9248fc66013cba960e96de3bbaac4106776
Author: Vasiliy Sisko <vs...@gridgain.com>
AuthorDate: Tue Jan 22 18:09:53 2019 +0700
IGNITE-10993 Web Console: Implemented queries cancellation. This closes #5869.
---
.../visor/node/VisorNodeBaselineStatus.java | 2 +
.../visor/query/VisorQueryCleanupTask.java | 22 +-
.../internal/visor/query/VisorQueryCursor.java | 95 -------
.../visor/query/VisorQueryFetchFirstPageTask.java | 107 ++++++++
.../internal/visor/query/VisorQueryHolder.java | 171 ++++++++++++
.../visor/query/VisorQueryNextPageTask.java | 73 +----
.../internal/visor/query/VisorQueryTask.java | 83 +-----
.../internal/visor/query/VisorQueryUtils.java | 301 +++++++++++++++++++--
.../internal/visor/query/VisorScanQueryTask.java | 112 +-------
.../commands/cache/VisorCacheScanCommand.scala | 24 +-
modules/web-console/backend/app/browsersHandler.js | 2 +
.../components/queries-notebook/controller.ts | 229 ++++++++++++++--
.../components/queries-notebook/template.tpl.pug | 6 +
.../app/modules/agent/AgentManager.service.js | 107 ++++----
14 files changed, 892 insertions(+), 442 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
index ea90be3..a273ca9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeBaselineStatus.java
@@ -25,8 +25,10 @@ import org.jetbrains.annotations.Nullable;
public enum VisorNodeBaselineStatus {
/** */
NODE_IN_BASELINE,
+
/** */
NODE_NOT_IN_BASELINE,
+
/** */
BASELINE_NOT_AVAILABLE;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java
index 9dfa0cf..e62393a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.internal.processors.task.GridInternal;
@@ -34,6 +33,8 @@ import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.removeQueryHolder;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.logMapped;
/**
@@ -106,14 +107,21 @@ public class VisorQueryCleanupTask extends VisorMultiNodeTask<VisorQueryCleanupT
/** {@inheritDoc} */
@Override protected Void run(Collection<String> qryIds) {
- ConcurrentMap<String, VisorQueryCursor> storage = ignite.cluster().nodeLocalMap();
+ long start = U.currentTimeMillis();
+
+ if (debug) {
+ start = log(
+ ignite.log(),
+ "Queries cancellation started: [" + String.join(", ", qryIds) + "]",
+ getClass(),
+ start);
+ }
- for (String qryId : qryIds) {
- VisorQueryCursor cur = storage.remove(qryId);
+ for (String qryId : qryIds)
+ removeQueryHolder(ignite, qryId);
- if (cur != null)
- cur.close();
- }
+ if (debug)
+ log(ignite.log(), "Queries cancellation finished", getClass(), start);
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java
deleted file mode 100644
index 3d5c58d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.query;
-
-import java.util.Collection;
-import java.util.Iterator;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Wrapper for query cursor.
- */
-public class VisorQueryCursor<T> implements Iterator<T>, AutoCloseable {
- /** */
- private final QueryCursor<T> cur;
-
- /** */
- private final Iterator<T> itr;
-
- /** Flag indicating that this cursor was read from last check. */
- private volatile boolean accessed;
-
- /**
- * @param cur Cursor.
- */
- public VisorQueryCursor(QueryCursor<T> cur) {
- this.cur = cur;
-
- itr = cur.iterator();
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return itr.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public T next() {
- return itr.next();
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- cur.close();
- }
-
- /**
- * @return SQL Fields query result metadata.
- */
- @SuppressWarnings("unchecked")
- public Collection<GridQueryFieldMetadata> fieldsMeta() {
- return ((QueryCursorImpl)cur).fieldsMeta();
- }
-
- /**
- * @return Flag indicating that this future was read from last check..
- */
- public boolean accessed() {
- return accessed;
- }
-
- /**
- * @param accessed New accessed.
- */
- public void accessed(boolean accessed) {
- this.accessed = accessed;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(VisorQueryCursor.class, this);
- }
-}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryFetchFirstPageTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryFetchFirstPageTask.java
new file mode 100644
index 0000000..98d4801
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryFetchFirstPageTask.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.VisorEither;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorOneNodeTask;
+import org.apache.ignite.internal.visor.util.VisorExceptionWrapper;
+
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchQueryRows;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.getQueryHolder;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.removeQueryHolder;
+import static org.apache.ignite.internal.visor.util.VisorTaskUtils.log;
+
+/**
+ * Task for check a query execution and receiving first page of query result.
+ */
+@GridInternal
+public class VisorQueryFetchFirstPageTask extends VisorOneNodeTask<VisorQueryNextPageTaskArg, VisorEither<VisorQueryResult>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorQueryFetchFirstPageJob job(VisorQueryNextPageTaskArg arg) {
+ return new VisorQueryFetchFirstPageJob(arg, debug);
+ }
+
+ /**
+ * Job for collecting first page previously executed SQL or SCAN query.
+ */
+ private static class VisorQueryFetchFirstPageJob extends VisorJob<VisorQueryNextPageTaskArg, VisorEither<VisorQueryResult>> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create job with specified argument.
+ *
+ * @param arg Job argument.
+ * @param debug Debug flag.
+ */
+ private VisorQueryFetchFirstPageJob(VisorQueryNextPageTaskArg arg, boolean debug) {
+ super(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected VisorEither<VisorQueryResult> run(VisorQueryNextPageTaskArg arg) {
+ String qryId = arg.getQueryId();
+
+ long start = U.currentTimeMillis();
+
+ if (debug)
+ start = log(ignite.log(), "Fetch query first page started: " + qryId, getClass(), start);
+
+ VisorQueryHolder holder = getQueryHolder(ignite, qryId);
+
+ if (holder.getErr() != null)
+ return new VisorEither<>(new VisorExceptionWrapper(holder.getErr()));
+
+ List<Object[]> rows = null;
+ List<VisorQueryField> cols = holder.getColumns();
+
+ boolean hasMore = cols == null;
+
+ if (cols != null) {
+ Iterator itr = holder.getIterator();
+ rows = fetchQueryRows(itr, qryId, arg.getPageSize());
+ hasMore = itr.hasNext();
+ }
+
+ if (hasMore)
+ holder.setAccessed(true);
+ else
+ removeQueryHolder(ignite, qryId);
+
+ if (debug)
+ log(ignite.log(), "Fetch query first page finished: " + qryId, getClass(), start);
+
+ return new VisorEither<>(
+ new VisorQueryResult(ignite.localNode().id(), qryId, cols, rows, hasMore, holder.duration()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorQueryFetchFirstPageJob.class, this);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryHolder.java
new file mode 100644
index 0000000..af54f27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryHolder.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+
+/**
+ * Holds identify information of executing query and its result.
+ */
+public class VisorQueryHolder implements AutoCloseable {
+ /** Prefix for node local key for SQL queries. */
+ private static final String SQL_QRY_PREFIX = "VISOR_SQL_QUERY";
+
+ /** Prefix for node local key for SCAN queries. */
+ private static final String SCAN_QRY_PREFIX = "VISOR_SCAN_QUERY";
+
+ /** Query ID for extraction query data result. */
+ private final String qryId;
+
+ /** Cancel query object. */
+ private final GridQueryCancel cancel;
+
+ /** Query column descriptors. */
+ private volatile List<VisorQueryField> cols;
+
+ /** Error in process of query result receiving. */
+ private volatile Throwable err;
+
+ /** Query duration in ms. */
+ private volatile long duration;
+
+ /** Flag indicating that this cursor was read from last check. */
+ private volatile boolean accessed;
+
+ /** Query cursor. */
+ private volatile QueryCursor cur;
+
+ /** Result set iterator. */
+ private volatile Iterator itr;
+
+ /**
+ * @param qryId Query ID.
+ * @return {@code true} if holder contains SQL query.
+ */
+ public static boolean isSqlQuery(String qryId) {
+ return qryId.startsWith(SQL_QRY_PREFIX);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param sqlQry Flag indicating that holder contains SQL or SCAN query.
+ * @param cur Query cursor.
+ * @param cancel Cancel object.
+ */
+ VisorQueryHolder(boolean sqlQry, QueryCursor cur, GridQueryCancel cancel) {
+ this.cur = cur;
+ this.cancel = cancel;
+
+ // Generate query ID to store query cursor in node local storage.
+ qryId = (sqlQry ? SQL_QRY_PREFIX : SCAN_QRY_PREFIX) + "-" + UUID.randomUUID();
+ }
+
+ /**
+ * @return Query ID for extraction query data result.
+ */
+ public String getQueryID() {
+ return qryId;
+ }
+
+ /**
+ * @return Result set iterator.
+ */
+ public synchronized Iterator getIterator() {
+ assert cur != null;
+
+ if (itr == null)
+ itr = cur.iterator();
+
+ return itr;
+ }
+
+ /**
+ * @return Query column descriptors.
+ */
+ public List<VisorQueryField> getColumns() {
+ return cols;
+ }
+
+ /**
+ * Complete query execution.
+ *
+ * @param cur Query cursor.
+ * @param duration Duration of query execution.
+ * @param cols Query column descriptors.
+ */
+ public void complete(QueryCursor cur, long duration, List<VisorQueryField> cols) {
+ this.cur = cur;
+ this.duration = duration;
+ this.cols = cols;
+ this.accessed = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ if (cur != null)
+ cur.close();
+
+ if (cancel != null)
+ cancel.cancel();
+ }
+
+ /**
+ * @return Error in process of query result receiving.
+ */
+ public Throwable getErr() {
+ return err;
+ }
+
+ /**
+ * Set error caught during query execution.
+ *
+ * @param err Error caught during query execution.
+ */
+ public void setError(Throwable err) {
+ this.err = err;
+
+ if (cur != null)
+ cur.close();
+ }
+
+ /**
+ * @return Flag indicating that this future was read from last check..
+ */
+ public boolean isAccessed() {
+ return accessed;
+ }
+
+ /**
+ * @param accessed New accessed.
+ */
+ public void setAccessed(boolean accessed) {
+ this.accessed = accessed;
+ }
+
+ /**
+ * @return Duration of query execution.
+ */
+ public long duration() {
+ return duration;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java
index 4684c49..dc7b09f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java
@@ -17,16 +17,17 @@
package org.apache.ignite.internal.visor.query;
+import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.getQueryHolder;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.removeQueryHolder;
+
/**
* Task for collecting next page previously executed SQL or SCAN query.
*/
@@ -59,77 +60,29 @@ public class VisorQueryNextPageTask extends VisorOneNodeTask<VisorQueryNextPageT
/** {@inheritDoc} */
@Override protected VisorQueryResult run(VisorQueryNextPageTaskArg arg) {
- return arg.getQueryId().startsWith(VisorQueryUtils.SCAN_QRY_NAME) ? nextScanPage(arg) : nextSqlPage(arg);
- }
-
- /**
- * Collect data from SQL query.
- *
- * @param arg Query name and page size.
- * @return Query result with next page.
- */
- private VisorQueryResult nextSqlPage(VisorQueryNextPageTaskArg arg) {
long start = U.currentTimeMillis();
- ConcurrentMap<String, VisorQueryCursor<List<?>>> storage = ignite.cluster().nodeLocalMap();
-
String qryId = arg.getQueryId();
- VisorQueryCursor<List<?>> cur = storage.get(qryId);
+ VisorQueryHolder holder = getQueryHolder(ignite, qryId);
- if (cur == null)
- throw new IgniteException("SQL query results are expired.");
+ Iterator itr = holder.getIterator();
- List<Object[]> nextRows = VisorQueryUtils.fetchSqlQueryRows(cur, arg.getPageSize());
+ List<Object[]> nextRows = VisorQueryHolder.isSqlQuery(qryId)
+ ? VisorQueryUtils.fetchSqlQueryRows(itr, arg.getPageSize())
+ : VisorQueryUtils.fetchScanQueryRows(itr, arg.getPageSize());
- boolean hasMore = cur.hasNext();
+ boolean hasMore = itr.hasNext();
if (hasMore)
- cur.accessed(true);
- else {
- storage.remove(qryId);
-
- cur.close();
- }
+ holder.setAccessed(true);
+ else
+ removeQueryHolder(ignite, qryId);
return new VisorQueryResult(ignite.localNode().id(), qryId, null, nextRows, hasMore,
U.currentTimeMillis() - start);
}
- /**
- * Collect data from SCAN query
- *
- * @param arg Query name and page size.
- * @return Next page with data.
- */
- private VisorQueryResult nextScanPage(VisorQueryNextPageTaskArg arg) {
- long start = U.currentTimeMillis();
-
- ConcurrentMap<String, VisorQueryCursor<Cache.Entry<Object, Object>>> storage = ignite.cluster().nodeLocalMap();
-
- String qryId = arg.getQueryId();
-
- VisorQueryCursor<Cache.Entry<Object, Object>> cur = storage.get(qryId);
-
- if (cur == null)
- throw new IgniteException("Scan query results are expired.");
-
- List<Object[]> rows = VisorQueryUtils.fetchScanQueryRows(cur, arg.getPageSize());
-
- boolean hasMore = cur.hasNext();
-
- if (hasMore)
- cur.accessed(true);
- else {
- storage.remove(qryId);
-
- cur.close();
- }
-
- return new VisorQueryResult(ignite.localNode().id(), qryId, null, rows, hasMore,
- U.currentTimeMillis() - start);
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(VisorQueryNextPageJob.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
index 97ee83e..88a1c8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryTask.java
@@ -17,29 +17,18 @@
package org.apache.ignite.internal.visor.query;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.task.GridVisorManagementTask;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorEither;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.apache.ignite.internal.visor.util.VisorExceptionWrapper;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SQL_QRY_NAME;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchSqlQueryRows;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleResultSetHolderRemoval;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleQueryStart;
/**
* Task for execute SQL fields query and get first page of results.
@@ -77,71 +66,17 @@ public class VisorQueryTask extends VisorOneNodeTask<VisorQueryTaskArg, VisorEit
try {
UUID nid = ignite.localNode().id();
- SqlFieldsQuery qry = new SqlFieldsQuery(arg.getQueryText());
- qry.setPageSize(arg.getPageSize());
- qry.setLocal(arg.isLocal());
- qry.setDistributedJoins(arg.isDistributedJoins());
- qry.setCollocated(arg.isCollocated());
- qry.setEnforceJoinOrder(arg.isEnforceJoinOrder());
- qry.setReplicatedOnly(arg.isReplicatedOnly());
- qry.setLazy(arg.getLazy());
+ GridQueryCancel cancel = new GridQueryCancel();
- long start = U.currentTimeMillis();
+ Map<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
- List<FieldsQueryCursor<List<?>>> qryCursors;
+ VisorQueryHolder holder = new VisorQueryHolder(true, null, cancel);
- String cacheName = arg.getCacheName();
+ storage.put(holder.getQueryID(), holder);
- if (F.isEmpty(cacheName))
- qryCursors = ignite.context().query().querySqlFields(qry, true, false);
- else {
- IgniteCache<Object, Object> c = ignite.cache(cacheName);
+ scheduleQueryStart(ignite, holder, arg, cancel);
- if (c == null)
- throw new SQLException("Fail to execute query. Cache not found: " + cacheName);
-
- qryCursors = ((IgniteCacheProxy)c.withKeepBinary()).queryMultipleStatements(qry);
- }
-
- // In case of multiple statements leave opened only last cursor.
- for (int i = 0; i < qryCursors.size() - 1; i++)
- U.closeQuiet(qryCursors.get(i));
-
- // In case of multiple statements return last cursor as result.
- VisorQueryCursor<List<?>> cur = new VisorQueryCursor<>(F.last(qryCursors));
-
- Collection<GridQueryFieldMetadata> meta = cur.fieldsMeta();
-
- if (meta == null)
- return new VisorEither<>(
- new VisorExceptionWrapper(new SQLException("Fail to execute query. No metadata available.")));
- else {
- List<VisorQueryField> names = new ArrayList<>(meta.size());
-
- for (GridQueryFieldMetadata col : meta)
- names.add(new VisorQueryField(col.schemaName(), col.typeName(),
- col.fieldName(), col.fieldTypeName()));
-
- List<Object[]> rows = fetchSqlQueryRows(cur, arg.getPageSize());
-
- // Query duration + fetch duration.
- long duration = U.currentTimeMillis() - start;
-
- boolean hasNext = cur.hasNext();
-
- // Generate query ID to store query cursor in node local storage.
- String qryId = SQL_QRY_NAME + "-" + UUID.randomUUID();
-
- if (hasNext) {
- ignite.cluster().<String, VisorQueryCursor<List<?>>>nodeLocalMap().put(qryId, cur);
-
- scheduleResultSetHolderRemoval(qryId, ignite);
- }
- else
- cur.close();
-
- return new VisorEither<>(new VisorQueryResult(nid, qryId, names, rows, hasNext, duration));
- }
+ return new VisorEither<>(new VisorQueryResult(nid, holder.getQueryID(), null, null, false, 0));
}
catch (Throwable e) {
return new VisorEither<>(new VisorExceptionWrapper(e));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
index a47acf6..6143e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java
@@ -19,21 +19,39 @@ package org.apache.ignite.internal.visor.query;
import java.math.BigDecimal;
import java.net.URL;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryObjectEx;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
/**
* Contains utility methods for Visor query tasks and jobs.
@@ -42,11 +60,11 @@ public class VisorQueryUtils {
/** How long to store future with query in node local map: 5 minutes. */
public static final Integer RMV_DELAY = 5 * 60 * 1000;
- /** Prefix for node local key for SQL queries. */
- public static final String SQL_QRY_NAME = "VISOR_SQL_QUERY";
+ /** Message for query result expired error. */
+ private static final String SQL_QRY_RESULTS_EXPIRED_ERR = "SQL query results are expired.";
- /** Prefix for node local key for SCAN queries. */
- public static final String SCAN_QRY_NAME = "VISOR_SCAN_QUERY";
+ /** Message for scan result expired error. */
+ private static final String SCAN_QRY_RESULTS_EXPIRED_ERR = "Scan query results are expired.";
/** Columns for SCAN queries. */
public static final List<VisorQueryField> SCAN_COL_NAMES = Arrays.asList(
@@ -55,7 +73,7 @@ public class VisorQueryUtils {
);
/**
- * @param o - Object.
+ * @param o Source object.
* @return String representation of object class.
*/
private static String typeOf(Object o) {
@@ -130,17 +148,19 @@ public class VisorQueryUtils {
/**
* Fetch rows from SCAN query future.
*
- * @param cur Query future to fetch rows from.
+ * @param itr Result set iterator.
* @param pageSize Number of rows to fetch.
* @return Fetched rows.
*/
- public static List<Object[]> fetchScanQueryRows(VisorQueryCursor<Cache.Entry<Object, Object>> cur, int pageSize) {
+ public static List<Object[]> fetchScanQueryRows(Iterator itr, int pageSize) {
List<Object[]> rows = new ArrayList<>();
int cnt = 0;
- while (cur.hasNext() && cnt < pageSize) {
- Cache.Entry<Object, Object> next = cur.next();
+ Iterator<Cache.Entry<Object, Object>> scanItr = (Iterator<Cache.Entry<Object, Object>>)itr;
+
+ while (scanItr.hasNext() && cnt < pageSize) {
+ Cache.Entry<Object, Object> next = scanItr.next();
Object k = next.getKey();
Object v = next.getValue();
@@ -227,6 +247,12 @@ public class VisorQueryUtils {
"typeId", obj.type().typeId(), true);
}
+ /**
+ * Convert object that can be passed to client.
+ *
+ * @param original Source object.
+ * @return Converted value.
+ */
public static Object convertValue(Object original) {
if (original == null)
return null;
@@ -241,17 +267,19 @@ public class VisorQueryUtils {
/**
* Collects rows from sql query future, first time creates meta and column names arrays.
*
- * @param cur Query cursor to fetch rows from.
+ * @param itr Result set iterator.
* @param pageSize Number of rows to fetch.
* @return Fetched rows.
*/
- public static List<Object[]> fetchSqlQueryRows(VisorQueryCursor<List<?>> cur, int pageSize) {
+ public static List<Object[]> fetchSqlQueryRows(Iterator itr, int pageSize) {
List<Object[]> rows = new ArrayList<>();
int cnt = 0;
- while (cur.hasNext() && cnt < pageSize) {
- List<?> next = cur.next();
+ Iterator<List<?>> sqlItr = (Iterator<List<?>>)itr;
+
+ while (sqlItr.hasNext() && cnt < pageSize) {
+ List<?> next = sqlItr.next();
int sz = next.size();
@@ -269,27 +297,252 @@ public class VisorQueryUtils {
}
/**
+ * Get holder for query or throw exception if not found.
+ *
+ * @param ignite IgniteEx instance.
+ * @param qryId Query ID to get holder.
+ * @return Query holder for specified query ID.
+ * @throws IgniteException When holder is not found.
+ */
+ public static VisorQueryHolder getQueryHolder(final IgniteEx ignite, final String qryId) throws IgniteException {
+ ConcurrentMap<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
+
+ VisorQueryHolder holder = storage.get(qryId);
+
+ if (holder == null)
+ throw new IgniteException(VisorQueryHolder.isSqlQuery(qryId)
+ ? SQL_QRY_RESULTS_EXPIRED_ERR
+ : SCAN_QRY_RESULTS_EXPIRED_ERR);
+
+ return holder;
+ }
+
+ /**
+ * Remove query holder from local storage for query with specified ID and cancel query if it is in progress.
+ *
+ * @param ignite IgniteEx instance.
+ * @param qryId Query ID to get holder.
+ */
+ public static void removeQueryHolder(final IgniteEx ignite, final String qryId) {
+ ConcurrentMap<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
+ VisorQueryHolder holder = storage.remove(qryId);
+
+ if (holder != null)
+ holder.close();
+ }
+
+ /**
+ * Fetch rows from query cursor.
+ *
+ * @param itr Result set iterator.
+ * @param qryId Query ID.
+ * @param pageSize Page size.
+ */
+ public static List<Object[]> fetchQueryRows(Iterator itr, String qryId, int pageSize) {
+ return itr.hasNext()
+ ? (VisorQueryHolder.isSqlQuery(qryId)
+ ? fetchSqlQueryRows(itr, pageSize)
+ : fetchScanQueryRows(itr, pageSize))
+ : Collections.emptyList();
+ }
+
+ /**
+ * Schedule start of SQL query execution.
+ *
+ * @param ignite IgniteEx instance.
+ * @param holder Query holder object.
+ * @param arg Query task argument with query properties.
+ * @param cancel Object to cancel query.
+ */
+ public static void scheduleQueryStart(
+ final IgniteEx ignite,
+ final VisorQueryHolder holder,
+ final VisorQueryTaskArg arg,
+ final GridQueryCancel cancel
+ ) {
+ ignite.context().closure().runLocalSafe(() -> {
+ try {
+ SqlFieldsQuery qry = new SqlFieldsQuery(arg.getQueryText());
+
+ qry.setPageSize(arg.getPageSize());
+ qry.setLocal(arg.isLocal());
+ qry.setDistributedJoins(arg.isDistributedJoins());
+ qry.setCollocated(arg.isCollocated());
+ qry.setEnforceJoinOrder(arg.isEnforceJoinOrder());
+ qry.setReplicatedOnly(arg.isReplicatedOnly());
+ qry.setLazy(arg.getLazy());
+
+ String cacheName = arg.getCacheName();
+
+ if (!F.isEmpty(cacheName))
+ qry.setSchema(cacheName);
+
+ long start = U.currentTimeMillis();
+
+ List<FieldsQueryCursor<List<?>>> qryCursors = ignite
+ .context()
+ .query()
+ .querySqlFields(null, qry, null, true, false, cancel);
+
+ // In case of multiple statements leave opened only last cursor.
+ for (int i = 0; i < qryCursors.size() - 1; i++)
+ U.closeQuiet(qryCursors.get(i));
+
+ // In case of multiple statements return last cursor as result.
+ FieldsQueryCursor<List<?>> cur = F.last(qryCursors);
+
+ try {
+ // Ensure holder was not removed from node local storage from separate thread if user cancel query.
+ VisorQueryHolder actualHolder = getQueryHolder(ignite, holder.getQueryID());
+
+ List<GridQueryFieldMetadata> meta = ((QueryCursorEx)cur).fieldsMeta();
+
+ if (meta == null)
+ actualHolder.setError(new SQLException("Fail to execute query. No metadata available."));
+ else {
+ List<VisorQueryField> cols = new ArrayList<>(meta.size());
+
+ for (GridQueryFieldMetadata col : meta) {
+ cols.add(new VisorQueryField(
+ col.schemaName(),
+ col.typeName(),
+ col.fieldName(),
+ col.fieldTypeName())
+ );
+ }
+
+ actualHolder.complete(cur, U.currentTimeMillis() - start, cols);
+
+ scheduleQueryHolderRemoval(ignite, actualHolder.getQueryID());
+ }
+ }
+ catch (Throwable e) {
+ U.closeQuiet(cur);
+
+ throw e;
+ }
+ }
+ catch (Throwable e) {
+ holder.setError(e);
+ }
+ }, MANAGEMENT_POOL);
+ }
+
+ /**
+ * Schedule start of SCAN query execution.
+ *
+ * @param ignite IgniteEx instance.
+ * @param holder Query holder object.
+ * @param arg Query task argument with query properties.
+ */
+ public static void scheduleScanStart(
+ final IgniteEx ignite,
+ final VisorQueryHolder holder,
+ final VisorScanQueryTaskArg arg
+ ) {
+ ignite.context().closure().runLocalSafe(() -> {
+ try {
+ IgniteCache<Object, Object> c = ignite.cache(arg.getCacheName());
+ String filterText = arg.getFilter();
+ IgniteBiPredicate<Object, Object> filter = null;
+
+ if (!F.isEmpty(filterText))
+ filter = new VisorQueryScanRegexFilter(arg.isCaseSensitive(), arg.isRegEx(), filterText);
+
+ QueryCursor<Cache.Entry<Object, Object>> cur;
+
+ long start = U.currentTimeMillis();
+
+ if (arg.isNear())
+ cur = new VisorNearCacheCursor<>(c.localEntries(CachePeekMode.NEAR).iterator());
+ else {
+ ScanQuery<Object, Object> qry = new ScanQuery<>(filter);
+ qry.setPageSize(arg.getPageSize());
+ qry.setLocal(arg.isLocal());
+
+ cur = c.withKeepBinary().query(qry);
+ }
+
+ try {
+ // Ensure holder was not removed from node local storage from separate thread if user cancel query.
+ VisorQueryHolder actualHolder = getQueryHolder(ignite, holder.getQueryID());
+
+ actualHolder.complete(cur, U.currentTimeMillis() - start, SCAN_COL_NAMES);
+
+ scheduleQueryHolderRemoval(ignite, actualHolder.getQueryID());
+ }
+ catch (Throwable e) {
+ U.closeQuiet(cur);
+
+ throw e;
+ }
+ }
+ catch (Throwable e) {
+ holder.setError(e);
+ }
+ }, MANAGEMENT_POOL);
+ }
+
+ /**
+ * Wrapper for cache iterator to behave like {@link QueryCursor}.
+ */
+ private static class VisorNearCacheCursor<T> implements QueryCursor<T> {
+ /** Wrapped iterator. */
+ private final Iterator<T> it;
+
+ /**
+ * Wrapping constructor.
+ *
+ * @param it Near cache iterator to wrap.
+ */
+ private VisorNearCacheCursor(Iterator<T> it) {
+ this.it = it;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<T> getAll() {
+ List<T> all = new ArrayList<>();
+
+ while(it.hasNext())
+ all.add(it.next());
+
+ return all;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // Nothing to close.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<T> iterator() {
+ return it;
+ }
+ }
+
+ /**
+ * Schedule clearing of query context by timeout.
+ *
* @param qryId Unique query result id.
+ * @param ignite IgniteEx instance.
*/
- public static void scheduleResultSetHolderRemoval(final String qryId, final IgniteEx ignite) {
+ public static void scheduleQueryHolderRemoval(final IgniteEx ignite, final String qryId) {
ignite.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(RMV_DELAY) {
@Override public void onTimeout() {
- ConcurrentMap<String, VisorQueryCursor> storage = ignite.cluster().nodeLocalMap();
+ ConcurrentMap<String, VisorQueryHolder> storage = ignite.cluster().nodeLocalMap();
- VisorQueryCursor cur = storage.get(qryId);
+ VisorQueryHolder holder = storage.get(qryId);
- if (cur != null) {
- // If cursor was accessed since last scheduling, set access flag to false and reschedule.
- if (cur.accessed()) {
- cur.accessed(false);
+ if (holder != null) {
+ if (holder.isAccessed()) {
+ holder.setAccessed(false);
- scheduleResultSetHolderRemoval(qryId, ignite);
+ // Holder was accessed, we need to keep it for one more period.
+ scheduleQueryHolderRemoval(ignite, qryId);
}
else {
// Remove stored cursor otherwise.
- storage.remove(qryId);
-
- cur.close();
+ removeQueryHolder(ignite, qryId);
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java
index 0c1c418..7c0fac9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorScanQueryTask.java
@@ -17,29 +17,15 @@
package org.apache.ignite.internal.visor.query;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
import java.util.UUID;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorEither;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.apache.ignite.internal.visor.util.VisorExceptionWrapper;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SCAN_COL_NAMES;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.SCAN_QRY_NAME;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.fetchScanQueryRows;
-import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleResultSetHolderRemoval;
+import static org.apache.ignite.internal.visor.query.VisorQueryUtils.scheduleScanStart;
/**
* Task for execute SCAN query and get first page of results.
@@ -71,69 +57,18 @@ public class VisorScanQueryTask extends VisorOneNodeTask<VisorScanQueryTaskArg,
super(arg, debug);
}
- /**
- * Execute scan query.
- *
- * @param c Cache to scan.
- * @param arg Job argument with query parameters.
- * @return Query cursor.
- */
- private QueryCursor<Cache.Entry<Object, Object>> scan(IgniteCache<Object, Object> c, VisorScanQueryTaskArg arg,
- IgniteBiPredicate<Object, Object> filter) {
- ScanQuery<Object, Object> qry = new ScanQuery<>(filter);
- qry.setPageSize(arg.getPageSize());
- qry.setLocal(arg.isLocal());
-
- return c.withKeepBinary().query(qry);
- }
-
- /**
- * Scan near cache.
- *
- * @param c Cache to scan near entries.
- * @return Cache entries iterator wrapped with query cursor.
- */
- private QueryCursor<Cache.Entry<Object, Object>> near(IgniteCache<Object, Object> c) {
- return new VisorNearCacheCursor<>(c.localEntries(CachePeekMode.NEAR).iterator());
- }
-
/** {@inheritDoc} */
@Override protected VisorEither<VisorQueryResult> run(final VisorScanQueryTaskArg arg) {
try {
- IgniteCache<Object, Object> c = ignite.cache(arg.getCacheName());
UUID nid = ignite.localNode().id();
- String filterText = arg.getFilter();
-
- long start = U.currentTimeMillis();
-
- IgniteBiPredicate<Object, Object> filter = null;
-
- if (!F.isEmpty(filterText))
- filter = new VisorQueryScanRegexFilter(arg.isCaseSensitive(), arg.isRegEx(), filterText);
-
- VisorQueryCursor<Cache.Entry<Object, Object>> cur =
- new VisorQueryCursor<>(arg.isNear() ? near(c) : scan(c, arg, filter));
-
- List<Object[]> rows = fetchScanQueryRows(cur, arg.getPageSize());
-
- long duration = U.currentTimeMillis() - start; // Scan duration + fetch duration.
+ VisorQueryHolder holder = new VisorQueryHolder(false, null, null);
- boolean hasNext = cur.hasNext();
+ ignite.cluster().<String, VisorQueryHolder>nodeLocalMap().put(holder.getQueryID(), holder);
- // Generate query ID to store query cursor in node local storage.
- String qryId = SCAN_QRY_NAME + "-" + UUID.randomUUID();
+ scheduleScanStart(ignite, holder, arg);
- if (hasNext) {
- ignite.cluster().<String, VisorQueryCursor>nodeLocalMap().put(qryId, cur);
-
- scheduleResultSetHolderRemoval(qryId, ignite);
- }
- else
- cur.close();
-
- return new VisorEither<>(new VisorQueryResult(nid, qryId, SCAN_COL_NAMES, rows, hasNext,
- duration));
+ return new VisorEither<>(new VisorQueryResult(nid, holder.getQueryID(), null, null, false, 0));
}
catch (Throwable e) {
return new VisorEither<>(new VisorExceptionWrapper(e));
@@ -144,42 +79,5 @@ public class VisorScanQueryTask extends VisorOneNodeTask<VisorScanQueryTaskArg,
@Override public String toString() {
return S.toString(VisorScanQueryJob.class, this);
}
-
- /**
- * Wrapper for cache iterator to behave like {@link QueryCursor}.
- */
- private static class VisorNearCacheCursor<T> implements QueryCursor<T> {
- /** Wrapped iterator. */
- private final Iterator<T> it;
-
- /**
- * Wrapping constructor.
- *
- * @param it Near cache iterator to wrap.
- */
- private VisorNearCacheCursor(Iterator<T> it) {
- this.it = it;
- }
-
- /** {@inheritDoc} */
- @Override public List<T> getAll() {
- List<T> all = new ArrayList<>();
-
- while(it.hasNext())
- all.add(it.next());
-
- return all;
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- // Nothing to close.
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<T> iterator() {
- return it;
- }
- }
}
}
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
index 72d4cab..8d19839 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/cache/VisorCacheScanCommand.scala
@@ -148,7 +148,29 @@ class VisorCacheScanCommand {
error(x.getError)
return
- case x => x.getResult
+ case x if x.getResult.getRows != null =>
+ x.getResult
+
+ case x =>
+ var res = x.getResult
+
+ Thread.sleep(100)
+
+ while (res.getRows == null) {
+ res = executeOne(res.getResponseNodeId, classOf[VisorQueryFetchFirstPageTask],
+ new VisorQueryNextPageTaskArg(res.getQueryId, pageSize)) match {
+ case x if x.getError != null =>
+ error(x.getError)
+
+ return
+ case x => x.getResult
+ }
+
+ if (res.getRows == null)
+ Thread.sleep(500)
+ }
+
+ res
}
catch {
case e: ClusterGroupEmptyException =>
diff --git a/modules/web-console/backend/app/browsersHandler.js b/modules/web-console/backend/app/browsersHandler.js
index 820c3d4..680e340 100644
--- a/modules/web-console/backend/app/browsersHandler.js
+++ b/modules/web-console/backend/app/browsersHandler.js
@@ -239,6 +239,8 @@ module.exports = {
this.registerVisorTask('queryFetch', internalVisor('query.VisorQueryNextPageTask'), 'org.apache.ignite.lang.IgniteBiTuple', 'java.lang.String', 'java.lang.Integer');
this.registerVisorTask('queryFetchX2', internalVisor('query.VisorQueryNextPageTask'), internalVisor('query.VisorQueryNextPageTaskArg'));
+ this.registerVisorTask('queryFetchFirstPage', internalVisor('query.VisorQueryFetchFirstPageTask'), internalVisor('query.VisorQueryNextPageTaskArg'));
+
this.registerVisorTask('queryClose', internalVisor('query.VisorQueryCleanupTask'), 'java.util.Map', 'java.util.UUID', 'java.util.Set');
this.registerVisorTask('queryCloseX2', internalVisor('query.VisorQueryCleanupTask'), internalVisor('query.VisorQueryCleanupTaskArg'));
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
index d841cb5..76aae1c 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts
@@ -18,8 +18,8 @@
import _ from 'lodash';
import {nonEmpty, nonNil} from 'app/utils/lodashMixins';
import id8 from 'app/utils/id8';
-import {timer, merge, defer, from} from 'rxjs';
-import {mergeMap, tap, switchMap, exhaustMap, take} from 'rxjs/operators';
+import {timer, merge, defer, from, of} from 'rxjs';
+import {mergeMap, tap, switchMap, exhaustMap, take, filter, map, catchError} from 'rxjs/operators';
import {CSV} from 'app/services/CSV';
@@ -255,16 +255,16 @@ class Paragraph {
// Controller for SQL notebook screen.
export class NotebookCtrl {
- static $inject = ['IgniteInput', '$rootScope', '$scope', '$http', '$q', '$timeout', '$interval', '$animate', '$location', '$anchorScroll', '$state', '$filter', '$modal', '$popover', 'IgniteLoading', 'IgniteLegacyUtils', 'IgniteMessages', 'IgniteConfirm', 'AgentManager', 'IgniteChartColors', 'IgniteNotebook', 'IgniteNodes', 'uiGridExporterConstants', 'IgniteVersion', 'IgniteActivitiesData', 'JavaTypes', 'IgniteCopyToClipboard', 'CSV', 'IgniteErrorParser', 'DemoInfo'];
+ static $inject = ['IgniteInput', '$rootScope', '$scope', '$http', '$q', '$timeout', '$interval', '$animate', '$location', '$anchorScroll', '$state', '$filter', '$modal', '$popover', '$window', 'IgniteLoading', 'IgniteLegacyUtils', 'IgniteMessages', 'IgniteConfirm', 'AgentManager', 'IgniteChartColors', 'IgniteNotebook', 'IgniteNodes', 'uiGridExporterConstants', 'IgniteVersion', 'IgniteActivitiesData', 'JavaTypes', 'IgniteCopyToClipboard', 'CSV', 'IgniteErrorParser', 'DemoInfo'];
/**
* @param {CSV} CSV
*/
- constructor(private IgniteInput: InputDialog, $root, private $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, Loading, LegacyUtils, private Messages: ReturnType<typeof MessagesServiceFactory>, private Confirm: ReturnType<typeof LegacyConfirmServiceFactory>, agentMgr, IgniteChartColors, private Notebook: Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, IgniteCopyToClipboard, CSV, errorParser, De [...]
+ constructor(private IgniteInput: InputDialog, $root, private $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, $window, Loading, LegacyUtils, private Messages: ReturnType<typeof MessagesServiceFactory>, private Confirm: ReturnType<typeof LegacyConfirmServiceFactory>, agentMgr, IgniteChartColors, private Notebook: Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, IgniteCopyToClipboard, CSV, errorP [...]
const $ctrl = this;
this.CSV = CSV;
- Object.assign(this, { $root, $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, Loading, LegacyUtils, Messages, Confirm, agentMgr, IgniteChartColors, Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, errorParser, DemoInfo });
+ Object.assign(this, { $root, $scope, $http, $q, $timeout, $interval, $animate, $location, $anchorScroll, $state, $filter, $modal, $popover, $window, Loading, LegacyUtils, Messages, Confirm, agentMgr, IgniteChartColors, Notebook, Nodes, uiGridExporterConstants, Version, ActivitiesData, JavaTypes, errorParser, DemoInfo });
// Define template urls.
$ctrl.paragraphRateTemplateUrl = paragraphRateTemplateUrl;
@@ -1218,7 +1218,47 @@ export class NotebookCtrl {
_rebuildColumns(paragraph);
};
- const _showLoading = (paragraph, enable) => paragraph.loading = enable;
+ const _showLoading = (paragraph, enable) => {
+ if (paragraph.qryType === 'scan')
+ paragraph.scanningInProgress = enable;
+
+ paragraph.loading = enable;
+ };
+
+ const _fetchQueryResult = (paragraph, clearChart, res, qryArg) => {
+ if (!_.isNil(res.rows)) {
+ _processQueryResult(paragraph, clearChart, res);
+ _tryStartRefresh(paragraph);
+
+ $scope.$applyAsync();
+
+ return;
+ }
+
+ const subscription = timer(100, 500).pipe(
+ exhaustMap(() => agentMgr.queryFetchFistsPage(qryArg.nid, res.queryId, qryArg.pageSize)),
+ filter((res) => !_.isNil(res.rows)),
+ take(1),
+ map((res) => _fetchQueryResult(paragraph, false, res, qryArg)),
+ catchError((err) => {
+ if (paragraph.subscription) {
+ paragraph.subscription.unsubscribe();
+ paragraph.setError(err);
+
+ _showLoading(paragraph, false);
+ delete paragraph.subscription;
+
+ $scope.$applyAsync();
+ }
+
+ return of(err);
+ })
+ ).subscribe();
+
+ Object.defineProperty(paragraph, 'subscription', {value: subscription, configurable: true});
+
+ return subscription;
+ };
/**
* @param {Object} paragraph Query
@@ -1230,6 +1270,11 @@ export class NotebookCtrl {
const prevKeyCols = paragraph.chartKeyCols;
const prevValCols = paragraph.chartValCols;
+ if (paragraph.subscription) {
+ paragraph.subscription.unsubscribe();
+ delete paragraph.subscription;
+ }
+
if (!_.eq(paragraph.meta, res.columns)) {
paragraph.meta = [];
@@ -1323,12 +1368,37 @@ export class NotebookCtrl {
const _closeOldQuery = (paragraph) => {
const nid = paragraph.resNodeId;
- if (paragraph.queryId && _.find($scope.caches, ({nodes}) => _.find(nodes, {nid: nid.toUpperCase()})))
- return agentMgr.queryClose(nid, paragraph.queryId);
+ if (paragraph.queryId) {
+ const qryId = paragraph.queryId;
+ delete paragraph.queryId;
+
+ return agentMgr.queryClose(nid, qryId);
+ }
return $q.when();
};
+ $scope.cancelQuery = (paragraph) => {
+ if (paragraph.subscription) {
+ paragraph.subscription.unsubscribe();
+ delete paragraph.subscription;
+ }
+
+ _showLoading(paragraph, false);
+ this.$scope.stopRefresh(paragraph);
+
+ _closeOldQuery(paragraph)
+ .then(() => _showLoading(paragraph, false))
+ .catch((err) => {
+ _showLoading(paragraph, false);
+ paragraph.setError(err);
+ });
+ };
+
+ $scope.cancelQueryAvailable = (paragraph) => {
+ return !!paragraph.subscription;
+ };
+
/**
* @param {String} name Cache name.
* @param {Array.<String>} nids Cache name.
@@ -1381,9 +1451,23 @@ export class NotebookCtrl {
agentMgr.awaitCluster()
.then(() => _closeOldQuery(paragraph))
.then(() => args.localNid || _chooseNode(args.cacheName, false))
- .then((nid) => agentMgr.querySql(nid, args.cacheName, args.query, args.nonCollocatedJoins,
- args.enforceJoinOrder, false, !!args.localNid, args.pageSize, args.lazy, args.collocated))
- .then((res) => _processQueryResult(paragraph, false, res))
+ .then((nid) => {
+ const qryArg = {
+ nid,
+ cacheName: args.cacheName,
+ query: args.query,
+ nonCollocatedJoins: args.nonCollocatedJoins,
+ enforceJoinOrder: args.enforceJoinOrder,
+ replicatedOnly: false,
+ local: !!args.localNid,
+ pageSize: args.pageSize,
+ lazy: args.lazy,
+ collocated: args.collocated
+ };
+
+ return agentMgr.querySql(qryArg)
+ .then((res) => _fetchQueryResult(paragraph, false, res, qryArg));
+ })
.catch((err) => paragraph.setError(err));
};
@@ -1393,8 +1477,6 @@ export class NotebookCtrl {
if (_.get(paragraph, 'rate.installed') && paragraph.queryExecuted()) {
$scope.chartAcceptKeyColumn(paragraph, TIME_LINE);
- _executeRefresh(paragraph);
-
const delay = paragraph.rate.value * paragraph.rate.unit;
paragraph.rate.stopTime = $interval(_executeRefresh, delay, 0, false, paragraph);
@@ -1430,6 +1512,19 @@ export class NotebookCtrl {
return $scope.ddlAvailable() && !paragraph.useAsDefaultSchema ? null : paragraph.cacheName;
};
+ const _initQueryResult = (paragraph, res) => {
+ paragraph.resNodeId = res.responseNodeId;
+ paragraph.queryId = res.queryId;
+
+ paragraph.rows = [];
+ paragraph.gridOptions.adjustHeight(paragraph.rows.length);
+
+ paragraph.meta = [];
+ paragraph.setError({message: ''});
+
+ paragraph.hasNext = false;
+ };
+
$scope.execute = (paragraph, local = false) => {
const nonCollocatedJoins = !!paragraph.nonCollocatedJoins;
const enforceJoinOrder = !!paragraph.enforceJoinOrder;
@@ -1467,13 +1562,25 @@ export class NotebookCtrl {
ActivitiesData.post({ group: 'sql', action: '/queries/execute' });
const qry = args.maxPages ? addLimit(args.query, args.pageSize * args.maxPages) : query;
+ const qryArg = {
+ nid,
+ cacheName: args.cacheName,
+ query: qry,
+ nonCollocatedJoins,
+ enforceJoinOrder,
+ replicatedOnly: false,
+ local,
+ pageSize: args.pageSize,
+ lazy,
+ collocated
+ };
- return agentMgr.querySql(nid, args.cacheName, qry, nonCollocatedJoins, enforceJoinOrder, false, local, args.pageSize, lazy, collocated);
- })
- .then((res) => {
- _processQueryResult(paragraph, true, res);
+ return agentMgr.querySql(qryArg)
+ .then((res) => {
+ _initQueryResult(paragraph, res);
- _tryStartRefresh(paragraph);
+ return _fetchQueryResult(paragraph, true, res, qryArg);
+ });
})
.catch((err) => {
paragraph.setError(err);
@@ -1525,9 +1632,25 @@ export class NotebookCtrl {
ActivitiesData.post({ group: 'sql', action: '/queries/explain' });
- return agentMgr.querySql(nid, args.cacheName, args.query, nonCollocatedJoins, enforceJoinOrder, false, false, args.pageSize, false, collocated);
+ const qryArg = {
+ nid,
+ cacheName: args.cacheName,
+ query: args.query,
+ nonCollocatedJoins,
+ enforceJoinOrder,
+ replicatedOnly: false,
+ local: false,
+ pageSize: args.pageSize,
+ lazy: false, collocated
+ };
+
+ return agentMgr.querySql(qryArg)
+ .then((res) => {
+ _initQueryResult(paragraph, res);
+
+ return _fetchQueryResult(paragraph, true, res, qryArg);
+ });
})
- .then((res) => _processQueryResult(paragraph, true, res))
.catch((err) => {
paragraph.setError(err);
@@ -1545,7 +1668,6 @@ export class NotebookCtrl {
$scope.scanAvailable(paragraph) && _chooseNode(cacheName, local)
.then((nid) => {
paragraph.localQueryMode = local;
- paragraph.scanningInProgress = true;
Notebook.save($scope.notebook)
.catch(Messages.showError);
@@ -1569,15 +1691,28 @@ export class NotebookCtrl {
ActivitiesData.post({ group: 'sql', action: '/queries/scan' });
- return agentMgr.queryScan(nid, cacheName, filter, false, caseSensitive, false, local, pageSize);
+ const qryArg = {
+ nid,
+ cacheName,
+ filter,
+ regEx: false,
+ caseSensitive,
+ near: false,
+ local,
+ pageSize
+ };
+
+ return agentMgr.queryScan(qryArg).then((res) => {
+ _initQueryResult(paragraph, res);
+
+ return _fetchQueryResult(paragraph, true, res, qryArg);
+ });
})
- .then((res) => _processQueryResult(paragraph, true, res))
.catch((err) => {
paragraph.setError(err);
_showLoading(paragraph, false);
- })
- .then(() => paragraph.scanningInProgress = false);
+ });
});
};
@@ -1718,8 +1853,26 @@ export class NotebookCtrl {
return Promise.resolve(args.localNid || _chooseNode(args.cacheName, false))
.then((nid) => args.type === 'SCAN'
- ? agentMgr.queryScanGetAll(nid, args.cacheName, args.query, !!args.regEx, !!args.caseSensitive, !!args.near, !!args.localNid)
- : agentMgr.querySqlGetAll(nid, args.cacheName, args.query, !!args.nonCollocatedJoins, !!args.enforceJoinOrder, false, !!args.localNid, !!args.lazy, !!args.collocated))
+ ? agentMgr.queryScanGetAll({
+ nid,
+ cacheName: args.cacheName,
+ filter: args.query,
+ regEx: !!args.regEx,
+ caseSensitive: !!args.caseSensitive,
+ near: !!args.near,
+ local: !!args.localNid
+ })
+ : agentMgr.querySqlGetAll({
+ nid,
+ cacheName: args.cacheName,
+ query: args.query,
+ nonCollocatedJoins: !!args.nonCollocatedJoins,
+ enforceJoinOrder: !!args.enforceJoinOrder,
+ replicatedOnly: false,
+ local: !!args.localNid,
+ lazy: !!args.lazy,
+ collocated: !!args.collocated
+ }))
.then((res) => _export(exportFileName(paragraph, true), paragraph.gridOptions.columnDefs, res.columns, res.rows))
.catch(Messages.showError)
.then(() => {
@@ -1758,7 +1911,7 @@ export class NotebookCtrl {
paragraph.rate.installed = true;
if (paragraph.queryExecuted() && !paragraph.scanExplain())
- _tryStartRefresh(paragraph);
+ _executeRefresh(paragraph);
};
$scope.stopRefresh = function(paragraph) {
@@ -1904,6 +2057,22 @@ export class NotebookCtrl {
$modal({scope, templateUrl: messageTemplateUrl, show: true});
}
};
+
+ $window.addEventListener('beforeunload', () => {
+ this._closeOpenedQueries(this.$scope.notebook.paragraphs);
+ });
+ }
+
+ _closeOpenedQueries(paragraphs) {
+ _.forEach(paragraphs, ({queryId, subscription, resNodeId}) => {
+ if (subscription)
+ subscription.unsubscribe();
+
+ if (queryId) {
+ this.agentMgr.queryClose(resNodeId, queryId)
+ .catch(() => { /* No-op. */ });
+ }
+ });
}
scanActions: QueryActions<Paragraph & {type: 'scan'}> = [
@@ -1964,6 +2133,8 @@ export class NotebookCtrl {
await this.Confirm.confirm('Are you sure you want to remove query: "' + paragraph.name + '"?');
this.$scope.stopRefresh(paragraph);
+ this._closeOpenedQueries([paragraph]);
+
const paragraph_idx = _.findIndex(this.$scope.notebook.paragraphs, (item) => paragraph === item);
const panel_idx = _.findIndex(this.$scope.expandedParagraphs, (item) => paragraph_idx === item);
@@ -1995,6 +2166,8 @@ export class NotebookCtrl {
}
$onDestroy() {
+ this._closeOpenedQueries(this.$scope.notebook.paragraphs);
+
if (this.refresh$)
this.refresh$.unsubscribe();
}
diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
index e2c4408..c4033e6 100644
--- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
+++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/template.tpl.pug
@@ -187,6 +187,9 @@ mixin query-actions
button.btn-ignite.btn-ignite--secondary(ng-disabled='!queryAvailable(paragraph)' ng-click='explain(paragraph)' data-placement='bottom' bs-tooltip='' data-title='{{queryTooltip(paragraph, "explain query")}}')
| Explain
+ button.btn-ignite.btn-ignite--secondary(ng-if='cancelQueryAvailable(paragraph)' ng-click='cancelQuery(paragraph)' data-placement='bottom' bs-tooltip='' data-title='{{"Cancel query execution"}}')
+ | Cancel
+
mixin table-result-heading-query
.total.row
.col-xs-7
@@ -339,6 +342,9 @@ mixin paragraph-scan
span.icon-left.fa.fa-fw.fa-play(ng-hide='paragraph.checkScanInProgress(true)')
span.icon-left.fa.fa-fw.fa-refresh.fa-spin(ng-show='paragraph.checkScanInProgress(true)')
| Scan on selected node
+
+ button.btn-ignite.btn-ignite--secondary(ng-if='cancelQueryAvailable(paragraph)' ng-click='cancelQuery(paragraph)' data-placement='bottom' bs-tooltip='' data-title='{{"Cancel query execution"}}')
+ | Cancel
div
.col-sm-12.sql-result(ng-if='paragraph.queryExecuted() && !paragraph.scanningInProgress' ng-switch='paragraph.resultType()')
diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
index faf0672..8e4dd2d 100644
--- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
+++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js
@@ -18,8 +18,8 @@
import _ from 'lodash';
import {nonEmpty, nonNil} from 'app/utils/lodashMixins';
-import {BehaviorSubject} from 'rxjs';
-import {first, pluck, tap, distinctUntilChanged, map, filter} from 'rxjs/operators';
+import {timer, BehaviorSubject, of, from} from 'rxjs';
+import {exhaustMap, first, pluck, tap, distinctUntilChanged, map, filter, expand, takeWhile, last} from 'rxjs/operators';
import io from 'socket.io-client';
@@ -720,14 +720,14 @@ export default class AgentManager {
* @param {Boolean} enforceJoinOrder Flag whether enforce join order is enabled.
* @param {Boolean} replicatedOnly Flag whether query contains only replicated tables.
* @param {Boolean} local Flag whether to execute query locally.
- * @param {Number} pageSz
+ * @param {Number} pageSize
* @param {Boolean} [lazy] query flag.
* @param {Boolean} [collocated] Collocated query.
* @returns {Promise}
*/
- querySql(nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz, lazy = false, collocated = false) {
+ querySql({nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSize, lazy = false, collocated = false}) {
if (this.available(IGNITE_2_0)) {
- let args = [cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz];
+ let args = [cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSize];
if (this.available(...COLLOCATED_QUERY_SINCE))
args = [...args, lazy, collocated];
@@ -747,11 +747,11 @@ export default class AgentManager {
let queryPromise;
if (enforceJoinOrder)
- queryPromise = this.visorTask('querySqlV3', nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, local, pageSz);
+ queryPromise = this.visorTask('querySqlV3', nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, local, pageSize);
else if (nonCollocatedJoins)
- queryPromise = this.visorTask('querySqlV2', nid, cacheName, query, nonCollocatedJoins, local, pageSz);
+ queryPromise = this.visorTask('querySqlV2', nid, cacheName, query, nonCollocatedJoins, local, pageSize);
else
- queryPromise = this.visorTask('querySql', nid, cacheName, query, local, pageSz);
+ queryPromise = this.visorTask('querySql', nid, cacheName, query, local, pageSize);
return queryPromise
.then(({key, value}) => {
@@ -764,6 +764,21 @@ export default class AgentManager {
/**
* @param {String} nid Node id.
+ * @param {String} queryId Query ID.
+ * @param {Number} pageSize
+ * @returns {Promise}
+ */
+ queryFetchFistsPage(nid, queryId, pageSize) {
+ return this.visorTask('queryFetchFirstPage', nid, queryId, pageSize).then(({error, result}) => {
+ if (_.isEmpty(error))
+ return result;
+
+ return Promise.reject(error);
+ });
+ }
+
+ /**
+ * @param {String} nid Node id.
* @param {Number} queryId
* @param {Number} pageSize
* @returns {Promise}
@@ -775,6 +790,34 @@ export default class AgentManager {
return this.visorTask('queryFetch', nid, queryId, pageSize);
}
+ _fetchQueryAllResults(acc, pageSize) {
+ if (!_.isNil(acc.rows)) {
+ if (!acc.hasMore)
+ return acc;
+
+ return of(acc).pipe(
+ expand((acc) => {
+ return from(this.queryNextPage(acc.responseNodeId, acc.queryId, pageSize)
+ .then((res) => {
+ acc.rows = acc.rows.concat(res.rows);
+ acc.hasMore = res.hasMore;
+
+ return acc;
+ }));
+ }),
+ takeWhile((acc) => acc.hasMore),
+ last()
+ ).toPromise();
+ }
+
+ return timer(100, 500).pipe(
+ exhaustMap(() => this.queryFetchFistsPage(acc.responseNodeId, acc.queryId, pageSize)),
+ filter((res) => !_.isNil(res.rows)),
+ first(),
+ map((res) => this._fetchQueryAllResults(res, 1024))
+ ).toPromise();
+ }
+
/**
* @param {String} nid Node id.
* @param {String} cacheName Cache name.
@@ -787,26 +830,12 @@ export default class AgentManager {
* @param {Boolean} collocated Collocated query.
* @returns {Promise}
*/
- querySqlGetAll(nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, lazy, collocated) {
+ querySqlGetAll({nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, lazy, collocated}) {
// Page size for query.
- const pageSz = 1024;
+ const pageSize = 1024;
- const fetchResult = (acc) => {
- if (!acc.hasMore)
- return acc;
-
- return this.queryNextPage(acc.responseNodeId, acc.queryId, pageSz)
- .then((res) => {
- acc.rows = acc.rows.concat(res.rows);
-
- acc.hasMore = res.hasMore;
-
- return fetchResult(acc);
- });
- };
-
- return this.querySql(nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSz, lazy, collocated)
- .then(fetchResult);
+ return this.querySql({nid, cacheName, query, nonCollocatedJoins, enforceJoinOrder, replicatedOnly, local, pageSize, lazy, collocated})
+ .then((res) => this._fetchQueryAllResults(res, pageSize));
}
/**
@@ -834,7 +863,7 @@ export default class AgentManager {
* @param {Number} pageSize Page size.
* @returns {Promise}
*/
- queryScan(nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize) {
+ queryScan({nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize}) {
if (this.available(IGNITE_2_0)) {
return this.visorTask('queryScanX2', nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize)
.then(({error, result}) => {
@@ -854,7 +883,7 @@ export default class AgentManager {
const prefix = caseSensitive ? SCAN_CACHE_WITH_FILTER_CASE_SENSITIVE : SCAN_CACHE_WITH_FILTER;
const query = `${prefix}${filter}`;
- return this.querySql(nid, cacheName, query, false, false, false, local, pageSize);
+ return this.querySql({nid, cacheName, query, nonCollocatedJoins: false, enforceJoinOrder: false, replicatedOnly: false, local, pageSize});
}
/**
@@ -867,26 +896,12 @@ export default class AgentManager {
* @param {Boolean} local Flag whether to execute query locally.
* @returns {Promise}
*/
- queryScanGetAll(nid, cacheName, filter, regEx, caseSensitive, near, local) {
+ queryScanGetAll({nid, cacheName, filter, regEx, caseSensitive, near, local}) {
// Page size for query.
- const pageSz = 1024;
-
- const fetchResult = (acc) => {
- if (!acc.hasMore)
- return acc;
-
- return this.queryNextPage(acc.responseNodeId, acc.queryId, pageSz)
- .then((res) => {
- acc.rows = acc.rows.concat(res.rows);
-
- acc.hasMore = res.hasMore;
-
- return fetchResult(acc);
- });
- };
+ const pageSize = 1024;
- return this.queryScan(nid, cacheName, filter, regEx, caseSensitive, near, local, pageSz)
- .then(fetchResult);
+ return this.queryScan({nid, cacheName, filter, regEx, caseSensitive, near, local, pageSize})
+ .then((res) => this._fetchQueryAllResults(res, pageSize));
}
/**