You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/03/01 10:47:38 UTC
[ignite] branch master updated: IGNITE-11210: SQL: Merged DML and
other command plans into a single cache. This closes #6200.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ab01d51 IGNITE-11210: SQL: Merged DML and other command plans into a single cache. This closes #6200.
new 1948ba8 Merge remote-tracking branch 'origin/master'
ab01d51 is described below
commit ab01d51f51d93841f27c162e6a54e85ae911dfe1
Author: devozerov <vo...@gridgain.com>
AuthorDate: Fri Mar 1 13:46:04 2019 +0300
IGNITE-11210: SQL: Merged DML and other command plans into a single cache. This closes #6200.
---
.../processors/query/h2/IgniteH2Indexing.java | 107 +++++----------------
.../internal/processors/query/h2/QueryParser.java | 36 ++++++-
.../processors/query/h2/QueryParserResultDml.java | 16 ++-
3 files changed, 72 insertions(+), 87 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index acf1136..65a85ba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -32,7 +32,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
@@ -109,7 +108,6 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIt
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
@@ -127,7 +125,6 @@ import org.apache.ignite.internal.sql.command.SqlCommand;
import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand;
import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -198,17 +195,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
- /** Default size for update plan cache. */
- private static final int UPDATE_PLAN_CACHE_SIZE = 1024;
-
/** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
private final boolean updateInTxAllowed =
Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
- /** Update plans cache. */
- private volatile ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache =
- new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
-
/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -475,7 +465,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
QueryParserResult parseRes = parser.parse(schemaName, fieldsQry, false);
if (parseRes.isDml()) {
- UpdateResult updRes = executeUpdate(schemaName, parseRes.dml(), fieldsQry, true, filter, cancel);
+ QueryParserResultDml dml = parseRes.dml();
+
+ assert dml != null;
+
+ UpdateResult updRes = executeUpdate(schemaName, dml, fieldsQry, true, filter, cancel);
List<?> updResRow = Collections.singletonList(updRes.counter());
@@ -754,7 +748,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean fail = false;
try {
- UpdatePlan plan = updatePlan(schemaName, dml, null);
+ UpdatePlan plan = dml.plan();
List<List<?>> planRows = plan.createRows(args != null ? args : X.EMPTY_OBJECT_ARRAY);
@@ -1285,12 +1279,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// Execute.
if (parseRes.isCommand()) {
+ QueryParserResultCommand cmd = parseRes.command();
+
+ assert cmd != null;
+
// Execute command.
FieldsQueryCursor<List<?>> cmdRes = executeCommand(
schemaName,
newQry,
cliCtx,
- parseRes.command()
+ cmd
);
res.add(cmdRes);
@@ -1365,6 +1363,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean fail = false;
try {
+ if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
+ throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
+ "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
+ "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
+ "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
+ }
+
if (!loc)
return executeUpdateDistributed(schemaName, dml , qry, cancel);
else {
@@ -1537,7 +1542,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IndexingQueryFilter filter = backupFilter(topVer, parts);
- UpdatePlan plan = updatePlan(schema, dml, fldsQry);
+ UpdatePlan plan = dml.plan();
GridCacheContext planCctx = plan.cacheContext();
@@ -1585,6 +1590,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @SuppressWarnings("NullableProblems")
@Override public Iterator<List<?>> iterator() {
try {
return res.iterator();
@@ -2125,16 +2131,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
partReservationMgr.onCacheStop(cacheName);
- // Remove cached DML plans.
- Iterator<Map.Entry<H2CachedStatementKey, UpdatePlan>> iter = updatePlanCache.entrySet().iterator();
-
- while (iter.hasNext()) {
- UpdatePlan plan = iter.next().getValue();
-
- if (F.eq(cacheName, plan.cacheContext().name()))
- iter.remove();
- }
-
// Drop schema (needs to be called after callback to DML processor because the latter depends on schema).
schemaMgr.onCacheDestroyed(cacheName, rmvIdx);
@@ -2150,8 +2146,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private void clearPlanCache() {
parser.clearCache();
-
- updatePlanCache = new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
}
/** {@inheritDoc} */
@@ -2188,10 +2182,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed.
*/
public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException {
- IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
-
- if (fut != null)
- fut.get();
+ ctx.cache().context().exchange().affinityReadyFuture(topVer).get();
}
/** {@inheritDoc} */
@@ -2290,7 +2281,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
List<Object[]> argss = fieldsQry0.batchedArguments();
- UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry0);
+ UpdatePlan plan = dml.plan();
GridCacheContext<?, ?> cctx = plan.cacheContext();
@@ -2402,7 +2393,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
long items = 0;
- UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry);
+ UpdatePlan plan = dml.plan();
GridCacheContext<?, ?> cctx = plan.cacheContext();
@@ -2651,6 +2642,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @SuppressWarnings("NullableProblems")
@Override public Iterator<List<?>> iterator() {
try {
return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true);
@@ -2666,55 +2658,4 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return DmlUtils.processSelectResult(plan, cur, pageSize);
}
-
- /**
- * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
- * if available.
- *
- * @param schemaName Schema.
- * @param dml Command.
- * @param fieldsQry Original fields query.
- * @return Update plan.
- */
- @SuppressWarnings("IfMayBeConditional")
- private UpdatePlan updatePlan(
- String schemaName,
- QueryParserResultDml dml,
- SqlFieldsQuery fieldsQry
- ) throws IgniteCheckedException {
- // Disallow updates on SYSTEM schema.
- if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
- throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
- // Disallow updates inside transaction if this is not MVCC mode.
- boolean inTx = ctx.cache().context().tm().inUserTx();
-
- if (!dml.mvccEnabled() && !updateInTxAllowed && inTx) {
- throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
- "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
- "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
- "\"IGNITE_ALLOW_DML_INSIDE_TRANSACTION\"");
- }
-
- H2CachedStatementKey planKey = new H2CachedStatementKey(schemaName, dml.statement().getSQL(), fieldsQry);
-
- UpdatePlan res = updatePlanCache.get(planKey);
-
- if (res != null)
- return res;
-
- res = UpdatePlanBuilder.planForStatement(
- schemaName,
- dml.statement(),
- dml.mvccEnabled(),
- this,
- fieldsQry
- );
-
- // Don't cache re-runs
- UpdatePlan oldRes = updatePlanCache.putIfAbsent(planKey, res);
-
- return oldRes != null ? oldRes : res;
- }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 52d52bc..6ddea41 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -30,8 +30,11 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
@@ -333,7 +336,7 @@ public class QueryParser {
return new QueryParserResult(newQry, remainingQry, null, null, cmd);
}
else if (GridSqlQueryParser.isDml(prepared)) {
- QueryParserResultDml dml = prepareDmlStatement(prepared);
+ QueryParserResultDml dml = prepareDmlStatement(schemaName, qry, prepared);
return new QueryParserResult(newQry, remainingQry, null, dml, null);
}
@@ -475,10 +478,16 @@ public class QueryParser {
/**
* Prepare DML statement.
*
+ * @param schemaName Schema name.
+ * @param qry Query.
* @param prepared Prepared.
* @return Statement.
*/
- private QueryParserResultDml prepareDmlStatement(Prepared prepared) {
+ private QueryParserResultDml prepareDmlStatement(String schemaName, SqlFieldsQuery qry, Prepared prepared) {
+ if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
+ throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
// Prepare AST.
GridSqlQueryParser parser = new GridSqlQueryParser(false);
@@ -517,11 +526,32 @@ public class QueryParser {
streamTbl = DmlAstUtils.gridTableForElement(insert.into()).dataTable();
}
+
+ // Create update plan.
+ UpdatePlan plan;
+
+ try {
+ plan = UpdatePlanBuilder.planForStatement(
+ schemaName,
+ stmt,
+ mvccEnabled,
+ idx,
+ qry
+ );
+ }
+ catch (Exception e) {
+ if (e instanceof IgniteSQLException)
+ throw (IgniteSQLException)e;
+ else
+ throw new IgniteSQLException("Failed to prepare update plan.", e);
+ }
+
return new QueryParserResultDml(
stmt,
prepared.getParameters().size(),
mvccEnabled,
- streamTbl
+ streamTbl,
+ plan
);
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
index cf48d8f..2244a3f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.jetbrains.annotations.Nullable;
@@ -37,6 +38,9 @@ public class QueryParserResultDml {
/** Streamer table. */
private final GridH2Table streamTbl;
+ /** Update plan. */
+ private final UpdatePlan plan;
+
/**
* Constructor.
*
@@ -44,17 +48,20 @@ public class QueryParserResultDml {
* @param paramsCnt Number of parameters.
* @param mvccEnabled Whether MVCC is enabled.
* @param streamTbl Streamer table.
+ * @param plan Update plan.
*/
public QueryParserResultDml(
GridSqlStatement stmt,
int paramsCnt,
boolean mvccEnabled,
- @Nullable GridH2Table streamTbl
+ @Nullable GridH2Table streamTbl,
+ UpdatePlan plan
) {
this.stmt = stmt;
this.paramsCnt = paramsCnt;
this.mvccEnabled = mvccEnabled;
this.streamTbl = streamTbl;
+ this.plan = plan;
}
/**
@@ -91,4 +98,11 @@ public class QueryParserResultDml {
public int parametersCount() {
return paramsCnt;
}
+
+ /**
+ * @return Update plan.
+ */
+ public UpdatePlan plan() {
+ return plan;
+ }
}