You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/04/05 09:55:43 UTC

ignite git commit: IGNITE-7712: SQL: system property to force lazy query execution. This closes #3666. This closes #3684.

Repository: ignite
Updated Branches:
  refs/heads/master 5181f6b53 -> 064c816c1


IGNITE-7712: SQL: system property to force lazy query execution. This closes #3666. This closes #3684.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/064c816c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/064c816c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/064c816c

Branch: refs/heads/master
Commit: 064c816c177d31f18af2954175ca3ad0f3eee957
Parents: 5181f6b
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Thu Apr 5 12:55:36 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Apr 5 12:55:36 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  5 +++-
 .../query/h2/opt/GridH2QueryContext.java        | 30 ++++++++++++++++++--
 .../processors/query/h2/opt/GridH2Table.java    | 14 ++++++++-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 22 +++++++++-----
 .../query/h2/twostep/MapQueryLazyWorker.java    |  7 +++--
 .../query/h2/twostep/MapQueryResult.java        |  2 +-
 .../IgniteCacheQueryH2IndexingLeakTest.java     |  6 +++-
 7 files changed, 69 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 1f67f81..f7128c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -455,6 +455,9 @@ public final class IgniteSystemProperties {
     /** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */
     public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK";
 
+    /** Force all SQL queries to be processed lazily regardless of what clients request. */
+    public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET";
+
     /** Maximum size for affinity assignment history. */
     public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
 
@@ -840,7 +843,7 @@ public final class IgniteSystemProperties {
      * @see CacheConfiguration#readFromBackup
      */
     public static final String IGNITE_READ_LOAD_BALANCING = "IGNITE_READ_LOAD_BALANCING";
-  
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 0103676..7b52ea4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -21,15 +21,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
@@ -83,6 +84,9 @@ public class GridH2QueryContext {
     /** */
     private GridH2CollocationModel qryCollocationMdl;
 
+    /** */
+    private MapQueryLazyWorker lazyWorker;
+
     /**
      * @param locNodeId Local node ID.
      * @param nodeId The node who initiated the query.
@@ -351,7 +355,7 @@ public class GridH2QueryContext {
 
         for (Key key : qctxs.keySet()) {
             if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type)
-                res |= doClear(new Key(locNodeId, nodeId, qryId, key.segmentId, type), false);
+                res |= doClear(key, false);
         }
 
         return res;
@@ -372,7 +376,10 @@ public class GridH2QueryContext {
 
         assert x.key.equals(key);
 
-        x.clearContext(nodeStop);
+        if (x.lazyWorker() != null)
+            x.lazyWorker().stop(nodeStop);
+        else
+            x.clearContext(nodeStop);
 
         return true;
     }
@@ -483,6 +490,23 @@ public class GridH2QueryContext {
         return this;
     }
 
+    /**
+     * @return Lazy worker, if any, or {@code null} if none.
+     */
+    public MapQueryLazyWorker lazyWorker() {
+        return lazyWorker;
+    }
+
+    /**
+     * @param lazyWorker Lazy worker, if any, or {@code null} if none.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) {
+        this.lazyWorker = lazyWorker;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridH2QueryContext.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 67ee496..15be253 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -36,6 +38,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.command.dml.Insert;
@@ -289,7 +292,16 @@ public class GridH2Table extends TableBase {
         Lock l = exclusive ? lock.writeLock() : lock.readLock();
 
         try {
-            l.lockInterruptibly();
+            if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY)
+                l.lockInterruptibly();
+            else {
+                for (;;) {
+                    if (l.tryLock(200, TimeUnit.MILLISECONDS))
+                        break;
+                    else
+                        Thread.yield();
+                }
+            }
         }
         catch (InterruptedException e) {
             Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7f49808..9b1e4fa 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -37,6 +37,7 @@ import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterNode;
@@ -83,6 +84,7 @@ import org.h2.jdbc.JdbcResultSet;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
@@ -99,6 +101,9 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V
 @SuppressWarnings("ForLoopReplaceableByForEach")
 public class GridMapQueryExecutor {
     /** */
+    public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET);
+
+    /** */
     private IgniteLogger log;
 
     /** */
@@ -189,7 +194,7 @@ public class GridMapQueryExecutor {
         lazyWorkerBusyLock.block();
 
         for (MapQueryLazyWorker worker : lazyWorkers.values())
-            worker.stop();
+            worker.stop(false);
 
         lazyWorkers.clear();
     }
@@ -451,7 +456,7 @@ public class GridMapQueryExecutor {
         final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
         final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
         final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
-        final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+        final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
 
         final List<Integer> cacheIds = req.caches();
 
@@ -562,10 +567,12 @@ public class GridMapQueryExecutor {
         final Object[] params,
         boolean lazy
     ) {
-        if (lazy && MapQueryLazyWorker.currentWorker() == null) {
+        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+
+        if (lazy && worker == null) {
             // Lazy queries must be re-submitted to dedicated workers.
             MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
-            MapQueryLazyWorker worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
+            worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
 
             worker.submit(new Runnable() {
                 @Override public void run() {
@@ -579,7 +586,7 @@ public class GridMapQueryExecutor {
                     MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
 
                     if (oldWorker != null)
-                        oldWorker.stop();
+                        oldWorker.stop(false);
 
                     IgniteThread thread = new IgniteThread(worker);
 
@@ -635,7 +642,8 @@ public class GridMapQueryExecutor {
                 .distributedJoinMode(distributedJoinMode)
                 .pageSize(pageSize)
                 .topologyVersion(topVer)
-                .reservations(reserved);
+                .reservations(reserved)
+                .lazyWorker(worker);
 
             Connection conn = h2.connectionForSchema(schemaName);
 
@@ -1049,7 +1057,7 @@ public class GridMapQueryExecutor {
         MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
 
         if (worker != null) {
-            worker.stop();
+            worker.stop(false);
 
             // Just stop is not enough as worker may be registered, but not started due to exception.
             unregisterLazyWorker(worker);

http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
index 50e2af5..59c050f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
@@ -114,19 +114,20 @@ public class MapQueryLazyWorker extends GridWorker {
 
     /**
      * Stop the worker.
+     * @param nodeStop Node is stopping.
      */
-    public void stop() {
+    public void stop(final boolean nodeStop) {
         if (MapQueryLazyWorker.currentWorker() == null)
             submit(new Runnable() {
                 @Override public void run() {
-                    stop();
+                    stop(nodeStop);
                 }
             });
         else {
             GridH2QueryContext qctx = GridH2QueryContext.get();
 
             if (qctx != null) {
-                qctx.clearContext(false);
+                qctx.clearContext(nodeStop);
 
                 GridH2QueryContext.clearThreadLocal();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index beeb054..733590c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -282,7 +282,7 @@ class MapQueryResult {
             U.closeQuiet(rs);
 
             if (lazyWorker != null)
-                lazyWorker.stop();
+                lazyWorker.stop(false);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index 305529a..a8f2636 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -157,7 +157,11 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
                 // Wait for stmt cache entry is created for each thread.
                 assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
                     @Override public boolean apply() {
-                        return getStatementCacheSize(qryProc) == THREAD_COUNT;
+                        // The '>' case covers the lazy query flag being turned on: in that case there are more
+                        // threads than those started explicitly by the test, so we can't rely on an exact number.
+                        // Still, the main check of this test is that all threads, no matter how many of them
+                        // there are, are terminated and their statement caches are cleaned up.
+                        return getStatementCacheSize(qryProc) >= THREAD_COUNT;
                     }
                 }, STMT_CACHE_CLEANUP_TIMEOUT));
             }