You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/06 07:56:41 UTC

[7/7] ignite git commit: IGNITE-4436 WIP.

IGNITE-4436 WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/effc624d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/effc624d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/effc624d

Branch: refs/heads/ignite-4436-2
Commit: effc624da659724886bff6685d53f535750a3ea5
Parents: 2a572c4
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Mon Feb 6 11:07:45 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Mon Feb 6 14:56:23 2017 +0700

----------------------------------------------------------------------
 .../processors/query/GridRunningQueryInfo.java  |  36 ++++-
 .../query/VisorCollectCurrentQueriesTask.java   |  17 +--
 .../ignite/internal/visor/query/VisorQuery.java |  69 ----------
 .../internal/visor/query/VisorRunningQuery.java | 119 ++++++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  66 ++++-----
 .../h2/twostep/GridReduceQueryExecutor.java     |   9 +-
 .../cache/CacheSqlQueryValueCopySelfTest.java   | 137 ++++++++++++++++++-
 .../cache/GridCacheCrossCacheQuerySelfTest.java | 103 --------------
 8 files changed, 338 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index ea37d15..d77c8c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
 /**
  * Query descriptor.
  */
@@ -27,6 +29,9 @@ public class GridRunningQueryInfo {
     /** */
     private final String qry;
 
+    /** Query type. */
+    private final GridCacheQueryType qryType;
+
     /** */
     private final String cache;
 
@@ -36,19 +41,27 @@ public class GridRunningQueryInfo {
     /** */
     private final GridQueryCancel cancel;
 
+    /** */
+    private final boolean loc;
+
     /**
      * @param id Query ID.
      * @param qry Query text.
+     * @param qryType Query type.
      * @param cache Cache where query was executed.
      * @param startTime Query start time.
      * @param cancel Query cancel.
+     * @param loc Local query flag.
      */
-    public GridRunningQueryInfo(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
+    public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+        GridQueryCancel cancel, boolean loc) {
         this.id = id;
         this.qry = qry;
+        this.qryType = qryType;
         this.cache = cache;
         this.startTime = startTime;
         this.cancel = cancel;
+        this.loc = loc;
     }
 
     /**
@@ -66,6 +79,13 @@ public class GridRunningQueryInfo {
     }
 
     /**
+     * @return Query type.
+     */
+    public GridCacheQueryType queryType() {
+        return qryType;
+    }
+
+    /**
      * @return Cache where query was executed.
      */
     public String cache() {
@@ -95,4 +115,18 @@ public class GridRunningQueryInfo {
         if (cancel != null)
             cancel.cancel();
     }
+
+    /**
+     * @return {@code true} if query can be cancelled.
+     */
+    public boolean cancelable() {
+        return cancel != null;
+    }
+
+    /**
+     * @return {@code true} if query is local.
+     */
+    public boolean local() {
+        return loc;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
index 0dc0ec5..621b2bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable;
  * Task to collect currently running queries.
  */
 @GridInternal
-public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorQuery>>, Collection<VisorQuery>> {
+public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map<UUID, Collection<VisorRunningQuery>>, Collection<VisorRunningQuery>> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -45,12 +45,12 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override protected Map<UUID, Collection<VisorQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException {
-        Map<UUID, Collection<VisorQuery>> map = new HashMap<>();
+    @Nullable @Override protected Map<UUID, Collection<VisorRunningQuery>> reduce0(List<ComputeJobResult> results) throws IgniteException {
+        Map<UUID, Collection<VisorRunningQuery>> map = new HashMap<>();
 
         for (ComputeJobResult res : results)
             if (res.getException() != null) {
-                Collection<VisorQuery> queries = res.getData();
+                Collection<VisorRunningQuery> queries = res.getData();
 
                 map.put(res.getNode().id(), queries);
             }
@@ -61,7 +61,7 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
     /**
      * Job to collect currently running queries from node.
      */
-    private static class VisorCollectCurrentQueriesJob extends VisorJob<Long, Collection<VisorQuery>> {
+    private static class VisorCollectCurrentQueriesJob extends VisorJob<Long, Collection<VisorRunningQuery>> {
         /**
          * Create job with specified argument.
          *
@@ -73,13 +73,14 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
         }
 
         /** {@inheritDoc} */
-        @Override protected Collection<VisorQuery> run(@Nullable Long duration) throws IgniteException {
+        @Override protected Collection<VisorRunningQuery> run(@Nullable Long duration) throws IgniteException {
             Collection<GridRunningQueryInfo> queries = ignite.context().query().runningQueries(duration);
 
-            Collection<VisorQuery> res = new ArrayList<>(queries.size());
+            Collection<VisorRunningQuery> res = new ArrayList<>(queries.size());
 
             for (GridRunningQueryInfo qry : queries)
-                res.add(new VisorQuery(qry.id(), qry.query(), qry.cache()));
+                res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(), qry.startTime(),
+                    qry.cancelable(), qry.local()));
 
             return res;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
deleted file mode 100644
index e9beff9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.visor.query;
-
-import java.io.Serializable;
-
-/**
- * Arguments for {@link VisorQueryTask}.
- */
-public class VisorQuery implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private Long id;
-
-    /** Query text. */
-    private String qry;
-
-    /** Cache name for query. */
-    private String cache;
-
-    /**
-     * @param id Query ID.
-     * @param qry Query text.
-     * @param cache Cache where query was executed.
-     */
-    public VisorQuery(Long id, String qry, String cache) {
-        this.id = id;
-        this.qry = qry;
-        this.cache = cache;
-    }
-
-    /**
-     * @return Query ID.
-     */
-    public Long id() {
-        return id;
-    }
-
-    /**
-     * @return Query txt.
-     */
-    public String query() {
-        return qry;
-    }
-
-    /**
-     * @return Cache name.
-     */
-    public String getCache() {
-        return cache;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
new file mode 100644
index 0000000..5605ea2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.query;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+
+/**
+ * Descriptor of running query.
+ */
+public class VisorRunningQuery implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long id;
+
+    /** Query text. */
+    private String qry;
+
+    /** Query type. */
+    private GridCacheQueryType qryType;
+
+    /** Cache name for query. */
+    private String cache;
+
+    /** */
+    private long startTime;
+
+    /** */
+    private boolean cancellable;
+
+    /** */
+    private boolean loc;
+
+    /**
+     * @param id Query ID.
+     * @param qry Query text.
+     * @param qryType Query type.
+     * @param cache Cache where query was executed.
+     * @param startTime Query start time.
+     * @param cancellable {@code true} if query can be canceled.
+     * @param loc {@code true} if query is local.
+     */
+    public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+        boolean cancellable, boolean loc) {
+        this.id = id;
+        this.qry = qry;
+        this.qryType = qryType;
+        this.cache = cache;
+        this.startTime = startTime;
+        this.cancellable = cancellable;
+        this.loc = loc;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long id() {
+        return id;
+    }
+
+    /**
+     * @return Query txt.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @return Query type.
+     */
+    public GridCacheQueryType queryType() {
+        return qryType;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String getCache() {
+        return cache;
+    }
+
+    /**
+     * @return Query start time.
+     */
+    public long getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * @return {@code true} if query can be cancelled.
+     */
+    public boolean isCancelable() {
+        return cancellable;
+    }
+
+    /**
+     * @return {@code true} if query is local.
+     */
+    public boolean isLocal() {
+        return loc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c0f5f09..5be4f03 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -182,6 +182,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.IgniteSystemProperties.getString;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
@@ -782,8 +785,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         IndexingQueryFilter filters) throws IgniteCheckedException {
         TableDescriptor tbl = tableDescriptor(spaceName, type);
 
-        if (tbl != null && tbl.luceneIdx != null)
-            return tbl.luceneIdx.query(qry, filters);
+        if (tbl != null && tbl.luceneIdx != null) {
+            GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, spaceName,
+                U.currentTimeMillis(), null, true);
+
+            try {
+                runs.put(run.id(), run);
+
+                return tbl.luceneIdx.query(qry, filters);
+            }
+            finally {
+                runs.remove(run.id());
+            }
+        }
 
         return new GridEmptyCloseableIterator<>();
     }
@@ -841,7 +855,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 GridH2QueryContext.set(ctx);
 
-                GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, spaceName, U.currentTimeMillis(), cancel);
+                GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS,
+                    spaceName, U.currentTimeMillis(), cancel, true);
 
                 runs.putIfAbsent(run.id(), run);
 
@@ -1103,7 +1118,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
 
-        GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), spaceName, qry, U.currentTimeMillis(), null);
+        GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
+            U.currentTimeMillis(), null, true);
 
         runs.put(run.id(), run);
 
@@ -2269,11 +2285,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
-        return rdcQryExec.longRunningQueries(duration);
+        Collection<GridRunningQueryInfo> res = new ArrayList<>();
+
+        res.addAll(runs.values());
+        res.addAll(rdcQryExec.longRunningQueries(duration));
+
+        return res;
     }
 
     /** {@inheritDoc} */
     @Override public void cancelQueries(Set<Long> queries) {
+        for (Long qryId : queries) {
+            GridRunningQueryInfo run = runs.get(qryId);
+
+            if (run != null)
+                run.cancel();
+        }
+
         rdcQryExec.cancelQueries(queries);
     }
 
@@ -3191,32 +3219,4 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             lastUsage = U.currentTimeMillis();
         }
     }
-
-    /**
-     * Query run.
-     */
-    private static class QueryRun {
-        /** */
-        private final GridRunningQueryInfo qry;
-
-        /** */
-        private final long startTime;
-
-        /** */
-        private final GridQueryCancel cancel;
-
-        /**
-         *
-         * @param id
-         * @param qry
-         * @param cache
-         * @param startTime
-         * @param cancel
-         */
-        public QueryRun(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
-            this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
-            this.startTime = startTime;
-            this.cancel = cancel;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index febe810..3540141 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -99,6 +99,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
 
 /**
@@ -1332,8 +1333,10 @@ public class GridReduceQueryExecutor {
      * @param queries Queries IDs to cancel.
      */
     public void cancelQueries(Set<Long> queries) {
-        for (QueryRun run : runs.values()) {
-            if (queries.contains(run.qry.id()))
+        for (Long qryId : queries) {
+            QueryRun run = runs.get(qryId);
+
+            if (run != null)
                 run.qry.cancel();
         }
     }
@@ -1371,7 +1374,7 @@ public class GridReduceQueryExecutor {
          * @param cancel Query cancel handler.
          */
         private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
-            this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
+            this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
             this.conn = (JdbcConnection)conn;
             this.idxs = new ArrayList<>(idxsCnt);
             this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
index e47e893..a91f65e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
@@ -17,15 +17,23 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import javax.cache.Cache;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -54,6 +62,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
 
         cc.setCopyOnRead(true);
         cc.setIndexedTypes(Integer.class, Value.class);
+        cc.setSqlFunctionClasses(TestSQLFunctions.class);
 
         cfg.setCacheConfiguration(cc);
 
@@ -195,6 +204,108 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
         check(cache);
     }
 
+    /**
+     * Run specified query in separate thread.
+     *
+     * @param qry Query to execute.
+     */
+    private IgniteInternalFuture<?> runQueryAsync(final Query<?> qry) throws Exception {
+        return multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    grid(0).cache(null).query(qry).getAll();
+                }
+                catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 1, "run-query");
+    }
+
+    /**
+     * Test collecting info about running.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRunningQueries() throws Exception {
+        IgniteInternalFuture<?> fut = runQueryAsync(new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3"));
+
+        Thread.sleep(500);
+
+        GridQueryProcessor qryProc = ((IgniteKernal)grid(0)).context().query();
+
+        Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        fut.get();
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3");
+        qry.setLocal(true);
+
+        fut = runQueryAsync(qry);
+
+        Thread.sleep(500);
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        fut.get();
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
+    }
+
+    /**
+     * Test collecting info about running.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCancelingQueries() throws Exception {
+        final Ignite ignite = grid(0);
+
+        runQueryAsync(new SqlFieldsQuery("select * from (select _val, sleep(100) from Value limit 50)"));
+
+        Thread.sleep(500);
+
+        final GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query();
+
+        Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(0);
+
+        assertEquals(1, queries.size());
+
+        final Collection<GridRunningQueryInfo> finalQueries = queries;
+
+        for (GridRunningQueryInfo query : finalQueries)
+            qryProc.cancelQueries(Collections.singleton(query.id()));
+
+        int n = 100;
+
+        // Give cluster some time to cancel query and cleanup resources.
+        while (n > 0) {
+            Thread.sleep(100);
+
+            queries = qryProc.runningQueries(0);
+
+            if (queries.isEmpty())
+                break;
+
+            log.info(">>>> Wait for cancel: " + n);
+
+            n--;
+        }
+
+        queries = qryProc.runningQueries(0);
+
+        assertEquals(0, queries.size());
+    }
+
     /** */
     private static class Value {
         /** */
@@ -223,4 +334,28 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest {
 
         assertEquals(KEYS, cnt);
     }
-}
\ No newline at end of file
+
+    /**
+     * Utility class with custom SQL functions.
+     */
+    public static class TestSQLFunctions {
+        /**
+         * Sleep function to simulate long running queries.
+         *
+         * @param x Time to sleep.
+         * @return Return specified argument.
+         */
+        @QuerySqlFunction
+        public static long sleep(long x) {
+            if (x >= 0)
+                try {
+                    Thread.sleep(x);
+                }
+                catch (InterruptedException ignored) {
+                    // No-op.
+                }
+
+            return x;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/effc624d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index d6a766d..337ae29 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -58,30 +58,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
     /** */
     private Ignite ignite;
 
-    /**
-     * Utility class with custom SQL functions.
-     */
-    public static class TestSQLFunctions {
-        /**
-         * Sleep function to simulate long running queries.
-         *
-         * @param x Time to sleep.
-         * @return Return specified argument.
-         */
-        @QuerySqlFunction
-        public static long sleep(long x) {
-            if (x >= 0)
-                try {
-                    Thread.sleep(x);
-                }
-                catch (InterruptedException ignored) {
-                    // No-op.
-                }
-
-            return x;
-        }
-    }
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration c = super.getConfiguration(gridName);
@@ -141,8 +117,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         else
             throw new IllegalStateException("mode: " + mode);
 
-        cc.setSqlFunctionClasses(TestSQLFunctions.class);
-
         return cc;
     }
 
@@ -248,83 +222,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Test collecting info about running.
-     *
-     * @throws Exception If failed.
-     */
-    public void testRunningQueries() throws Exception {
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    SqlFieldsQuery qry = new SqlFieldsQuery("select productId, sleep(3000) from FactPurchase limit 1");
-
-                    ignite.cache("partitioned").query(qry).getAll();
-                }
-                catch (Throwable e) {
-                    e.printStackTrace();
-                }
-            }
-        }, 1);
-
-        Thread.sleep(1000);
-
-        GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query();
-
-        Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(500);
-
-        assertEquals(1, queries.size());
-
-        fut.get();
-
-        queries = qryProc.runningQueries(500);
-
-        assertEquals(0, queries.size());
-    }
-
-    /**
-     * Test collecting info about running.
-     *
-     * @throws Exception If failed.
-     */
-    public void testCancelingQueries() throws Exception {
-        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    SqlFieldsQuery qry = new SqlFieldsQuery("select productId, sleep(500) from FactPurchase limit 100");
-
-                    ignite.cache("partitioned").query(qry).getAll();
-                }
-                catch (Throwable e) {
-                    e.printStackTrace();
-                }
-            }
-        }, 1);
-
-        Thread.sleep(1000);
-
-        GridQueryProcessor queryProc = ((IgniteKernal)ignite).context().query();
-
-        Collection<GridRunningQueryInfo> queries = queryProc.runningQueries(500);
-
-        assertEquals(1, queries.size());
-
-        for (GridRunningQueryInfo query : queries)
-            queryProc.cancelQueries(Collections.singleton(query.id()));
-
-        Thread.sleep(2000); // Give cluster some time to cancel query and cleanup resources.
-
-        queries = queryProc.runningQueries(500);
-
-        assertEquals(0, queries.size());
-
-        fut.get();
-
-        queries = queryProc.runningQueries(500);
-
-        assertEquals(0, queries.size());
-    }
-
-    /**
      * @throws Exception If failed.
      */
     public void testApiQueries() throws Exception {