You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/14 10:30:43 UTC

[33/50] ignite git commit: Proper cancellation.

Proper cancellation.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/131087a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/131087a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/131087a5

Branch: refs/heads/ignite-5991-6019
Commit: 131087a5b871071ee9ba6b91e1c32414acf12f51
Parents: 2396875
Author: devozerov <vo...@gridgain.com>
Authored: Mon Aug 14 11:27:41 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Aug 14 11:27:41 2017 +0300

----------------------------------------------------------------------
 .../processors/query/h2/IgniteH2Indexing.java   | 38 ++++++++++++----
 .../query/h2/twostep/GridMapQueryExecutor.java  |  2 +-
 .../query/h2/twostep/MapQueryResult.java        | 47 ++++++++++++++++++--
 .../query/h2/twostep/MapQueryResults.java       | 10 ++++-
 4 files changed, 81 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index f655377..f79d3cf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -112,6 +112,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
 import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryStreamingResultTarget;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorker;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -909,18 +910,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
-        int timeoutMillis, @Nullable GridQueryCancel cancel)
-        throws IgniteCheckedException {
+        int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
+        final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
 
         if (cancel != null) {
             cancel.set(new Runnable() {
                 @Override public void run() {
-                    try {
-                        stmt.cancel();
-                    }
-                    catch (SQLException ignored) {
-                        // No-op.
+                    if (lazyWorker != null) {
+                        lazyWorker.submit(new Runnable() {
+                            @Override public void run() {
+                                cancelStatement(stmt);
+                            }
+                        });
                     }
+                    else
+                        cancelStatement(stmt);
                 }
             });
         }
@@ -930,9 +934,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (timeoutMillis > 0)
             ses.setQueryTimeout(timeoutMillis);
 
-        try {
+        if (lazyWorker != null)
             ses.setLazyQueryExecution(true);
 
+        try {
             return stmt.executeQuery();
         }
         catch (SQLException e) {
@@ -946,7 +951,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (timeoutMillis > 0)
                 ses.setQueryTimeout(0);
 
-            ses.setLazyQueryExecution(false);
+            if (lazyWorker != null)
+                ses.setLazyQueryExecution(false);
+        }
+    }
+
+    /**
+     * Cancels the prepared statement, ignoring any {@link SQLException} it throws.
+     *
+     * @param stmt Statement to cancel.
+     */
+    private static void cancelStatement(PreparedStatement stmt) {
+        try {
+            stmt.cancel();
+        }
+        catch (SQLException ignored) {
+            // Deliberately ignored: a failed cancel is not reported to the caller.
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 458990c..9362ab8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -657,7 +657,7 @@ public class GridMapQueryExecutor {
                         assert rs instanceof JdbcResultSet : rs.getClass();
                     }
 
-                    qr.addResult(qryIdx, qry, node.id(), rs, params);
+                    qr.addResult(qryIdx, qry, node.id(), rs, params, MapQueryLazyWorker.currentWorker());
 
                     if (qr.cancelled()) {
                         qr.result(qryIdx).close();

http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index a2a70f1..8c22259 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorker;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.jdbc.JdbcResultSet;
@@ -42,7 +44,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 /**
  * Mapper result for a single part of the query.
  */
-class MapQueryResult implements AutoCloseable {
+class MapQueryResult {
     /** */
     private static final Field RESULT_FIELD;
 
@@ -96,24 +98,30 @@ class MapQueryResult implements AutoCloseable {
     /** */
     private final Object[] params;
 
+    /** Lazy worker. */
+    private final MapQueryLazyWorker lazyWorker;
+
     /**
      * @param rs Result set.
      * @param cacheName Cache name.
      * @param qrySrcNodeId Query source node.
      * @param qry Query.
      * @param params Query params.
+     * @param lazyWorker Lazy worker.
      */
     MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName,
-        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
         this.h2 = h2;
         this.cacheName = cacheName;
         this.qry = qry;
         this.params = params;
         this.qrySrcNodeId = qrySrcNodeId;
         this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
+        this.lazyWorker = lazyWorker;
 
         if (rs != null) {
             this.rs = rs;
+
             try {
                 res = (ResultInterface)RESULT_FIELD.get(rs);
             }
@@ -168,6 +176,8 @@ class MapQueryResult implements AutoCloseable {
      * @return {@code true} If there are no more rows available.
      */
     synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+        assert lazyWorker == null || MapQueryLazyWorker.currentWorker() != null;
+
         if (closed)
             return true;
 
@@ -247,8 +257,37 @@ class MapQueryResult implements AutoCloseable {
         return res;
     }
 
-    /** {@inheritDoc} */
-    @Override public synchronized void close() {
+    /**
+     * Close the result.
+     */
+    public synchronized void close() {
+        if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
+            final GridFutureAdapter closeFut = new GridFutureAdapter();
+
+            lazyWorker.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        close();
+                    }
+                    finally {
+                        closeFut.onDone();
+                    }
+                }
+            });
+
+            lazyWorker.stop();
+
+            try {
+                // Wait for close synchronously to maintain consistent semantics.
+                closeFut.get();
+            }
+            catch (Exception e) {
+                // No-op.
+            }
+
+            return;
+        }
+
         if (closed)
             return;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/131087a5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index c5e30af..ed79c14 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorker;
 import org.jetbrains.annotations.Nullable;
 
 import java.sql.ResultSet;
@@ -96,13 +97,17 @@ class MapQueryResults {
     }
 
     /**
+     * Add result.
+     *
      * @param qry Query result index.
      * @param q Query object.
      * @param qrySrcNodeId Query source node.
      * @param rs Result set.
+     * @param lazyWorker Lazy worker.
      */
-    void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
-        MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params);
+    void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params,
+        @Nullable MapQueryLazyWorker lazyWorker) {
+        MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params, lazyWorker);
 
         if (!results.compareAndSet(qry, null, res))
             throw new IllegalStateException();
@@ -141,6 +146,7 @@ class MapQueryResults {
                 continue;
             }
 
+            // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
             if (forceQryCancel) {
                 GridQueryCancel cancel = cancels[i];