You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/09 08:20:04 UTC
[ignite] branch master updated: IGNITE-11223: SQL: Make two-step
plan immutable. Note that GridCacheSqlQuery is still mutable,
what will be fixed separately. This closes #6070.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 208c53f IGNITE-11223: SQL: Make two-step plan immutable. Note that GridCacheSqlQuery is still mutable, what will be fixed separately. This closes #6070.
208c53f is described below
commit 208c53f467797606cfd0b116bda3101cf3451d5b
Author: devozerov <pp...@gmail.com>
AuthorDate: Sat Feb 9 11:19:55 2019 +0300
IGNITE-11223: SQL: Make two-step plan immutable. Note that GridCacheSqlQuery is still mutable, what will be fixed separately. This closes #6070.
---
.../cache/query/GridCacheTwoStepQuery.java | 183 ++++-----------
.../internal/processors/query/h2/H2Utils.java | 141 ++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 250 +++++----------------
.../processors/query/h2/dml/UpdatePlanBuilder.java | 21 +-
.../query/h2/sql/GridSqlQuerySplitter.java | 59 ++---
.../query/h2/twostep/GridReduceQueryExecutor.java | 34 ++-
.../query/h2/twostep/ReduceQueryRun.java | 5 +-
7 files changed, 317 insertions(+), 376 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index cc5dd03..aad9cdd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.processors.cache.query;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
-import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
@@ -30,48 +30,42 @@ import org.apache.ignite.internal.util.typedef.internal.S;
*/
public class GridCacheTwoStepQuery {
/** */
- public static final int DFLT_PAGE_SIZE = 1000;
-
- /** */
@GridToStringInclude
- private List<GridCacheSqlQuery> mapQrys = new ArrayList<>();
+ private final List<GridCacheSqlQuery> mapQrys;
/** */
@GridToStringInclude
- private GridCacheSqlQuery rdc;
-
- /** */
- private int pageSize = DFLT_PAGE_SIZE;
+ private final GridCacheSqlQuery rdc;
/** */
- private boolean explain;
+ private final boolean explain;
/** */
- private String originalSql;
+ private final String originalSql;
/** */
- private Set<QueryTable> tbls;
+ private final Set<QueryTable> tbls;
/** */
- private boolean distributedJoins;
+ private final boolean distributedJoins;
/** */
- private boolean skipMergeTbl;
+ private final boolean skipMergeTbl;
/** */
- private List<Integer> cacheIds;
+ private final List<Integer> cacheIds;
/** */
- private boolean local;
+ private final boolean local;
/** */
- private PartitionResult derivedPartitions;
+ private final PartitionResult derivedPartitions;
/** */
- private boolean mvccEnabled;
+ private final boolean mvccEnabled;
/** {@code FOR UPDATE} flag. */
- private boolean forUpdate;
+ private final boolean forUpdate;
/** Number of positional arguments in the sql. */
private final int paramsCnt;
@@ -80,19 +74,34 @@ public class GridCacheTwoStepQuery {
* @param originalSql Original query SQL.
* @param tbls Tables in query.
*/
- public GridCacheTwoStepQuery(String originalSql, int paramsCnt, Set<QueryTable> tbls) {
+ public GridCacheTwoStepQuery(
+ String originalSql,
+ int paramsCnt,
+ Set<QueryTable> tbls,
+ GridCacheSqlQuery rdc,
+ List<GridCacheSqlQuery> mapQrys,
+ boolean skipMergeTbl,
+ boolean explain,
+ boolean distributedJoins,
+ boolean forUpdate,
+ PartitionResult derivedPartitions,
+ List<Integer> cacheIds,
+ boolean mvccEnabled,
+ boolean local
+ ) {
this.originalSql = originalSql;
this.paramsCnt = paramsCnt;
this.tbls = tbls;
- }
-
- /**
- * Specify if distributed joins are enabled for this query.
- *
- * @param distributedJoins Distributed joins enabled.
- */
- public void distributedJoins(boolean distributedJoins) {
+ this.rdc = rdc;
+ this.mapQrys = F.isEmpty(mapQrys) ? Collections.emptyList() : mapQrys;
+ this.skipMergeTbl = skipMergeTbl;
+ this.explain = explain;
this.distributedJoins = distributedJoins;
+ this.forUpdate = forUpdate;
+ this.derivedPartitions = derivedPartitions;
+ this.cacheIds = cacheIds;
+ this.mvccEnabled = mvccEnabled;
+ this.local = local;
}
/**
@@ -104,7 +113,6 @@ public class GridCacheTwoStepQuery {
return distributedJoins;
}
-
/**
* @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
*/
@@ -113,13 +121,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param skipMergeTbl Skip merge table.
- */
- public void skipMergeTable(boolean skipMergeTbl) {
- this.skipMergeTbl = skipMergeTbl;
- }
-
- /**
* @return If this is explain query.
*/
public boolean explain() {
@@ -127,34 +128,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param explain If this is explain query.
- */
- public void explain(boolean explain) {
- this.explain = explain;
- }
-
- /**
- * @param pageSize Page size.
- */
- public void pageSize(int pageSize) {
- this.pageSize = pageSize;
- }
-
- /**
- * @return Page size.
- */
- public int pageSize() {
- return pageSize;
- }
-
- /**
- * @param qry SQL Query.
- */
- public void addMapQuery(GridCacheSqlQuery qry) {
- mapQrys.add(qry);
- }
-
- /**
* @return {@code true} If all the map queries contain only replicated tables.
*/
public boolean isReplicatedOnly() {
@@ -176,13 +149,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param rdc Reduce query.
- */
- public void reduceQuery(GridCacheSqlQuery rdc) {
- this.rdc = rdc;
- }
-
- /**
* @return Map queries.
*/
public List<GridCacheSqlQuery> mapQueries() {
@@ -197,13 +163,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param cacheIds Cache IDs.
- */
- public void cacheIds(List<Integer> cacheIds) {
- this.cacheIds = cacheIds;
- }
-
- /**
* @return Original query SQL.
*/
public String originalSql() {
@@ -218,13 +177,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param local Local query flag.
- */
- public void local(boolean local) {
- this.local = local;
- }
-
- /**
* @return Query derived partitions info.
*/
public PartitionResult derivedPartitions() {
@@ -232,37 +184,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param derivedPartitions Query derived partitions info.
- */
- public void derivedPartitions(PartitionResult derivedPartitions) {
- this.derivedPartitions = derivedPartitions;
- }
-
- /**
- * @return Copy.
- */
- public GridCacheTwoStepQuery copy() {
- assert !explain;
-
- GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, paramsCnt, tbls);
-
- cp.cacheIds = cacheIds;
- cp.rdc = rdc.copy();
- cp.skipMergeTbl = skipMergeTbl;
- cp.pageSize = pageSize;
- cp.distributedJoins = distributedJoins;
- cp.derivedPartitions = derivedPartitions;
- cp.local = local;
- cp.mvccEnabled = mvccEnabled;
- cp.forUpdate = forUpdate;
-
- for (int i = 0; i < mapQrys.size(); i++)
- cp.mapQrys.add(mapQrys.get(i).copy());
-
- return cp;
- }
-
- /**
* @return Nuumber of tables.
*/
public int tablesCount() {
@@ -284,13 +205,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param mvccEnabled Mvcc flag.
- */
- public void mvccEnabled(boolean mvccEnabled) {
- this.mvccEnabled = mvccEnabled;
- }
-
- /**
* @return {@code FOR UPDATE} flag.
*/
public boolean forUpdate() {
@@ -298,13 +212,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param forUpdate {@code FOR UPDATE} flag.
- */
- public void forUpdate(boolean forUpdate) {
- this.forUpdate = forUpdate;
- }
-
- /**
* @return Number of parameters
*/
public int parametersCount() {
@@ -315,18 +222,4 @@ public class GridCacheTwoStepQuery {
@Override public String toString() {
return S.toString(GridCacheTwoStepQuery.class, this);
}
-
- /**
- * @return {@code True} is system views exist.
- */
- public boolean hasSystemViews() {
- if (tablesCount() > 0) {
- for (QueryTable tbl : tables()) {
- if (QueryUtils.SCHEMA_SYS.equals(tbl.schema()))
- return true;
- }
- }
-
- return false;
- }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 684ecf7..50b8def 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -34,6 +34,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -43,10 +44,17 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -743,4 +751,137 @@ public class H2Utils {
U.swap(arr, i, i + 1);
}
}
+
+ /**
+ * Collect cache identifiers from two-step query.
+ *
+ * @param mainCacheId Id of main cache.
+ * @return Result.
+ */
+ @Nullable public static List<Integer> collectCacheIds(
+ IgniteH2Indexing idx,
+ @Nullable Integer mainCacheId,
+ Collection<QueryTable> tbls
+ ) {
+ LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
+
+ if (mainCacheId != null)
+ caches0.add(mainCacheId);
+
+ if (!F.isEmpty(tbls)) {
+ for (QueryTable tblKey : tbls) {
+ GridH2Table tbl = idx.schemaManager().dataTable(tblKey.schema(), tblKey.table());
+
+ if (tbl != null) {
+ checkAndStartNotStartedCache(idx.kernalContext(), tbl);
+
+ int cacheId = tbl.cacheId();
+
+ caches0.add(cacheId);
+ }
+ }
+ }
+
+ return caches0.isEmpty() ? Collections.emptyList() : new ArrayList<>(caches0);
+ }
+
+ /**
+ * Collect MVCC enabled flag.
+ *
+ * @param idx Indexing.
+ * @param cacheIds Cache IDs.
+ * @return {@code True} if indexing is enabled.
+ */
+ public static boolean collectMvccEnabled(IgniteH2Indexing idx, List<Integer> cacheIds) {
+ if (cacheIds.isEmpty())
+ return false;
+
+ GridCacheSharedContext sharedCtx = idx.kernalContext().cache().context();
+
+ GridCacheContext cctx0 = null;
+
+ boolean mvccEnabled = false;
+
+ for (int i = 0; i < cacheIds.size(); i++) {
+ Integer cacheId = cacheIds.get(i);
+
+ GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
+
+ assert cctx != null;
+
+ if (i == 0) {
+ mvccEnabled = cctx.mvccEnabled();
+ cctx0 = cctx;
+ }
+ else if (cctx.mvccEnabled() != mvccEnabled)
+ MvccUtils.throwAtomicityModesMismatchException(cctx0, cctx);
+ }
+
+ return mvccEnabled;
+ }
+
+ /**
+ * Check if query is valid.
+ *
+ * @param idx Indexing.
+ * @param cacheIds Cache IDs.
+ * @param mvccEnabled MVCC enabled flag.
+ * @param forUpdate For update flag.
+ * @param tbls Tables.
+ */
+ public static void checkQuery(
+ IgniteH2Indexing idx,
+ List<Integer> cacheIds,
+ boolean mvccEnabled,
+ boolean forUpdate,
+ Collection<QueryTable> tbls
+ ) {
+ GridCacheSharedContext sharedCtx = idx.kernalContext().cache().context();
+
+ // Check query parallelism.
+ int expectedParallelism = 0;
+
+ for (int i = 0; i < cacheIds.size(); i++) {
+ Integer cacheId = cacheIds.get(i);
+
+ GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
+
+ assert cctx != null;
+
+ if (!cctx.isPartitioned())
+ continue;
+
+ if (expectedParallelism == 0)
+ expectedParallelism = cctx.config().getQueryParallelism();
+ else if (cctx.config().getQueryParallelism() != expectedParallelism) {
+ throw new IllegalStateException("Using indexes with different parallelism levels in same query is " +
+ "forbidden.");
+ }
+ }
+
+ // Check FOR UPDATE invariants: only one table, MVCC is there.
+ if (forUpdate) {
+ if (cacheIds.size() != 1)
+ throw new IgniteSQLException("SELECT FOR UPDATE is supported only for queries " +
+ "that involve single transactional cache.");
+
+ if (!mvccEnabled)
+ throw new IgniteSQLException("SELECT FOR UPDATE query requires transactional cache " +
+ "with MVCC enabled.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
+
+ // Check for joins between system views and normal tables.
+ if (!F.isEmpty(tbls)) {
+ for (QueryTable tbl : tbls) {
+ if (QueryUtils.SCHEMA_SYS.equals(tbl.schema())) {
+ if (!F.isEmpty(cacheIds)) {
+ throw new IgniteSQLException("Normal tables and system views cannot be used in the same query.",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
+ else
+ return;
+ }
+ }
+ }
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2b60286..afdd9e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -29,7 +29,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -65,7 +64,6 @@ import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -81,7 +79,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.cache.query.RegisteredQueryCursor;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
@@ -1277,6 +1274,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param lazy Lazy query execution flag.
* @param mvccTracker Query tracker.
* @param dataPageScanEnabled If data page scan is enabled.
+ * @param pageSize Page size.
* @return Iterable result.
*/
private Iterable<List<?>> runQueryTwoStep(
@@ -1291,7 +1289,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final int[] parts,
final boolean lazy,
MvccQueryTracker mvccTracker,
- Boolean dataPageScanEnabled
+ Boolean dataPageScanEnabled,
+ int pageSize
) {
assert !qry.mvccEnabled() || !F.isEmpty(qry.cacheIds());
@@ -1301,11 +1300,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridNearTxLocal tx = tracker != null ? tx(ctx) : null;
- if (qry.forUpdate()) {
- // Locking has no meaning if SELECT FOR UPDATE is not executed in explicit transaction.
- // So, we can can reset forUpdate flag if there is no explicit transaction.
- qry.forUpdate(checkActive(tx) != null);
- }
+ // Locking has no meaning if SELECT FOR UPDATE is not executed in explicit transaction.
+ // So, we can can reset forUpdate flag if there is no explicit transaction.
+ boolean forUpdate = qry.forUpdate() && checkActive(tx) != null;
int opTimeout = operationTimeout(qryTimeout, tx);
@@ -1313,8 +1310,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@SuppressWarnings("NullableProblems")
@Override public Iterator<List<?>> iterator() {
try {
- return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, opTimeout,
- cancel, params, parts, lazy, tracker, dataPageScanEnabled);
+ return rdcQryExec.query(
+ schemaName,
+ qry,
+ keepCacheObj,
+ enforceJoinOrder,
+ opTimeout,
+ cancel,
+ params,
+ parts,
+ lazy,
+ tracker,
+ dataPageScanEnabled,
+ forUpdate,
+ pageSize
+ );
}
catch (Throwable e) {
if (tracker != null)
@@ -1743,12 +1753,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
res.addAll(qryRes);
firstArg += parseRes.parametersCount();
-
- H2TwoStepCachedQueryKey twoStepKey = parseRes.twoStepQueryKey();
-
- // We cannot cache two-step query for multiple statements query except the last statement
- if (twoStepQry != null && remainingSql == null && !twoStepQry.explain() && twoStepKey != null)
- twoStepCache.putIfAbsent(twoStepKey, new H2TwoStepCachedQuery(meta, twoStepQry.copy()));
}
}
@@ -1788,29 +1792,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
qry.isEnforceJoinOrder(),
qry.isLocal());
- H2TwoStepCachedQuery cachedQry;
-
- if ((cachedQry = twoStepCache.get(cachedQryKey)) != null) {
- checkQueryType(qry, true);
-
- GridCacheTwoStepQuery twoStepQry = cachedQry.query().copy();
-
- List<GridQueryFieldMetadata> meta = cachedQry.meta();
-
- ParsingResult parseRes = new ParsingResult(null, qry, null, twoStepQry, cachedQryKey, meta);
+ H2TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
- if (!twoStepQry.explain())
- twoStepCache.putIfAbsent(cachedQryKey, new H2TwoStepCachedQuery(meta, twoStepQry.copy()));
-
- return parseRes;
- }
+ if (cachedQry != null)
+ return new ParsingResult(null, qry, null, cachedQry.query(), cachedQryKey, cachedQry.meta());
+ // Try parting as native command.
ParsingResult parseRes = parseNativeCommand(schemaName, qry);
if (parseRes != null)
return parseRes;
- // parse with h2 parser:
+ // Parse with H2.
return parseAndSplit(schemaName, qry, firstArg);
}
@@ -2006,7 +1999,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt);
}
- if (prepared.isQuery()) {
+ if (prepared.isQuery()) {
try {
H2Utils.bindParameters(stmt, F.asList(args));
}
@@ -2063,34 +2056,43 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, qry.getSql(),
qry.isCollocated(), qry.isDistributedJoins(), qry.isEnforceJoinOrder(), qry.isLocal());
- H2TwoStepCachedQuery cachedQry;
+ H2TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
- if ((cachedQry = twoStepCache.get(cachedQryKey)) != null) {
- checkQueryType(qry, true);
+ if (cachedQry == null) {
+ try {
+ GridCacheTwoStepQuery twoStepQry = GridSqlQuerySplitter.split(
+ connMgr.connectionForThread().connection(newQry.getSchema()),
+ prepared,
+ newQry.getArgs(),
+ newQry.isCollocated(),
+ newQry.isDistributedJoins(),
+ newQry.isEnforceJoinOrder(),
+ newQry.isLocal(),
+ this
+ );
- GridCacheTwoStepQuery twoStepQry = cachedQry.query().copy();
+ List<GridQueryFieldMetadata> meta = H2Utils.meta(stmt.getMetaData());
- List<GridQueryFieldMetadata> meta = cachedQry.meta();
+ cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry);
- return new ParsingResult(prepared, newQry, remainingSql, twoStepQry, cachedQryKey, meta);
+ if (remainingSql == null && !twoStepQry.explain())
+ twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
+ Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
+ }
+ catch (SQLException e) {
+ throw new IgniteSQLException(e);
+ }
+ finally {
+ U.close(stmt, log);
+ }
}
- try {
- GridCacheTwoStepQuery twoStepQry = split(prepared, newQry);
+ checkQueryType(qry, true);
- return new ParsingResult(prepared, newQry, remainingSql, twoStepQry,
- cachedQryKey, H2Utils.meta(stmt.getMetaData()));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteSQLException("Failed to bind parameters: [qry=" + newQry.getSql() + ", params=" +
- Arrays.deepToString(newQry.getArgs()) + "]", IgniteQueryErrorCode.PARSING, e);
- }
- catch (SQLException e) {
- throw new IgniteSQLException(e);
- }
- finally {
- U.close(stmt, log);
- }
+ return new ParsingResult(prepared, newQry, remainingSql, cachedQry.query(), cachedQryKey, cachedQry.meta());
}
/**
@@ -2103,44 +2105,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Split query into two-step query.
- * @param prepared JDBC prepared statement.
- * @param qry Original fields query.
- * @return Two-step query.
- * @throws IgniteCheckedException in case of error inside {@link GridSqlQuerySplitter}.
- * @throws SQLException in case of error inside {@link GridSqlQuerySplitter}.
- */
- private GridCacheTwoStepQuery split(Prepared prepared, SqlFieldsQuery qry) throws IgniteCheckedException,
- SQLException {
- GridCacheTwoStepQuery res = GridSqlQuerySplitter.split(
- connMgr.connectionForThread().connection(qry.getSchema()),
- prepared,
- qry.getArgs(),
- qry.isCollocated(),
- qry.isDistributedJoins(),
- qry.isEnforceJoinOrder(),
- partExtractor);
-
- List<Integer> cacheIds = collectCacheIds(null, res);
-
- if (!F.isEmpty(cacheIds) && res.hasSystemViews()) {
- throw new IgniteSQLException("Normal tables and system views cannot be used in the same query.",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
-
- if (F.isEmpty(cacheIds))
- res.local(true);
- else {
- res.cacheIds(cacheIds);
- res.local(qry.isLocal());
- }
-
- res.pageSize(qry.getPageSize());
-
- return res;
- }
-
- /**
* @param qry Sql fields query.autoStartTx(qry)
* @return {@code True} if need to start transaction.
*/
@@ -2267,8 +2231,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Parsed query: `" + qry.getSql() + "` into two step query: " + twoStepQry);
- twoStepQry.pageSize(qry.getPageSize());
-
if (cancel == null)
cancel = new GridQueryCancel();
@@ -2316,7 +2278,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
parts,
qry.isLazy(),
mvccTracker,
- qry.isDataPageScanEnabled()
+ qry.isDataPageScanEnabled(),
+ qry.getPageSize()
);
QueryCursorImpl<List<?>> cursor = registerAsNewQry
@@ -2428,60 +2391,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @param cacheIds Cache IDs.
- * @param twoStepQry Query.
- * @throws IllegalStateException if segmented indices used with non-segmented indices.
- */
- private void processCaches(List<Integer> cacheIds, GridCacheTwoStepQuery twoStepQry) {
- if (cacheIds.isEmpty())
- return; // Nothing to check
-
- GridCacheSharedContext sharedCtx = ctx.cache().context();
-
- int expectedParallelism = 0;
- GridCacheContext cctx0 = null;
-
- boolean mvccEnabled = false;
-
- for (int i = 0; i < cacheIds.size(); i++) {
- Integer cacheId = cacheIds.get(i);
-
- GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
-
- assert cctx != null;
-
- if (i == 0) {
- mvccEnabled = cctx.mvccEnabled();
- cctx0 = cctx;
- }
- else if (cctx.mvccEnabled() != mvccEnabled)
- MvccUtils.throwAtomicityModesMismatchException(cctx0, cctx);
-
- if (!cctx.isPartitioned())
- continue;
-
- if (expectedParallelism == 0)
- expectedParallelism = cctx.config().getQueryParallelism();
- else if (cctx.config().getQueryParallelism() != expectedParallelism) {
- throw new IllegalStateException("Using indexes with different parallelism levels in same query is " +
- "forbidden.");
- }
- }
-
- twoStepQry.mvccEnabled(mvccEnabled);
-
- if (twoStepQry.forUpdate()) {
- if (cacheIds.size() != 1)
- throw new IgniteSQLException("SELECT FOR UPDATE is supported only for queries " +
- "that involve single transactional cache.");
-
- if (!mvccEnabled)
- throw new IgniteSQLException("SELECT FOR UPDATE query requires transactional cache " +
- "with MVCC enabled.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
- }
-
- /**
* Registers new class description.
*
* This implementation doesn't support type reregistration.
@@ -3039,47 +2948,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Collect cache identifiers from two-step query.
- *
- * @param mainCacheId Id of main cache.
- * @param twoStepQry Two-step query.
- * @return Result.
- */
- @Nullable public List<Integer> collectCacheIds(@Nullable Integer mainCacheId, GridCacheTwoStepQuery twoStepQry) {
- LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
-
- int tblCnt = twoStepQry.tablesCount();
-
- if (mainCacheId != null)
- caches0.add(mainCacheId);
-
- if (tblCnt > 0) {
- for (QueryTable tblKey : twoStepQry.tables()) {
- GridH2Table tbl = schemaMgr.dataTable(tblKey.schema(), tblKey.table());
-
- if (tbl != null) {
- H2Utils.checkAndStartNotStartedCache(ctx, tbl);
-
- int cacheId = tbl.cacheId();
-
- caches0.add(cacheId);
- }
- }
- }
-
- if (caches0.isEmpty())
- return null;
- else {
- //Prohibit usage indices with different numbers of segments in same query.
- List<Integer> cacheIds = new ArrayList<>(caches0);
-
- processCaches(cacheIds, twoStepQry);
-
- return cacheIds;
- }
- }
-
- /**
* @param schemaName Schema.
* @param conn Connection.
* @param prepared Prepared statement.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index d87e7da..484f5b5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -935,19 +935,30 @@ public final class UpdatePlanBuilder {
try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
H2Utils.bindParameters(stmt, F.asList(fieldsQry.getArgs()));
- GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(conn,
+ GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(
+ conn,
GridSqlQueryParser.prepared(stmt),
fieldsQry.getArgs(),
fieldsQry.isCollocated(),
fieldsQry.isDistributedJoins(),
fieldsQry.isEnforceJoinOrder(),
- idx.partitionExtractor());
+ false,
+ idx
+ );
- boolean distributed = qry.skipMergeTable() && qry.mapQueries().size() == 1 &&
+ boolean distributed = !qry.isLocal() && qry.skipMergeTable() && qry.mapQueries().size() == 1 &&
!qry.mapQueries().get(0).hasSubQueries();
- return distributed ? new DmlDistributedPlanInfo(qry.isReplicatedOnly(),
- idx.collectCacheIds(CU.cacheId(cacheName), qry)): null;
+ if (distributed) {
+ List<Integer> cacheIds = H2Utils.collectCacheIds(idx, CU.cacheId(cacheName), qry.tables());
+
+ H2Utils.collectMvccEnabled(idx, cacheIds);
+ H2Utils.checkQuery(idx, cacheIds, qry.mvccEnabled(), qry.forUpdate(), qry.tables());
+
+ return new DmlDistributedPlanInfo(qry.isReplicatedOnly(), cacheIds);
+ }
+ else
+ return null;
}
}
catch (SQLException e) {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index c6e75dd..406c323 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -170,7 +171,7 @@ public class GridSqlQuerySplitter {
* @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
* @param distributedJoins If distributed joins enabled.
* @param enforceJoinOrder Enforce join order.
- * @param partExtractor Partition extractor.
+ * @param idx Indexing.
* @return Two step query.
* @throws SQLException If failed.
* @throws IgniteCheckedException If failed.
@@ -182,7 +183,8 @@ public class GridSqlQuerySplitter {
boolean collocatedGrpBy,
boolean distributedJoins,
boolean enforceJoinOrder,
- PartitionExtractor partExtractor
+ boolean local,
+ IgniteH2Indexing idx
) throws SQLException, IgniteCheckedException {
SplitterContext.set(distributedJoins);
@@ -194,7 +196,8 @@ public class GridSqlQuerySplitter {
collocatedGrpBy,
distributedJoins,
enforceJoinOrder,
- partExtractor
+ local,
+ idx
);
}
finally {
@@ -209,7 +212,7 @@ public class GridSqlQuerySplitter {
* @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
* @param distributedJoins If distributed joins enabled.
* @param enforceJoinOrder Enforce join order.
- * @param partExtractor Partition extractor.
+ * @param idx Indexing.
* @return Two step query.
* @throws SQLException If failed.
* @throws IgniteCheckedException If failed.
@@ -221,7 +224,8 @@ public class GridSqlQuerySplitter {
boolean collocatedGrpBy,
boolean distributedJoins,
boolean enforceJoinOrder,
- PartitionExtractor partExtractor
+ boolean local,
+ IgniteH2Indexing idx
) throws SQLException, IgniteCheckedException {
if (params == null)
params = GridCacheSqlQuery.EMPTY_PARAMS;
@@ -236,8 +240,12 @@ public class GridSqlQuerySplitter {
qry.explain(false);
- GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, distributedJoins,
- partExtractor);
+ GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(
+ params,
+ collocatedGrpBy,
+ distributedJoins,
+ idx.partitionExtractor()
+ );
// Normalization will generate unique aliases for all the table filters in FROM.
// Also it will collect all tables and schemas from the query.
@@ -279,26 +287,27 @@ public class GridSqlQuerySplitter {
distributedJoins = false;
}
- // Setup resulting two step query and return it.
- int paramsCnt = prepared.getParameters().size();
-
- GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, paramsCnt, splitter.tbls);
-
- twoStepQry.reduceQuery(splitter.rdcSqlQry);
-
- for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys)
- twoStepQry.addMapQuery(mapSqlQry);
+ List<Integer> cacheIds = H2Utils.collectCacheIds(idx, null, splitter.tbls);
+ boolean mvccEnabled = H2Utils.collectMvccEnabled(idx, cacheIds);
- twoStepQry.skipMergeTable(splitter.skipMergeTbl);
- twoStepQry.explain(explain);
- twoStepQry.distributedJoins(distributedJoins);
+ H2Utils.checkQuery(idx, cacheIds, mvccEnabled, forUpdate, splitter.tbls);
- // all map queries must have non-empty derivedPartitions to use this feature.
- twoStepQry.derivedPartitions(splitter.extractor.mergeMapQueries(twoStepQry.mapQueries()));
-
- twoStepQry.forUpdate(forUpdate);
-
- return twoStepQry;
+ // Setup resulting two step query and return it.
+ return new GridCacheTwoStepQuery(
+ originalSql,
+ prepared.getParameters().size(),
+ splitter.tbls,
+ splitter.rdcSqlQry,
+ splitter.mapSqlQrys,
+ splitter.skipMergeTbl,
+ explain,
+ distributedJoins,
+ forUpdate,
+ splitter.extractor.mergeMapQueries(splitter.mapSqlQrys),
+ cacheIds,
+ mvccEnabled,
+ local || F.isEmpty(cacheIds)
+ );
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3ff6dfe..046840e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -386,6 +386,8 @@ public class GridReduceQueryExecutor {
* @param lazy Lazy execution flag.
* @param mvccTracker Query tracker.
* @param dataPageScanEnabled If data page scan is enabled.
+ * @param forUpdate For update flag.
+ * @param pageSize Page size.
* @return Rows iterator.
*/
@SuppressWarnings({"BusyWait", "IfMayBeConditional"})
@@ -400,7 +402,9 @@ public class GridReduceQueryExecutor {
int[] parts,
boolean lazy,
MvccQueryTracker mvccTracker,
- Boolean dataPageScanEnabled
+ Boolean dataPageScanEnabled,
+ boolean forUpdate,
+ int pageSize
) {
if (qry.isLocal() && parts != null)
parts = null;
@@ -412,9 +416,17 @@ public class GridReduceQueryExecutor {
if (F.isEmpty(params))
params = EMPTY_PARAMS;
- final List<GridCacheSqlQuery> mapQueries = singlePartMode ?
- prepareMapQueryForSinglePartition(qry, params) :
- qry.mapQueries();
+ List<GridCacheSqlQuery> mapQueries;
+
+ if (singlePartMode)
+ mapQueries = prepareMapQueryForSinglePartition(qry, params);
+ else {
+ mapQueries = new ArrayList<>(qry.mapQueries().size());
+
+ // Copy queries here because node ID will be changed below.
+ for (GridCacheSqlQuery mapQry : qry.mapQueries())
+ mapQueries.add(mapQry.copy());
+ }
final boolean isReplicatedOnly = qry.isReplicatedOnly();
@@ -458,7 +470,7 @@ public class GridReduceQueryExecutor {
AffinityTopologyVersion topVer;
- if (qry.forUpdate()) {
+ if (forUpdate) {
// Indexing should have started TX at this point for FOR UPDATE query.
assert mvccEnabled && curTx != null;
@@ -491,8 +503,13 @@ public class GridReduceQueryExecutor {
long qryReqId = qryIdGen.incrementAndGet();
- final ReduceQueryRun r = new ReduceQueryRun(h2.connections().connectionForThread().connection(schemaName),
- mapQueries.size(), qry.pageSize(), sfuFut, dataPageScanEnabled);
+ final ReduceQueryRun r = new ReduceQueryRun(
+ h2.connections().connectionForThread().connection(schemaName),
+ mapQueries.size(),
+ pageSize,
+ sfuFut,
+ dataPageScanEnabled
+ );
Collection<ClusterNode> nodes;
@@ -683,7 +700,7 @@ public class GridReduceQueryExecutor {
final C2<ClusterNode, Message, Message> spec;
- if (qry.forUpdate()) {
+ if (forUpdate) {
final AtomicInteger cnt = new AtomicInteger();
spec = new C2<ClusterNode, Message, Message>() {
@@ -1344,6 +1361,7 @@ public class GridReduceQueryExecutor {
for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
if (mapQry.hasSubQueries()) {
hasSubQries = true;
+
break;
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index 91ab3e5..ccd14c3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -24,9 +24,10 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
+
+import org.apache.ignite.cache.query.Query;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.util.typedef.F;
import org.h2.jdbc.JdbcConnection;
import org.jetbrains.annotations.Nullable;
@@ -75,7 +76,7 @@ public class ReduceQueryRun {
idxs = new ArrayList<>(idxsCnt);
- this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+ this.pageSize = pageSize > 0 ? pageSize : Query.DFLT_PAGE_SIZE;
this.selectForUpdateFut = selectForUpdateFut;
this.dataPageScanEnabled = dataPageScanEnabled;
}