You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/14 10:30:43 UTC
[33/50] ignite git commit: Proper cancellation.
Proper cancellation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/131087a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/131087a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/131087a5
Branch: refs/heads/ignite-5991-6019
Commit: 131087a5b871071ee9ba6b91e1c32414acf12f51
Parents: 2396875
Author: devozerov <vo...@gridgain.com>
Authored: Mon Aug 14 11:27:41 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Aug 14 11:27:41 2017 +0300
----------------------------------------------------------------------
.../processors/query/h2/IgniteH2Indexing.java | 38 ++++++++++++----
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../query/h2/twostep/MapQueryResult.java | 47 ++++++++++++++++++--
.../query/h2/twostep/MapQueryResults.java | 10 ++++-
4 files changed, 81 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index f655377..f79d3cf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -112,6 +112,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryStreamingResultTarget;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -909,18 +910,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
- int timeoutMillis, @Nullable GridQueryCancel cancel)
- throws IgniteCheckedException {
+ int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
+ final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
if (cancel != null) {
cancel.set(new Runnable() {
@Override public void run() {
- try {
- stmt.cancel();
- }
- catch (SQLException ignored) {
- // No-op.
+ if (lazyWorker != null) {
+ lazyWorker.submit(new Runnable() {
+ @Override public void run() {
+ cancelStatement(stmt);
+ }
+ });
}
+ else
+ cancelStatement(stmt);
}
});
}
@@ -930,9 +934,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (timeoutMillis > 0)
ses.setQueryTimeout(timeoutMillis);
- try {
+ if (lazyWorker != null)
ses.setLazyQueryExecution(true);
+ try {
return stmt.executeQuery();
}
catch (SQLException e) {
@@ -946,7 +951,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (timeoutMillis > 0)
ses.setQueryTimeout(0);
- ses.setLazyQueryExecution(false);
+ if (lazyWorker != null)
+ ses.setLazyQueryExecution(false);
+ }
+ }
+
+ /**
+ * Cancel prepared statement.
+ *
+ * @param stmt Statement.
+ */
+ private static void cancelStatement(PreparedStatement stmt) {
+ try {
+ stmt.cancel();
+ }
+ catch (SQLException ignored) {
+ // No-op.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 458990c..9362ab8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -657,7 +657,7 @@ public class GridMapQueryExecutor {
assert rs instanceof JdbcResultSet : rs.getClass();
}
- qr.addResult(qryIdx, qry, node.id(), rs, params);
+ qr.addResult(qryIdx, qry, node.id(), rs, params, MapQueryLazyWorker.currentWorker());
if (qr.cancelled()) {
qr.result(qryIdx).close();
http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index a2a70f1..8c22259 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorker;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.jdbc.JdbcResultSet;
@@ -42,7 +44,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
/**
* Mapper result for a single part of the query.
*/
-class MapQueryResult implements AutoCloseable {
+class MapQueryResult {
/** */
private static final Field RESULT_FIELD;
@@ -96,24 +98,30 @@ class MapQueryResult implements AutoCloseable {
/** */
private final Object[] params;
+ /** Lazy worker. */
+ private final MapQueryLazyWorker lazyWorker;
+
/**
* @param rs Result set.
* @param cacheName Cache name.
* @param qrySrcNodeId Query source node.
* @param qry Query.
* @param params Query params.
+ * @param lazyWorker Lazy worker.
*/
MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName,
- UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+ UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
this.h2 = h2;
this.cacheName = cacheName;
this.qry = qry;
this.params = params;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
+ this.lazyWorker = lazyWorker;
if (rs != null) {
this.rs = rs;
+
try {
res = (ResultInterface)RESULT_FIELD.get(rs);
}
@@ -168,6 +176,8 @@ class MapQueryResult implements AutoCloseable {
* @return {@code true} If there are no more rows available.
*/
synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+ assert lazyWorker == null || MapQueryLazyWorker.currentWorker() != null;
+
if (closed)
return true;
@@ -247,8 +257,37 @@ class MapQueryResult implements AutoCloseable {
return res;
}
- /** {@inheritDoc} */
- @Override public synchronized void close() {
+ /**
+ * Close the result.
+ */
+ public synchronized void close() {
+ if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
+ final GridFutureAdapter closeFut = new GridFutureAdapter();
+
+ lazyWorker.submit(new Runnable() {
+ @Override public void run() {
+ try {
+ close();
+ }
+ finally {
+ closeFut.onDone();
+ }
+ }
+ });
+
+ lazyWorker.stop();
+
+ try {
+ // Wait for close synchronously to maintain consistent semantics.
+ closeFut.get();
+ }
+ catch (Exception e) {
+ // No-op.
+ }
+
+ return;
+ }
+
if (closed)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index c5e30af..ed79c14 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorker;
import org.jetbrains.annotations.Nullable;
import java.sql.ResultSet;
@@ -96,13 +97,17 @@ class MapQueryResults {
}
/**
+ * Add result.
+ *
* @param qry Query result index.
* @param q Query object.
* @param qrySrcNodeId Query source node.
* @param rs Result set.
+ * @param lazyWorker Lazy worker.
*/
- void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
- MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params);
+ void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params,
+ @Nullable MapQueryLazyWorker lazyWorker) {
+ MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params, lazyWorker);
if (!results.compareAndSet(qry, null, res))
throw new IllegalStateException();
@@ -141,6 +146,7 @@ class MapQueryResults {
continue;
}
+ // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
if (forceQryCancel) {
GridQueryCancel cancel = cancels[i];