You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/02/03 11:03:23 UTC
[3/3] ignite git commit: IGNITE-4436 WIP.
IGNITE-4436 WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fec2f49
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fec2f49
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fec2f49
Branch: refs/heads/ignite-4436-2
Commit: 7fec2f49ae38326cb8d7d49703083614bd128a75
Parents: 40c9f50
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Feb 3 18:02:02 2017 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Feb 3 18:02:02 2017 +0700
----------------------------------------------------------------------
.../internal/processors/query/GridQuery.java | 66 -------------
.../processors/query/GridQueryIndexing.java | 4 +-
.../processors/query/GridQueryProcessor.java | 4 +-
.../processors/query/GridRunningQueryInfo.java | 98 ++++++++++++++++++++
.../internal/visor/VisorMultiNodeTask.java | 2 +-
.../visor/query/VisorCancelQueriesTask.java | 17 ++--
.../query/VisorCollectCurrentQueriesTask.java | 6 +-
.../ignite/internal/visor/query/VisorQuery.java | 7 +-
.../cache/query/GridCacheTwoStepQuery.java | 18 +++-
.../processors/query/h2/IgniteH2Indexing.java | 78 +++++++++++++---
.../query/h2/sql/GridSqlQuerySplitter.java | 4 +-
.../h2/twostep/GridReduceQueryExecutor.java | 38 +++-----
.../cache/GridCacheCrossCacheQuerySelfTest.java | 10 +-
13 files changed, 220 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java
deleted file mode 100644
index ff7c9da..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQuery.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query;
-
-import java.util.UUID;
-
-/**
- * Query descriptor.
- */
-public class GridQuery {
- /** */
- private UUID id;
-
- /** */
- private String qry;
-
- /** */
- private String cache;
-
- /**
- * @param id Query ID.
- * @param qry Query text.
- * @param cache Cache where query was executed.
- */
- public GridQuery(UUID id, String qry, String cache) {
- this.id = id;
- this.qry = qry;
- this.cache = cache;
- }
-
- /**
- * @return Id.
- */
- public UUID id() {
- return id;
- }
-
- /**
- * @return Query.
- */
- public String query() {
- return qry;
- }
-
- /**
- * @return Cache.
- */
- public String cache() {
- return cache;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index e368063..323038b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -246,14 +246,14 @@ public interface GridQueryIndexing {
* @param duration Duration to check.
* @return Collection of long running queries.
*/
- public Collection<GridQuery> runningQueries(long duration);
+ public Collection<GridRunningQueryInfo> runningQueries(long duration);
/**
* Cancel specified queries.
*
* @param queries Queries ID's to cancel.
*/
- public void cancelQueries(Set<UUID> queries);
+ public void cancelQueries(Set<Long> queries);
/**
* Cancels all executing queries.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1e5c5d8..c14a8a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -942,7 +942,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @param duration Duration to check.
* @return Collection of long running queries.
*/
- public Collection<GridQuery> runningQueries(long duration) {
+ public Collection<GridRunningQueryInfo> runningQueries(long duration) {
if (moduleEnabled())
return idx.runningQueries(duration);
@@ -954,7 +954,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
*
* @param queries Queries ID's to cancel.
*/
- public void cancelQueries(Set<UUID> queries) {
+ public void cancelQueries(Set<Long> queries) {
if (moduleEnabled())
idx.cancelQueries(queries);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
new file mode 100644
index 0000000..ea37d15
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+/**
+ * Descriptor of a running query: its ID, text, cache, start time and an optional cancel handler.
+ */
+public class GridRunningQueryInfo {
+ /** Query ID. */
+ private final long id;
+
+ /** Query text. */
+ private final String qry;
+
+ /** Cache where query was executed. */
+ private final String cache;
+
+ /** Query start time. */
+ private final long startTime;
+
+ /** Query cancel handler; {@link #cancel()} does nothing when this is {@code null}. */
+ private final GridQueryCancel cancel;
+
+ /**
+ * @param id Query ID; must not be {@code null} because it is unboxed into a primitive field.
+ * @param qry Query text.
+ * @param cache Cache where query was executed.
+ * @param startTime Query start time.
+ * @param cancel Query cancel handler, may be {@code null}.
+ */
+ public GridRunningQueryInfo(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
+ this.id = id;
+ this.qry = qry;
+ this.cache = cache;
+ this.startTime = startTime;
+ this.cancel = cancel;
+ }
+
+ /**
+ * @return Query ID (boxed from the primitive field).
+ */
+ public Long id() {
+ return id;
+ }
+
+ /**
+ * @return Query text.
+ */
+ public String query() {
+ return qry;
+ }
+
+ /**
+ * @return Cache where query was executed.
+ */
+ public String cache() {
+ return cache;
+ }
+
+ /**
+ * @return Query start time.
+ */
+ public long startTime() {
+ return startTime;
+ }
+
+ /**
+ * @param curTime Current time, in the same units as the query start time.
+ * @param duration Duration threshold; the query is long running only when it is strictly exceeded.
+ * @return {@code true} if this query should be considered as long running query.
+ */
+ public boolean longQuery(long curTime, long duration) {
+ return curTime - startTime > duration;
+ }
+
+ /**
+ * Cancels the query through the cancel handler, if one was supplied.
+ */
+ public void cancel() {
+ if (cancel != null)
+ cancel.cancel();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
index 57f1346..ece1a17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java
@@ -130,4 +130,4 @@ public abstract class VisorMultiNodeTask<A, R, J> implements ComputeTask<VisorTa
logFinish(ignite.log(), getClass(), start);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
index 88d7eec..b40a082 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCancelQueriesTask.java
@@ -18,11 +18,13 @@
package org.apache.ignite.internal.visor.query;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
import org.jetbrains.annotations.Nullable;
@@ -31,12 +33,12 @@ import org.jetbrains.annotations.Nullable;
* Task to cancel queries.
*/
@GridInternal
-public class VisorCancelQueriesTask extends VisorMultiNodeTask<Set<UUID>, Void, Void> {
+public class VisorCancelQueriesTask extends VisorMultiNodeTask<Map<UUID, Set<Long>>, Void, Void> {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
- @Override protected VisorCancelQueriesJob job(Set<UUID> arg) {
+ @Override protected VisorCancelQueriesJob job(Map<UUID, Set<Long>> arg) {
return new VisorCancelQueriesJob(arg, debug);
}
@@ -48,20 +50,23 @@ public class VisorCancelQueriesTask extends VisorMultiNodeTask<Set<UUID>, Void,
/**
* Job to cancel queries on node.
*/
- private static class VisorCancelQueriesJob extends VisorJob<Set<UUID>, Void> {
+ private static class VisorCancelQueriesJob extends VisorJob<Map<UUID, Set<Long>>, Void> {
/**
* Create job with specified argument.
*
* @param arg Job argument.
* @param debug Flag indicating whether debug information should be printed into node log.
*/
- protected VisorCancelQueriesJob(@Nullable Set<UUID> arg, boolean debug) {
+ protected VisorCancelQueriesJob(@Nullable Map<UUID, Set<Long>> arg, boolean debug) {
super(arg, debug);
}
/** {@inheritDoc} */
- @Override protected Void run(@Nullable Set<UUID> queries) throws IgniteException {
- ignite.context().query().cancelQueries(queries);
+ @Override protected Void run(@Nullable Map<UUID, Set<Long>> arg) throws IgniteException {
+ Set<Long> queries = arg == null ? null : arg.get(ignite.localNode().id()); // arg is @Nullable: no NPE, nothing to cancel.
+
+ if (!F.isEmpty(queries))
+ ignite.context().query().cancelQueries(queries);
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
index 1638da3..0dc0ec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectCurrentQueriesTask.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.processors.query.GridQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
@@ -74,11 +74,11 @@ public class VisorCollectCurrentQueriesTask extends VisorMultiNodeTask<Long, Map
/** {@inheritDoc} */
@Override protected Collection<VisorQuery> run(@Nullable Long duration) throws IgniteException {
- Collection<GridQuery> queries = ignite.context().query().runningQueries(duration);
+ Collection<GridRunningQueryInfo> queries = ignite.context().query().runningQueries(duration);
Collection<VisorQuery> res = new ArrayList<>(queries.size());
- for (GridQuery qry : queries)
+ for (GridRunningQueryInfo qry : queries)
res.add(new VisorQuery(qry.id(), qry.query(), qry.cache()));
return res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
index 518091c..e9beff9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQuery.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.visor.query;
import java.io.Serializable;
-import java.util.UUID;
/**
* Arguments for {@link VisorQueryTask}.
@@ -28,7 +27,7 @@ public class VisorQuery implements Serializable {
private static final long serialVersionUID = 0L;
/** */
- private UUID id;
+ private Long id;
/** Query text. */
private String qry;
@@ -41,7 +40,7 @@ public class VisorQuery implements Serializable {
* @param qry Query text.
* @param cache Cache where query was executed.
*/
- public VisorQuery(UUID id, String qry, String cache) {
+ public VisorQuery(Long id, String qry, String cache) {
this.id = id;
this.qry = qry;
this.cache = cache;
@@ -50,7 +49,7 @@ public class VisorQuery implements Serializable {
/**
* @return Query ID.
*/
- public UUID id() {
+ public Long id() {
return id;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 8dcba2f..f53936f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -46,6 +46,9 @@ public class GridCacheTwoStepQuery {
private boolean explain;
/** */
+ private String originalSql;
+
+ /** */
private Collection<String> spaces;
/** */
@@ -67,10 +70,12 @@ public class GridCacheTwoStepQuery {
private List<Integer> extraCaches;
/**
+ * @param originalSql Original query SQL.
* @param schemas Schema names in query.
* @param tbls Tables in query.
*/
- public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) {
+ public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) {
+ this.originalSql = originalSql;
this.schemas = schemas;
this.tbls = tbls;
}
@@ -196,6 +201,13 @@ public class GridCacheTwoStepQuery {
}
/**
+ * @return Original query SQL.
+ */
+ public String originalSql() {
+ return originalSql;
+ }
+
+ /**
* @return Spaces.
*/
public Collection<String> spaces() {
@@ -223,7 +235,7 @@ public class GridCacheTwoStepQuery {
public GridCacheTwoStepQuery copy(Object[] args) {
assert !explain;
- GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
+ GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
cp.caches = caches;
cp.extraCaches = extraCaches;
@@ -250,4 +262,4 @@ public class GridCacheTwoStepQuery {
@Override public String toString() {
return S.toString(GridCacheTwoStepQuery.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cc281cf..aad524f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -53,6 +53,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
@@ -82,7 +83,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.GridQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -286,9 +287,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final Map<String, String> space2schema = new ConcurrentHashMap8<>();
/** */
+ private AtomicLong qryIdGen;
+
+ /** */
private GridSpinBusyLock busyLock;
/** */
+ private final ConcurrentMap<Long, GridRunningQueryInfo> runs = new ConcurrentHashMap8<>();
+
+ /** */
private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
@Nullable @Override public ConnectionWrapper get() {
ConnectionWrapper c = super.get();
@@ -832,6 +839,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(ctx);
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, spaceName, U.currentTimeMillis(), cancel);
+
+ runs.putIfAbsent(run.id(), run);
+
try {
ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
@@ -839,6 +850,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
finally {
GridH2QueryContext.clearThreadLocal();
+
+ runs.remove(run.id());
}
}
};
@@ -1088,6 +1101,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
+ GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, spaceName, U.currentTimeMillis(), null); // Order is (id, query text, cache, start time, cancel).
+
+ runs.put(run.id(), run);
+
try {
ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
@@ -1095,6 +1112,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
finally {
GridH2QueryContext.clearThreadLocal();
+
+ runs.remove(run.id());
}
}
@@ -1735,6 +1754,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
this.busyLock = busyLock;
+ qryIdGen = new AtomicLong();
+
if (SysProperties.serializeJavaObject) {
U.warn(log, "Serialization of Java objects in H2 was enabled.");
@@ -1785,7 +1806,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
marshaller = ctx.config().getMarshaller();
mapQryExec = new GridMapQueryExecutor(busyLock);
- rdcQryExec = new GridReduceQueryExecutor(busyLock);
+ rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock);
mapQryExec.start(ctx, this);
rdcQryExec.start(ctx, this);
@@ -2239,6 +2260,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return cols;
}
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
+ return rdcQryExec.longRunningQueries(duration);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancelQueries(Set<Long> queries) {
+ rdcQryExec.cancelQueries(queries);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancelAllQueries() {
+ for (Connection conn : conns)
+ U.close(conn, log);
+ }
+
/**
* Wrapper to store connection and flag is schema set or not.
*/
@@ -3148,19 +3186,31 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- /** {@inheritDoc} */
- @Override public Collection<GridQuery> runningQueries(long duration) {
- return rdcQryExec.longRunningQueries(duration);
- }
+ /**
+ * Query run. NOTE(review): no use of this class is visible in this patch, and it repeats data kept by GridRunningQueryInfo - confirm it is needed.
+ */
+ private static class QueryRun {
+ /** Descriptor built from the constructor arguments. */
+ private final GridRunningQueryInfo qry;
- /** {@inheritDoc} */
- @Override public void cancelQueries(Set<UUID> queries) {
- rdcQryExec.cancelQueries(queries);
- }
+ /** Query start time (also passed to the descriptor). */
+ private final long startTime;
- /** {@inheritDoc} */
- @Override public void cancelAllQueries() {
- for (Connection conn : conns)
- U.close(conn, log);
+ /** Query cancel handler (also passed to the descriptor). */
+ private final GridQueryCancel cancel;
+
+ /**
+ * Creates the run together with its {@link GridRunningQueryInfo} descriptor.
+ * @param id Query ID.
+ * @param qry Query text.
+ * @param cache Cache where query was executed.
+ * @param startTime Query start time.
+ * @param cancel Query cancel handler.
+ */
+ public QueryRun(Long id, String qry, String cache, long startTime, GridQueryCancel cancel) {
+ this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
+ this.startTime = startTime;
+ this.cancel = cancel;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 09952cf..e164315 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -174,7 +174,7 @@ public class GridSqlQuerySplitter {
qry = collectAllTables(qry, schemas, tbls);
// Build resulting two step query.
- GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls);
+ GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(qry.getSQL(), schemas, tbls);
// Map query will be direct reference to the original query AST.
// Thus all the modifications will be performed on the original AST, so we should be careful when
@@ -958,4 +958,4 @@ public class GridSqlQuerySplitter {
private static GridSqlFunction function(GridSqlFunctionType type) {
return new GridSqlFunction(type);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 39c494d..6f96b8d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,7 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridQuery;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -121,7 +121,7 @@ public class GridReduceQueryExecutor {
private IgniteLogger log;
/** */
- private final AtomicLong reqIdGen = new AtomicLong();
+ private final AtomicLong qryIdGen;
/** */
private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
@@ -168,9 +168,11 @@ public class GridReduceQueryExecutor {
};
/**
+ * @param qryIdGen Query ID generator.
* @param busyLock Busy lock.
*/
- public GridReduceQueryExecutor(GridSpinBusyLock busyLock) {
+ public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) {
+ this.qryIdGen = qryIdGen;
this.busyLock = busyLock;
}
@@ -494,13 +496,11 @@ public class GridReduceQueryExecutor {
}
}
- final long qryReqId = reqIdGen.incrementAndGet();
+ final long qryReqId = qryIdGen.incrementAndGet();
final String space = cctx.name();
- final QueryRun r = new QueryRun(UUID.randomUUID(),
- F.isEmpty(qry.mapQueries()) ? "" : qry.mapQueries().get(0).query(),
- F.first(qry.schemas()),
+ final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), space,
h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize(),
System.currentTimeMillis(), cancel);
@@ -1313,13 +1313,13 @@ public class GridReduceQueryExecutor {
* @param duration Duration to check.
* @return Collection of IDs and statements of long running queries.
*/
- public Collection<GridQuery> longRunningQueries(long duration) {
- Collection<GridQuery> res = new ArrayList<>();
+ public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
+ Collection<GridRunningQueryInfo> res = new ArrayList<>();
long curTime = U.currentTimeMillis();
for (QueryRun run : runs.values()) {
- if (curTime - run.startTime > duration)
+ if (run.qry.longQuery(curTime, duration))
res.add(run.qry);
}
@@ -1331,10 +1331,10 @@ public class GridReduceQueryExecutor {
*
* @param queries Queries IDs to cancel.
*/
- public void cancelQueries(Set<UUID> queries) {
+ public void cancelQueries(Set<Long> queries) {
for (QueryRun run : runs.values()) {
if (queries.contains(run.qry.id()))
- run.cancel.cancel();
+ run.qry.cancel();
}
}
@@ -1343,7 +1343,7 @@ public class GridReduceQueryExecutor {
*/
private static class QueryRun {
/** */
- private final GridQuery qry;
+ private final GridRunningQueryInfo qry;
/** */
private final List<GridMergeIndex> idxs;
@@ -1357,12 +1357,6 @@ public class GridReduceQueryExecutor {
/** */
private final int pageSize;
- /** */
- private final long startTime;
-
- /** */
- private final GridQueryCancel cancel;
-
/** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
private final AtomicReference<Object> state = new AtomicReference<>();
@@ -1376,13 +1370,11 @@ public class GridReduceQueryExecutor {
* @param startTime Start time.
* @param cancel Query cancel handler.
*/
- private QueryRun(UUID id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
- this.qry = new GridQuery(id, qry, cache);
+ private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
+ this.qry = new GridRunningQueryInfo(id, qry, cache, startTime, cancel);
this.conn = (JdbcConnection)conn;
this.idxs = new ArrayList<>(idxsCnt);
this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
- this.startTime = startTime;
- this.cancel = cancel;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fec2f49/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 98376d7..d6a766d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -30,7 +30,6 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
@@ -38,8 +37,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.processors.query.GridQuery;
-import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -272,7 +270,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
GridQueryProcessor qryProc = ((IgniteKernal)ignite).context().query();
- Collection<GridQuery> queries = qryProc.runningQueries(500);
+ Collection<GridRunningQueryInfo> queries = qryProc.runningQueries(500);
assertEquals(1, queries.size());
@@ -306,11 +304,11 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
GridQueryProcessor queryProc = ((IgniteKernal)ignite).context().query();
- Collection<GridQuery> queries = queryProc.runningQueries(500);
+ Collection<GridRunningQueryInfo> queries = queryProc.runningQueries(500);
assertEquals(1, queries.size());
- for (GridQuery query : queries)
+ for (GridRunningQueryInfo query : queries)
queryProc.cancelQueries(Collections.singleton(query.id()));
Thread.sleep(2000); // Give cluster some time to cancel query and cleanup resources.