You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/03 11:03:23 UTC

[3/3] ignite git commit: IGNITE-4436 WIP.

IGNITE-4436 WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fec2f49
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fec2f49
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fec2f49

Branch: refs/heads/ignite-4436-2
Commit: 7fec2f49ae38326cb8d7d49703083614bd128a75
Parents: 40c9f50
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Feb 3 18:02:02 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Feb 3 18:02:02 2017 +0700

----------------------------------------------------------------------
 .../internal/processors/query/GridQuery.java    | 66 -------------
 .../processors/query/GridQueryIndexing.java     |  4 +-
 .../processors/query/GridQueryProcessor.java    |  4 +-
 .../processors/query/GridRunningQueryInfo.java  | 98 ++++++++++++++++++++
 .../internal/visor/VisorMultiNodeTask.java      |  2 +-
 .../visor/query/VisorCancelQueriesTask.java     | 17 ++--
 .../query/VisorCollectCurrentQueriesTask.java   |  6 +-
 .../ignite/internal/visor/query/VisorQuery.java |  7 +-
 .../cache/query/GridCacheTwoStepQuery.java      | 18 +++-
 .../processors/query/h2/IgniteH2Indexing.java   | 78 +++++++++++++---
 .../query/h2/sql/GridSqlQuerySplitter.java      |  4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 38 +++-----
 .../cache/GridCacheCrossCacheQuerySelfTest.java | 10 +-
 13 files changed, 220 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java
