You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/04/05 09:55:43 UTC
ignite git commit: IGNITE-7712: SQL: system property to force lazy
query execution. This closes #3666. This closes #3684.
Repository: ignite
Updated Branches:
refs/heads/master 5181f6b53 -> 064c816c1
IGNITE-7712: SQL: system property to force lazy query execution. This closes #3666. This closes #3684.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/064c816c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/064c816c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/064c816c
Branch: refs/heads/master
Commit: 064c816c177d31f18af2954175ca3ad0f3eee957
Parents: 5181f6b
Author: Ilya Kasnacheev <il...@gmail.com>
Authored: Thu Apr 5 12:55:36 2018 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Apr 5 12:55:36 2018 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 5 +++-
.../query/h2/opt/GridH2QueryContext.java | 30 ++++++++++++++++++--
.../processors/query/h2/opt/GridH2Table.java | 14 ++++++++-
.../query/h2/twostep/GridMapQueryExecutor.java | 22 +++++++++-----
.../query/h2/twostep/MapQueryLazyWorker.java | 7 +++--
.../query/h2/twostep/MapQueryResult.java | 2 +-
.../IgniteCacheQueryH2IndexingLeakTest.java | 6 +++-
7 files changed, 69 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 1f67f81..f7128c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -455,6 +455,9 @@ public final class IgniteSystemProperties {
/** Disable fallback to H2 SQL parser if the internal SQL parser fails to parse the statement. */
public static final String IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK = "IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK";
+ /** Force all SQL queries to be processed lazily regardless of what clients request. */
+ public static final String IGNITE_SQL_FORCE_LAZY_RESULT_SET = "IGNITE_SQL_FORCE_LAZY_RESULT_SET";
+
/** Maximum size for affinity assignment history. */
public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
@@ -840,7 +843,7 @@ public final class IgniteSystemProperties {
* @see CacheConfiguration#readFromBackup
*/
public static final String IGNITE_READ_LOAD_BALANCING = "IGNITE_READ_LOAD_BALANCING";
-
+
/**
* Enforces singleton.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 0103676..7b52ea4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -21,15 +21,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
@@ -83,6 +84,9 @@ public class GridH2QueryContext {
/** */
private GridH2CollocationModel qryCollocationMdl;
+ /** */
+ private MapQueryLazyWorker lazyWorker;
+
/**
* @param locNodeId Local node ID.
* @param nodeId The node who initiated the query.
@@ -351,7 +355,7 @@ public class GridH2QueryContext {
for (Key key : qctxs.keySet()) {
if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type)
- res |= doClear(new Key(locNodeId, nodeId, qryId, key.segmentId, type), false);
+ res |= doClear(key, false);
}
return res;
@@ -372,7 +376,10 @@ public class GridH2QueryContext {
assert x.key.equals(key);
- x.clearContext(nodeStop);
+ if (x.lazyWorker() != null)
+ x.lazyWorker().stop(nodeStop);
+ else
+ x.clearContext(nodeStop);
return true;
}
@@ -483,6 +490,23 @@ public class GridH2QueryContext {
return this;
}
+ /**
+ * @return Lazy worker, if any, or {@code null} if none.
+ */
+ public MapQueryLazyWorker lazyWorker() {
+ return lazyWorker;
+ }
+
+ /**
+ * @param lazyWorker Lazy worker, if any, or {@code null} if none.
+ * @return {@code this}.
+ */
+ public GridH2QueryContext lazyWorker(MapQueryLazyWorker lazyWorker) {
+ this.lazyWorker = lazyWorker;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridH2QueryContext.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 67ee496..15be253 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -36,6 +38,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
+import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.util.typedef.F;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.Insert;
@@ -289,7 +292,16 @@ public class GridH2Table extends TableBase {
Lock l = exclusive ? lock.writeLock() : lock.readLock();
try {
- l.lockInterruptibly();
+ if (!exclusive || !GridMapQueryExecutor.FORCE_LAZY)
+ l.lockInterruptibly();
+ else {
+ for (;;) {
+ if (l.tryLock(200, TimeUnit.MILLISECONDS))
+ break;
+ else
+ Thread.yield();
+ }
+ }
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7f49808..9b1e4fa 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -37,6 +37,7 @@ import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
@@ -83,6 +84,7 @@ import org.h2.jdbc.JdbcResultSet;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_FORCE_LAZY_RESULT_SET;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
@@ -99,6 +101,9 @@ import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2V
@SuppressWarnings("ForLoopReplaceableByForEach")
public class GridMapQueryExecutor {
/** */
+ public static final boolean FORCE_LAZY = IgniteSystemProperties.getBoolean(IGNITE_SQL_FORCE_LAZY_RESULT_SET);
+
+ /** */
private IgniteLogger log;
/** */
@@ -189,7 +194,7 @@ public class GridMapQueryExecutor {
lazyWorkerBusyLock.block();
for (MapQueryLazyWorker worker : lazyWorkers.values())
- worker.stop();
+ worker.stop(false);
lazyWorkers.clear();
}
@@ -451,7 +456,7 @@ public class GridMapQueryExecutor {
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
- final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+ final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
final List<Integer> cacheIds = req.caches();
@@ -562,10 +567,12 @@ public class GridMapQueryExecutor {
final Object[] params,
boolean lazy
) {
- if (lazy && MapQueryLazyWorker.currentWorker() == null) {
+ MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+
+ if (lazy && worker == null) {
// Lazy queries must be re-submitted to dedicated workers.
MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
- MapQueryLazyWorker worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
+ worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
worker.submit(new Runnable() {
@Override public void run() {
@@ -579,7 +586,7 @@ public class GridMapQueryExecutor {
MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
if (oldWorker != null)
- oldWorker.stop();
+ oldWorker.stop(false);
IgniteThread thread = new IgniteThread(worker);
@@ -635,7 +642,8 @@ public class GridMapQueryExecutor {
.distributedJoinMode(distributedJoinMode)
.pageSize(pageSize)
.topologyVersion(topVer)
- .reservations(reserved);
+ .reservations(reserved)
+ .lazyWorker(worker);
Connection conn = h2.connectionForSchema(schemaName);
@@ -1049,7 +1057,7 @@ public class GridMapQueryExecutor {
MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
if (worker != null) {
- worker.stop();
+ worker.stop(false);
// Just stop is not enough as worker may be registered, but not started due to exception.
unregisterLazyWorker(worker);
http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
index 50e2af5..59c050f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
@@ -114,19 +114,20 @@ public class MapQueryLazyWorker extends GridWorker {
/**
* Stop the worker.
+ * @param nodeStop Whether the node is stopping.
*/
- public void stop() {
+ public void stop(final boolean nodeStop) {
if (MapQueryLazyWorker.currentWorker() == null)
submit(new Runnable() {
@Override public void run() {
- stop();
+ stop(nodeStop);
}
});
else {
GridH2QueryContext qctx = GridH2QueryContext.get();
if (qctx != null) {
- qctx.clearContext(false);
+ qctx.clearContext(nodeStop);
GridH2QueryContext.clearThreadLocal();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index beeb054..733590c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -282,7 +282,7 @@ class MapQueryResult {
U.closeQuiet(rs);
if (lazyWorker != null)
- lazyWorker.stop();
+ lazyWorker.stop(false);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/064c816c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index 305529a..a8f2636 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -157,7 +157,11 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
// Wait for stmt cache entry is created for each thread.
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- return getStatementCacheSize(qryProc) == THREAD_COUNT;
+ // The "greater than" part of '>=' covers the lazy query flag being turned on: in that case there are
+ // more threads than those started explicitly by the test, so we can't rely on an exact number.
+ // Still, the main check of this test is that all threads, no matter how many of them
+ // there are, get terminated and their statement caches are cleaned up.
+ return getStatementCacheSize(qryProc) >= THREAD_COUNT;
}
}, STMT_CACHE_CLEANUP_TIMEOUT));
}