You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/06 07:56:41 UTC
[7/7] ignite git commit: IGNITE-4436 WIP.
IGNITE-4436 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/effc624d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/effc624d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/effc624d
Branch: refs/heads/ignite-4436-2
Commit: effc624da659724886bff6685d53f535750a3ea5
Parents: 2a572c4
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Feb 6 11:07:45 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 6 14:56:23 2017 +0700
----------------------------------------------------------------------
.../processors/query/GridRunningQueryInfo.java | 36 ++++-
.../query/VisorCollectCurrentQueriesTask.java | 17 +--
.../ignite/internal/visor/query/VisorQuery.java | 69 ----------
.../internal/visor/query/VisorRunningQuery.java | 119 ++++++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 66 ++++-----
.../h2/twostep/GridReduceQueryExecutor.java | 9 +-
.../cache/CacheSqlQueryValueCopySelfTest.java | 137 ++++++++++++++++++-
.../cache/GridCacheCrossCacheQuerySelfTest.java | 103 --------------
8 files changed, 338 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index ea37d15..d77c8c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.query;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
/**
* Query descriptor.
*/
@@ -27,6 +29,9 @@ public class GridRunningQueryInfo {
/** */
private final String qry;
+ /** Query type. */
+ private final GridCacheQueryType qryType;
+
/** */
private final String cache;
@@ -36,19 +41,27 @@ public class GridRunningQueryInfo {
/** */
private final GridQueryCancel cancel;
+ /** Local query flag. */
+ private final boolean loc;
+
/**
* @param id Query ID.
* @param qry Query text.
+ * @param qryType Query type.
* @param cache Cache where query was executed.
* @param startTime Query start time.
* @param cancel Query cancel.
+ * @param loc Local query flag.
*/
- public GridRunningQueryInfo(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
+ public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+ GridQueryCancel cancel, boolean loc) {
this.id = id;
this.qry = qry;
+ this.qryType = qryType;
this.cache = cache;
this.startTime = startTime;
this.cancel = cancel;
+ this.loc = loc;
}
/**
@@ -66,6 +79,13 @@ public class GridRunningQueryInfo {
}
/**
+ * @return Query type.
+ */
+ public GridCacheQueryType queryType() {
+ return qryType;
+ }
+
+ /**
* @return Cache where query was executed.
*/
public String cache() {
@@ -95,4 +115,18 @@ public class GridRunningQueryInfo {
if (cancel != null)
cancel.cancel();
}
+
+ /**
+ * @return {@code true} if query can be cancelled.
+ */
+ public boolean cancelable() {
+ return cancel != null;
+ }
+
+ /**
+ * @return {@code true} if query is local.
+ */
+ public boolean local() {
+ return loc;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
index 0dc0ec5..621b2bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable;
* Task to collect currently running queries.
*/
@GridInternal
-public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorQuery>>, Collection<VisorQuery>> {
+public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorRunningQuery>>, Collection<VisorRunningQuery>> {
/** */
private static final long serialVersionUID = 0L;
@@ -45,12 +45,12 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
}
/** {@inheritDoc} */
- @Nullable @Override protected Map<UUID, Collection<VisorQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException {
- Map<UUID, Collection<VisorQuery>> map = new HashMap<>();
+ @Nullable @Override protected Map<UUID, Collection<VisorRunningQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException {
+ Map<UUID, Collection<VisorRunningQuery>> map = new HashMap<>();
for (ComputeJobResult res : results)
if (res.getException() != null) {
- Collection<VisorQuery> queries = res.getData();
+ Collection<VisorRunningQuery> queries = res.getData();
map.put(res.getNode().id(), queries);
}
@@ -61,7 +61,7 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
/**
* Job to collect currently running queries from node.
*/
- private static class VisorCollectCurrentQueriesJob extends VisorJob<Long, Collection<VisorQuery>> {
+ private static class VisorCollectCurrentQueriesJob extends VisorJob<Long, Collection<VisorRunningQuery>> {
/**
* Create job with specified argument.
*
@@ -73,13 +73,14 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
}
/** {@inheritDoc} */
- @Override protected Collection<VisorQuery> run(@Nullable Long duration) throws IgniteException {
+ @Override protected Collection<VisorRunningQuery> run(@Nullable Long duration) throws IgniteException {
Collection<GridRunningQueryInfo> queries = ignite.context().query().runningQueries(duration);
- Collection<VisorQuery> res = new ArrayList<>(queries.size());
+ Collection<VisorRunningQuery> res = new ArrayList<>(queries.size());
for (GridRunningQueryInfo qry : queries)
- res.add(new VisorQuery(qry.id(), qry.query(), qry.cache()));
+ res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(), qry.startTime(),
+ qry.cancelable(), qry.local()));
return res;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
deleted file mode 100644
index e9beff9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.query;
-
-import java.io.Serializable;
-
-/**
- * Arguments for {@link VisorQueryTask}.
- */
-public class VisorQuery implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private Long id;
-
- /** Query text. */
- private String qry;
-
- /** Cache name for query. */
- private String cache;
-
- /**
- * @param id Query ID.
- * @param qry Query text.
- * @param cache Cache where query was executed.
- */
- public VisorQuery(Long id, String qry, String cache) {
- this.id = id;
- this.qry = qry;
- this.cache = cache;
- }
-
- /**
- * @return Query ID.
- */
- public Long id() {
- return id;
- }
-
- /**
- * @return Query txt.
- */
- public String query() {
- return qry;
- }
-
- /**
- * @return Cache name.
- */
- public String getCache() {
- return cache;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
new file mode 100644
index 0000000..5605ea2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
+/**
+ * Descriptor of running query.
+ */
+public class VisorRunningQuery implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Query ID. */
+ private long id;
+
+ /** Query text. */
+ private String qry;
+
+ /** Query type. */
+ private GridCacheQueryType qryType;
+
+ /** Cache name for query. */
+ private String cache;
+
+ /** Query start time. */
+ private long startTime;
+
+ /** {@code true} if query can be canceled. */
+ private boolean cancellable;
+
+ /** {@code true} if query is local. */
+ private boolean loc;
+
+ /**
+ * @param id Query ID.
+ * @param qry Query text.
+ * @param qryType Query type.
+ * @param cache Cache where query was executed.
+ * @param startTime Query start time.
+ * @param cancellable {@code true} if query can be canceled.
+ * @param loc {@code true} if query is local.
+ */
+ public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+ boolean cancellable, boolean loc) {
+ this.id = id;
+ this.qry = qry;
+ this.qryType = qryType;
+ this.cache = cache;
+ this.startTime = startTime;
+ this.cancellable = cancellable;
+ this.loc = loc;
+ }
+
+ /**
+ * @return Query ID.
+ */
+ public long id() {
+ return id;
+ }
+
+ /**
+ * @return Query text.
+ */
+ public String query() {
+ return qry;
+ }
+
+ /**
+ * @return Query type.
+ */
+ public GridCacheQueryType queryType() {
+ return qryType;
+ }
+
+ /**
+ * @return Cache name.
+ */
+ public String getCache() {
+ return cache;
+ }
+
+ /**
+ * @return Query start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * @return {@code true} if query can be cancelled.
+ */
+ public boolean isCancelable() {
+ return cancellable;
+ }
+
+ /**
+ * @return {@code true} if query is local.
+ */
+ public boolean isLocal() {
+ return loc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c0f5f09..5be4f03 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -182,6 +182,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_
import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.IgniteSystemProperties.getString;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
@@ -782,8 +785,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IndexingQueryFilter filters) throws IgniteCheckedException {
TableDescriptor tbl = tableDescriptor(spaceName, type);
- if (tbl != null && tbl.luceneIdx != null)
- return tbl.luceneIdx.query(qry, filters);
+ if (tbl != null && tbl.luceneIdx != null) {
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, spaceName,
+ U.currentTimeMillis(), null, true);
+
+ try {
+ runs.put(run.id(), run);
+
+ return tbl.luceneIdx.query(qry, filters);
+ }
+ finally {
+ runs.remove(run.id());
+ }
+ }
return new GridEmptyCloseableIterator<>();
}
@@ -841,7 +855,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(ctx);
- GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, spaceName, U.currentTimeMillis(), cancel);
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS,
+ spaceName, U.currentTimeMillis(), cancel, true);
runs.putIfAbsent(run.id(), run);
@@ -1103,7 +1118,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
- GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), spaceName, qry, U.currentTimeMillis(), null);
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
+ U.currentTimeMillis(), null, true);
runs.put(run.id(), run);
@@ -2269,11 +2285,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
- return rdcQryExec.longRunningQueries(duration);
+ Collection<GridRunningQueryInfo> res = new ArrayList<>();
+
+ res.addAll(runs.values());
+ res.addAll(rdcQryExec.longRunningQueries(duration));
+
+ return res;
}
/** {@inheritDoc} */
@Override public void cancelQueries(Set<Long> queries) {
+ for (Long qryId : queries) {
+ GridRunningQueryInfo run = runs.get(qryId);
+
+ if (run != null)
+ run.cancel();
+ }
+
rdcQryExec.cancelQueries(queries);
}
@@ -3191,32 +3219,4 @@ public class IgniteH2Indexing implements GridQueryIndexing {
lastUsage = U.currentTimeMillis();
}
}
-
- /**
- * Query run.
- */
- private static class QueryRun {
- /** */
- private final GridRunningQueryInfo qry;
-
- /** */
- private final long startTime;
-
- /** */
- private final GridQueryCancel cancel;
-
- /**
- *
- * @param id
- * @param qry
- * @param cache
- * @param startTime
- * @param cancel
- */
- public QueryRun(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
- this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
- this.startTime = startTime;
- this.cancel = cancel;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index febe810..3540141 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -99,6 +99,7 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
/**
@@ -1332,8 +1333,10 @@ public class GridReduceQueryExecutor {
* @param queries Queries IDs to cancel.
*/
public void cancelQueries(Set<Long> queries) {
- for (QueryRun run : runs.values()) {
- if (queries.contains(run.qry.id()))
+ for (Long qryId : queries) {
+ QueryRun run = runs.get(qryId);
+
+ if (run != null)
run.qry.cancel();
}
}
@@ -1371,7 +1374,7 @@ public class GridReduceQueryExecutor {
* @param cancel Query cancel handler.
*/
private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
- this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
+ this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
this.conn = (JdbcConnection)conn;
this.idxs = new ArrayList<>(idxsCnt);
this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
index e47e893..a91f65e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
@@ -17,15 +17,23 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -54,6 +62,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
cc.setCopyOnRead(true);
cc.setIndexedTypes(Integer.class, Value.class);
+ cc.setSqlFunctionClasses(TestSQLFunctions.class);
cfg.setCacheConfiguration(cc);
@@ -195,6 +204,108 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
check(cache);
}
+ /**
+ * Run specified query in separate thread.
+ *
+ * @param qry Query to execute.
+ */
+ private IgniteInternalFuture<?> runQueryAsync(final Query<?> qry) throws Exception {
+ return multithreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ grid(0).cache(null).query(qry).getAll();
+ }
+ catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }, 1, "run-query");
+ }
+
+ /**
+ * Test collecting info about running queries.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRunningQueries() throws Exception {
+ IgniteInternalFuture<?> fut = runQueryAsync(new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3"));
+
+ Thread.sleep(500);
+
+ GridQueryProcessor qryProc = ((IgniteKernal)grid(0)).context().query();
+
+ Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ fut.get();
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
+
+ SqlFieldsQuery qry = new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3");
+ qry.setLocal(true);
+
+ fut = runQueryAsync(qry);
+
+ Thread.sleep(500);
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ fut.get();
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
+ }
+
+ /**
+ * Test canceling of running queries.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCancelingQueries() throws Exception {
+ final Ignite ignite = grid(0);
+
+ runQueryAsync(new SqlFieldsQuery("select * from (select _val, sleep(100) from Value limit 50)"));
+
+ Thread.sleep(500);
+
+ final GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query();
+
+ Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+ assertEquals(1, queries.size());
+
+ final Collection<GridRunningQueryInfo> finalQueries = queries;
+
+ for (GridRunningQueryInfo query : finalQueries)
+ qryProc.cancelQueries(Collections.singleton(query.id()));
+
+ int n = 100;
+
+ // Give cluster some time to cancel query and cleanup resources.
+ while (n > 0) {
+ Thread.sleep(100);
+
+ queries = qryProc.runningQueries(0);
+
+ if (queries.isEmpty())
+ break;
+
+ log.info(">>>> Wait for cancel: " + n);
+
+ n--;
+ }
+
+ queries = qryProc.runningQueries(0);
+
+ assertEquals(0, queries.size());
+ }
+
/** */
private static class Value {
/** */
@@ -223,4 +334,28 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
assertEquals(KEYS, cnt);
}
-}
\ No newline at end of file
+
+ /**
+ * Utility class with custom SQL functions.
+ */
+ public static class TestSQLFunctions {
+ /**
+ * Sleep function to simulate long running queries.
+ *
+ * @param x Time to sleep.
+ * @return Return specified argument.
+ */
+ @QuerySqlFunction
+ public static long sleep(long x) {
+ if (x >= 0)
+ try {
+ Thread.sleep(x);
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+
+ return x;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index d6a766d..337ae29 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -58,30 +58,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
/** */
private Ignite ignite;
- /**
- * Utility class with custom SQL functions.
- */
- public static class TestSQLFunctions {
- /**
- * Sleep function to simulate long running queries.
- *
- * @param x Time to sleep.
- * @return Return specified argument.
- */
- @QuerySqlFunction
- public static long sleep(long x) {
- if (x >= 0)
- try {
- Thread.sleep(x);
- }
- catch (InterruptedException ignored) {
- // No-op.
- }
-
- return x;
- }
- }
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration c = super.getConfiguration(gridName);
@@ -141,8 +117,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
else
throw new IllegalStateException("mode: " + mode);
- cc.setSqlFunctionClasses(TestSQLFunctions.class);
-
return cc;
}
@@ -248,83 +222,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
}
/**
- * Test collecting info about running.
- *
- * @throws Exception If failed.
- */
- public void testRunningQueries() throws Exception {
- IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- SqlFieldsQuery qry = new SqlFieldsQuery("select productId, sleep(3000) from FactPurchase limit 1");
-
- ignite.cache("partitioned").query(qry).getAll();
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
- }, 1);
-
- Thread.sleep(1000);
-
- GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query();
-
- Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(500);
-
- assertEquals(1, queries.size());
-
- fut.get();
-
- queries = qryProc.runningQueries(500);
-
- assertEquals(0, queries.size());
- }
-
- /**
- * Test collecting info about running.
- *
- * @throws Exception If failed.
- */
- public void testCancelingQueries() throws Exception {
- IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- SqlFieldsQuery qry = new SqlFieldsQuery("select productId, sleep(500) from FactPurchase limit 100");
-
- ignite.cache("partitioned").query(qry).getAll();
- }
- catch (Throwable e) {
- e.printStackTrace();
- }
- }
- }, 1);
-
- Thread.sleep(1000);
-
- GridQueryProcessor queryProc = ((IgniteKernal)ignite).context().query();
-
- Collection<GridRunningQueryInfo> queries = queryProc.runningQueries(500);
-
- assertEquals(1, queries.size());
-
- for (GridRunningQueryInfo query : queries)
- queryProc.cancelQueries(Collections.singleton(query.id()));
-
- Thread.sleep(2000); // Give cluster some time to cancel query and cleanup resources.
-
- queries = queryProc.runningQueries(500);
-
- assertEquals(0, queries.size());
-
- fut.get();
-
- queries = queryProc.runningQueries(500);
-
- assertEquals(0, queries.size());
- }
-
- /**
* @throws Exception If failed.
*/
public void testApiQueries() throws Exception {