deleted file mode 100644
index ff7c9da..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query;
-
-import java.util.UUID;
-
-/**
- * Query descriptor.
- */
-public class GridQuery {
-    /** */
-    private UUID id;
-
-    /** */
-    private String qry;
-
-    /** */
-    private String cache;
-
-    /**
-     * @param id Query ID.
-     * @param qry Query text.
-     * @param cache Cache where query was executed.
-     */
-    public GridQuery(UUID id, String qry, String cache) {
-        this.id = id;
-        this.qry = qry;
-        this.cache = cache;
-    }
-
-    /**
-     * @return Id.
-     */
-    public UUID id() {
-        return id;
-    }
-
-    /**
-     * @return Query.
-     */
-    public String query() {
-        return qry;
-    }
-
-    /**
-     * @return Cache.
-     */
-    public String cache() {
-        return cache;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index e368063..323038b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -246,14 +246,14 @@ public interface GridQueryIndexing {
      * @param duration Duration to check.
      * @return Collection of long running queries.
      */
-    public Collection<GridQuery> runningQueries(long duration);
+    public Collection<GridRunningQueryInfo> runningQueries(long duration);
 
     /**
      * Cancel specified queries.
      *
      * @param queries Queries ID's to cancel.
      */
-    public void cancelQueries(Set<UUID> queries);
+    public void cancelQueries(Set<Long> queries);
 
     /**
      * Cancels all executing queries.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1e5c5d8..c14a8a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -942,7 +942,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param duration Duration to check.
      * @return Collection of long running queries.
      */
-    public Collection<GridQuery> runningQueries(long duration) {
+    public Collection<GridRunningQueryInfo> runningQueries(long duration) {
         if (moduleEnabled())
             return idx.runningQueries(duration);
 
@@ -954,7 +954,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      *
      * @param queries Queries ID's to cancel.
      */
-    public void cancelQueries(Set<UUID> queries) {
+    public void cancelQueries(Set<Long> queries) {
         if (moduleEnabled())
             idx.cancelQueries(queries);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
new file mode 100644
index 0000000..ea37d15
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+/**
+ * Query descriptor.
+ */
+public class GridRunningQueryInfo {
+    /** */
+    private final long id;
+
+    /** */
+    private final String qry;
+
+    /** */
+    private final String cache;
+
+    /** */
+    private final long startTime;
+
+    /** */
+    private final GridQueryCancel cancel;
+
+    /**
+     * @param id Query ID.
+     * @param qry Query text.
+     * @param cache Cache where query was executed.
+     * @param startTime Query start time.
+     * @param cancel Query cancel.
+     */
+    public GridRunningQueryInfo(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
+        this.id = id;
+        this.qry = qry;
+        this.cache = cache;
+        this.startTime = startTime;
+        this.cancel = cancel;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public Long id() {
+        return id;
+    }
+
+    /**
+     * @return Query text.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @return Cache where query was executed.
+     */
+    public String cache() {
+        return cache;
+    }
+
+    /**
+     * @return Query start time.
+     */
+    public long startTime() {
+        return startTime;
+    }
+
+    /**
+     * @param curTime Current time.
+     * @param duration Duration of long query.
+     * @return {@code true} if this query should be considered as long running query.
+     */
+    public boolean longQuery(long curTime, long duration) {
+        return curTime - startTime > duration;
+    }
+
+    /**
+     * Cancel query.
+     */
+    public void cancel() {
+        if (cancel != null)
+            cancel.cancel();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
index 57f1346..ece1a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
@@ -130,4 +130,4 @@ public abstract class VisorMultiNodeTask<A, R, J> implements ComputeTask<VisorTa
                 logFinish(ignite.log(), getClass(), start);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
index 88d7eec..b40a082 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
@@ -18,11 +18,13 @@
 package org.apache.ignite.internal.visor.query;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorMultiNodeTask;
 import org.jetbrains.annotations.Nullable;
@@ -31,12 +33,12 @@ import org.jetbrains.annotations.Nullable;
  * Task to cancel queries.
  */
 @GridInternal
-public class VisorCancelQueriesTask extends VisorMultiNodeTask<Set<UUID>, Void, Void> {
+public class VisorCancelQueriesTask extends VisorMultiNodeTask<Map<UUID, Set<Long>>, Void, Void> {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** {@inheritDoc} */
-    @Override protected VisorCancelQueriesJob job(Set<UUID> arg) {
+    @Override protected VisorCancelQueriesJob job(Map<UUID, Set<Long>> arg) {
         return new VisorCancelQueriesJob(arg, debug);
     }
 
@@ -48,20 +50,23 @@ public class VisorCancelQueriesTask extends VisorMultiNodeTask<Set<UUID>, Void,
     /**
      * Job to cancel queries on node.
      */
-    private static class VisorCancelQueriesJob extends VisorJob<Set<UUID>, Void> {
+    private static class VisorCancelQueriesJob extends VisorJob<Map<UUID, Set<Long>>, Void> {
         /**
          * Create job with specified argument.
          *
          * @param arg Job argument.
          * @param debug Flag indicating whether debug information should be printed into node log.
          */
-        protected VisorCancelQueriesJob(@Nullable Set<UUID> arg, boolean debug) {
+        protected VisorCancelQueriesJob(@Nullable Map<UUID, Set<Long>> arg, boolean debug) {
             super(arg, debug);
         }
 
         /** {@inheritDoc} */
-        @Override protected Void run(@Nullable Set<UUID> queries) throws IgniteException {
-            ignite.context().query().cancelQueries(queries);
+        @Override protected Void run(@Nullable Map<UUID, Set<Long>> arg) throws IgniteException {
+            Set<Long> queries = arg.get(ignite.localNode().id());
+
+            if (!F.isEmpty(queries))
+                ignite.context().query().cancelQueries(queries);
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
index 1638da3..0dc0ec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.processors.query.GridQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorMultiNodeTask;
@@ -74,11 +74,11 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
 
         /** {@inheritDoc} */
         @Override protected Collection<VisorQuery> run(@Nullable Long duration) throws IgniteException {
-            Collection<GridQuery> queries = ignite.context().query().runningQueries(duration);
+            Collection<GridRunningQueryInfo> queries = ignite.context().query().runningQueries(duration);
 
             Collection<VisorQuery> res = new ArrayList<>(queries.size());
 
-            for (GridQuery qry : queries)
+            for (GridRunningQueryInfo qry : queries)
                 res.add(new VisorQuery(qry.id(), qry.query(), qry.cache()));
 
             return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
index 518091c..e9beff9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.visor.query;
 
 import java.io.Serializable;
-import java.util.UUID;
 
 /**
  * Arguments for {@link VisorQueryTask}.
@@ -28,7 +27,7 @@ public class VisorQuery implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private UUID id;
+    private Long id;
 
     /** Query text. */
     private String qry;
@@ -41,7 +40,7 @@ public class VisorQuery implements Serializable {
      * @param qry Query text.
      * @param cache Cache where query was executed.
      */
-    public VisorQuery(UUID id, String qry, String cache) {
+    public VisorQuery(Long id, String qry, String cache) {
         this.id = id;
         this.qry = qry;
         this.cache = cache;
@@ -50,7 +49,7 @@ public class VisorQuery implements Serializable {
     /**
      * @return Query ID.
      */
-    public UUID id() {
+    public Long id() {
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 8dcba2f..f53936f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -46,6 +46,9 @@ public class GridCacheTwoStepQuery {
     private boolean explain;
 
     /** */
+    private String originalSql;
+
+    /** */
     private Collection<String> spaces;
 
     /** */
@@ -67,10 +70,12 @@ public class GridCacheTwoStepQuery {
     private List<Integer> extraCaches;
 
     /**
+     * @param originalSql Original query SQL.
      * @param schemas Schema names in query.
      * @param tbls Tables in query.
      */
-    public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) {
+    public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) {
+        this.originalSql = originalSql;
         this.schemas = schemas;
         this.tbls = tbls;
     }
@@ -196,6 +201,13 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return Original query SQL.
+     */
+    public String originalSql() {
+        return originalSql;
+    }
+
+    /**
      * @return Spaces.
      */
     public Collection<String> spaces() {
@@ -223,7 +235,7 @@ public class GridCacheTwoStepQuery {
     public GridCacheTwoStepQuery copy(Object[] args) {
         assert !explain;
 
-        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
+        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
 
         cp.caches = caches;
         cp.extraCaches = extraCaches;
@@ -250,4 +262,4 @@ public class GridCacheTwoStepQuery {
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cc281cf..aad524f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -53,6 +53,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -82,7 +83,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.GridQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -286,9 +287,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final Map<String, String> space2schema = new ConcurrentHashMap8<>();
 
     /** */
+    private AtomicLong qryIdGen;
+
+    /** */
     private GridSpinBusyLock busyLock;
 
     /** */
+    private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap8<>();
+
+    /** */
     private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
         @Nullable @Override public ConnectionWrapper get() {
             ConnectionWrapper c = super.get();
@@ -832,6 +839,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 GridH2QueryContext.set(ctx);
 
+                GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, spaceName, U.currentTimeMillis(), cancel);
+
+                runs.putIfAbsent(run.id(), run);
+
                 try {
                     ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
 
@@ -839,6 +850,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 }
                 finally {
                     GridH2QueryContext.clearThreadLocal();
+
+                    runs.remove(run.id());
                 }
             }
         };
@@ -1088,6 +1101,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
 
+        GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), spaceName, qry, U.currentTimeMillis(), null);
+
+        runs.put(run.id(), run);
+
         try {
             ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
 
@@ -1095,6 +1112,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
         finally {
             GridH2QueryContext.clearThreadLocal();
+
+            runs.remove(run.id());
         }
     }
 
@@ -1735,6 +1754,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         this.busyLock = busyLock;
 
+        qryIdGen = new AtomicLong();
+
         if (SysProperties.serializeJavaObject) {
             U.warn(log, "Serialization of Java objects in H2 was enabled.");
 
@@ -1785,7 +1806,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             marshaller = ctx.config().getMarshaller();
 
             mapQryExec = new GridMapQueryExecutor(busyLock);
-            rdcQryExec = new GridReduceQueryExecutor(busyLock);
+            rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock);
 
             mapQryExec.start(ctx, this);
             rdcQryExec.start(ctx, this);
@@ -2239,6 +2260,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         return cols;
     }
 
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+        return rdcQryExec.longRunningQueries(duration);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancelQueries(Set<Long> queries) {
+        rdcQryExec.cancelQueries(queries);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancelAllQueries() {
+        for (Connection conn : conns)
+            U.close(conn, log);
+    }
+
     /**
      * Wrapper to store connection and flag is schema set or not.
      */
@@ -3148,19 +3186,31 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public Collection<GridQuery> runningQueries(long duration) {
-        return rdcQryExec.longRunningQueries(duration);
-    }
+    /**
+     * Query run.
+     */
+    private static class QueryRun {
+        /** */
+        private final GridRunningQueryInfo qry;
 
-    /** {@inheritDoc} */
-    @Override public void cancelQueries(Set<UUID> queries) {
-        rdcQryExec.cancelQueries(queries);
-    }
+        /** */
+        private final long startTime;
 
-    /** {@inheritDoc} */
-    @Override public void cancelAllQueries() {
-        for (Connection conn : conns)
-            U.close(conn, log);
+        /** */
+        private final GridQueryCancel cancel;
+
+        /**
+         *
+         * @param id
+         * @param qry
+         * @param cache
+         * @param startTime
+         * @param cancel
+         */
+        public QueryRun(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
+            this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
+            this.startTime = startTime;
+            this.cancel = cancel;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 09952cf..e164315 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -174,7 +174,7 @@ public class GridSqlQuerySplitter {
         qry = collectAllTables(qry, schemas, tbls);
 
         // Build resulting two step query.
-        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls);
+        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(qry.getSQL(), schemas, tbls);
 
         // Map query will be direct reference to the original query AST.
         // Thus all the modifications will be performed on the original AST, so we should be careful when
@@ -958,4 +958,4 @@ public class GridSqlQuerySplitter {
     private static GridSqlFunction function(GridSqlFunctionType type) {
         return new GridSqlFunction(type);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 39c494d..6f96b8d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,7 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -121,7 +121,7 @@ public class GridReduceQueryExecutor {
     private IgniteLogger log;
 
     /** */
-    private final AtomicLong reqIdGen = new AtomicLong();
+    private final AtomicLong qryIdGen;
 
     /** */
     private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
@@ -168,9 +168,11 @@ public class GridReduceQueryExecutor {
     };
 
     /**
+     * @param qryIdGen Query ID generator.
      * @param busyLock Busy lock.
      */
-    public GridReduceQueryExecutor(GridSpinBusyLock busyLock) {
+    public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) {
+        this.qryIdGen = qryIdGen;
         this.busyLock = busyLock;
     }
 
@@ -494,13 +496,11 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            final long qryReqId = reqIdGen.incrementAndGet();
+            final long qryReqId = qryIdGen.incrementAndGet();
 
             final String space = cctx.name();
 
-            final QueryRun r = new QueryRun(UUID.randomUUID(),
-                F.isEmpty(qry.mapQueries()) ? "" : qry.mapQueries().get(0).query(),
-                F.first(qry.schemas()),
+            final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), space,
                 h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize(),
                 System.currentTimeMillis(), cancel);
 
@@ -1313,13 +1313,13 @@ public class GridReduceQueryExecutor {
      * @param duration Duration to check.
      * @return Collection of IDs and statements of long running queries.
      */
-    public Collection<GridQuery> longRunningQueries(long duration) {
-        Collection<GridQuery> res = new ArrayList<>();
+    public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
+        Collection<GridRunningQueryInfo> res = new ArrayList<>();
 
         long curTime = U.currentTimeMillis();
 
         for (QueryRun run : runs.values()) {
-            if (curTime - run.startTime > duration)
+            if (run.qry.longQuery(curTime, duration))
                 res.add(run.qry);
         }
 
@@ -1331,10 +1331,10 @@ public class GridReduceQueryExecutor {
      *
      * @param queries Queries IDs to cancel.
      */
-    public void cancelQueries(Set<UUID> queries) {
+    public void cancelQueries(Set<Long> queries) {
         for (QueryRun run : runs.values()) {
             if (queries.contains(run.qry.id()))
-                run.cancel.cancel();
+                run.qry.cancel();
         }
     }
 
@@ -1343,7 +1343,7 @@ public class GridReduceQueryExecutor {
      */
     private static class QueryRun {
         /** */
-        private final GridQuery qry;
+        private final GridRunningQueryInfo qry;
 
         /** */
         private final List<GridMergeIndex> idxs;
@@ -1357,12 +1357,6 @@ public class GridReduceQueryExecutor {
         /** */
         private final int pageSize;
 
-        /** */
-        private final long startTime;
-
-        /** */
-        private final GridQueryCancel cancel;
-
         /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
         private final AtomicReference<Object> state = new AtomicReference<>();
 
@@ -1376,13 +1370,11 @@ public class GridReduceQueryExecutor {
          * @param startTime Start time.
          * @param cancel Query cancel handler.
          */
-        private QueryRun(UUID id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
-            this.qry = new GridQuery(id, qry, cache);
+        private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
+            this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
             this.conn = (JdbcConnection)conn;
             this.idxs = new ArrayList<>(idxsCnt);
             this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
-            this.startTime = startTime;
-            this.cancel = cancel;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 98376d7..d6a766d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -30,7 +30,6 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
@@ -38,8 +37,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.query.GridQuery;
-import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -272,7 +270,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
 
         GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query();
 
-        Collection<GridQuery> queries = qryProc.runningQueries(500);
+        Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(500);
 
         assertEquals(1, queries.size());
 
@@ -306,11 +304,11 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
 
         GridQueryProcessor queryProc = ((IgniteKernal)ignite).context().query();
 
-        Collection<GridQuery> queries = queryProc.runningQueries(500);
+        Collection<GridRunningQueryInfo> queries = queryProc.runningQueries(500);
 
         assertEquals(1, queries.size());
 
-        for (GridQuery query : queries)
+        for (GridRunningQueryInfo query : queries)
             queryProc.cancelQueries(Collections.singleton(query.id()));
 
         Thread.sleep(2000); // Give cluster some time to cancel query and cleanup resources.