You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/03/01 10:47:38 UTC

[ignite] branch master updated: IGNITE-11210: SQL: Merged DML and other command plans into a single cache. This closes #6200.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ab01d51  IGNITE-11210: SQL: Merged DML and other command plans into a single cache. This closes #6200.
     new 1948ba8  Merge remote-tracking branch 'origin/master'
ab01d51 is described below

commit ab01d51f51d93841f27c162e6a54e85ae911dfe1
Author: devozerov <vo...@gridgain.com>
AuthorDate: Fri Mar 1 13:46:04 2019 +0300

    IGNITE-11210: SQL: Merged DML and other command plans into a single cache. This closes #6200.
---
 .../processors/query/h2/IgniteH2Indexing.java      | 107 +++++----------------
 .../internal/processors/query/h2/QueryParser.java  |  36 ++++++-
 .../processors/query/h2/QueryParserResultDml.java  |  16 ++-
 3 files changed, 72 insertions(+), 87 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index acf1136..65a85ba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -32,7 +32,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
@@ -109,7 +108,6 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIt
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
@@ -127,7 +125,6 @@ import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand;
 import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand;
 import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -198,17 +195,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
     private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
 
-    /** Default size for update plan cache. */
-    private static final int UPDATE_PLAN_CACHE_SIZE = 1024;
-
     /** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
     private final boolean updateInTxAllowed =
         Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
 
-    /** Update plans cache. */
-    private volatile ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache =
-        new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
-
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -475,7 +465,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             QueryParserResult parseRes = parser.parse(schemaName, fieldsQry, false);
 
             if (parseRes.isDml()) {
-                UpdateResult updRes = executeUpdate(schemaName, parseRes.dml(), fieldsQry, true, filter, cancel);
+                QueryParserResultDml dml = parseRes.dml();
+
+                assert dml != null;
+
+                UpdateResult updRes = executeUpdate(schemaName, dml, fieldsQry, true, filter, cancel);
 
                 List<?> updResRow = Collections.singletonList(updRes.counter());
 
@@ -754,7 +748,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         boolean fail = false;
 
         try {
-            UpdatePlan plan = updatePlan(schemaName, dml, null);
+            UpdatePlan plan = dml.plan();
 
             List<List<?>> planRows = plan.createRows(args != null ? args : X.EMPTY_OBJECT_ARRAY);
 
@@ -1285,12 +1279,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 // Execute.
                 if (parseRes.isCommand()) {
+                    QueryParserResultCommand cmd = parseRes.command();
+
+                    assert cmd != null;
+
                     // Execute command.
                     FieldsQueryCursor<List<?>> cmdRes = executeCommand(
                         schemaName,
                         newQry,
                         cliCtx,
-                        parseRes.command()
+                        cmd
                     );
 
                     res.add(cmdRes);
@@ -1365,6 +1363,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             boolean fail = false;
 
             try {
+                if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
+                    throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
+                        "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
+                        "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
+                        "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
+                }
+
                 if (!loc)
                     return executeUpdateDistributed(schemaName, dml , qry, cancel);
                 else {
@@ -1537,7 +1542,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         IndexingQueryFilter filter = backupFilter(topVer, parts);
 
-        UpdatePlan plan = updatePlan(schema, dml, fldsQry);
+        UpdatePlan plan = dml.plan();
 
         GridCacheContext planCctx = plan.cacheContext();
 
@@ -1585,6 +1590,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             );
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @SuppressWarnings("NullableProblems")
                 @Override public Iterator<List<?>> iterator() {
                     try {
                         return res.iterator();
@@ -2125,16 +2131,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         partReservationMgr.onCacheStop(cacheName);
 
-        // Remove cached DML plans.
-        Iterator<Map.Entry<H2CachedStatementKey, UpdatePlan>> iter = updatePlanCache.entrySet().iterator();
-
-        while (iter.hasNext()) {
-            UpdatePlan plan = iter.next().getValue();
-
-            if (F.eq(cacheName, plan.cacheContext().name()))
-                iter.remove();
-        }
-
         // Drop schema (needs to be called after callback to DML processor because the latter depends on schema).
         schemaMgr.onCacheDestroyed(cacheName, rmvIdx);
 
@@ -2150,8 +2146,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private void clearPlanCache() {
         parser.clearCache();
-
-        updatePlanCache = new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
     }
 
     /** {@inheritDoc} */
@@ -2188,10 +2182,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException {
-        IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
-
-        if (fut != null)
-            fut.get();
+        ctx.cache().context().exchange().affinityReadyFuture(topVer).get();
     }
 
     /** {@inheritDoc} */
@@ -2290,7 +2281,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             List<Object[]> argss = fieldsQry0.batchedArguments();
 
-            UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry0);
+            UpdatePlan plan = dml.plan();
 
             GridCacheContext<?, ?> cctx = plan.cacheContext();
 
@@ -2402,7 +2393,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         long items = 0;
 
-        UpdatePlan plan = updatePlan(schemaName, dml, fieldsQry);
+        UpdatePlan plan = dml.plan();
 
         GridCacheContext<?, ?> cctx = plan.cacheContext();
 
@@ -2651,6 +2642,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             );
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @SuppressWarnings("NullableProblems")
                 @Override public Iterator<List<?>> iterator() {
                     try {
                         return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true);
@@ -2666,55 +2658,4 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         return DmlUtils.processSelectResult(plan, cur, pageSize);
     }
-
-    /**
-     * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
-     * if available.
-     *
-     * @param schemaName Schema.
-     * @param dml Command.
-     * @param fieldsQry Original fields query.
-     * @return Update plan.
-     */
-    @SuppressWarnings("IfMayBeConditional")
-    private UpdatePlan updatePlan(
-        String schemaName,
-        QueryParserResultDml dml,
-        SqlFieldsQuery fieldsQry
-    ) throws IgniteCheckedException {
-        // Disallow updates on SYSTEM schema.
-        if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
-            throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
-        // Disallow updates inside transaction if this is not MVCC mode.
-        boolean inTx = ctx.cache().context().tm().inUserTx();
-
-        if (!dml.mvccEnabled() && !updateInTxAllowed && inTx) {
-            throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
-                "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
-                "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
-                "\"IGNITE_ALLOW_DML_INSIDE_TRANSACTION\"");
-        }
-
-        H2CachedStatementKey planKey = new H2CachedStatementKey(schemaName, dml.statement().getSQL(), fieldsQry);
-
-        UpdatePlan res = updatePlanCache.get(planKey);
-
-        if (res != null)
-            return res;
-
-        res = UpdatePlanBuilder.planForStatement(
-            schemaName,
-            dml.statement(),
-            dml.mvccEnabled(),
-            this,
-            fieldsQry
-        );
-
-        // Don't cache re-runs
-        UpdatePlan oldRes = updatePlanCache.putIfAbsent(planKey, res);
-
-        return oldRes != null ? oldRes : res;
-    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 52d52bc..6ddea41 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -30,8 +30,11 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
@@ -333,7 +336,7 @@ public class QueryParser {
                 return new QueryParserResult(newQry, remainingQry, null, null, cmd);
             }
             else if (GridSqlQueryParser.isDml(prepared)) {
-                QueryParserResultDml dml = prepareDmlStatement(prepared);
+                QueryParserResultDml dml = prepareDmlStatement(schemaName, qry, prepared);
 
                 return new QueryParserResult(newQry, remainingQry, null, dml, null);
             }
@@ -475,10 +478,16 @@ public class QueryParser {
     /**
      * Prepare DML statement.
      *
+     * @param schemaName Schema name.
+     * @param qry Query.
      * @param prepared Prepared.
      * @return Statement.
      */
-    private QueryParserResultDml prepareDmlStatement(Prepared prepared) {
+    private QueryParserResultDml prepareDmlStatement(String schemaName, SqlFieldsQuery qry, Prepared prepared) {
+        if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
+            throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
         // Prepare AST.
         GridSqlQueryParser parser = new GridSqlQueryParser(false);
 
@@ -517,11 +526,32 @@ public class QueryParser {
             streamTbl = DmlAstUtils.gridTableForElement(insert.into()).dataTable();
         }
 
+
+        // Create update plan.
+        UpdatePlan plan;
+
+        try {
+            plan = UpdatePlanBuilder.planForStatement(
+                schemaName,
+                stmt,
+                mvccEnabled,
+                idx,
+                qry
+            );
+        }
+        catch (Exception e) {
+            if (e instanceof IgniteSQLException)
+                throw (IgniteSQLException)e;
+            else
+                throw new IgniteSQLException("Failed to prepare update plan.", e);
+        }
+
         return new QueryParserResultDml(
             stmt,
             prepared.getParameters().size(),
             mvccEnabled,
-            streamTbl
+            streamTbl,
+            plan
         );
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
index cf48d8f..2244a3f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResultDml.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.jetbrains.annotations.Nullable;
@@ -37,6 +38,9 @@ public class QueryParserResultDml {
     /** Streamer table. */
     private final GridH2Table streamTbl;
 
+    /** Update plan. */
+    private final UpdatePlan plan;
+
     /**
      * Constructor.
      *
@@ -44,17 +48,20 @@ public class QueryParserResultDml {
      * @param paramsCnt Number of parameters.
      * @param mvccEnabled Whether MVCC is enabled.
      * @param streamTbl Streamer table.
+     * @param plan Update plan.
      */
     public QueryParserResultDml(
         GridSqlStatement stmt,
         int paramsCnt,
         boolean mvccEnabled,
-        @Nullable GridH2Table streamTbl
+        @Nullable GridH2Table streamTbl,
+        UpdatePlan plan
     ) {
         this.stmt = stmt;
         this.paramsCnt = paramsCnt;
         this.mvccEnabled = mvccEnabled;
         this.streamTbl = streamTbl;
+        this.plan = plan;
     }
 
     /**
@@ -91,4 +98,11 @@ public class QueryParserResultDml {
     public int parametersCount() {
         return paramsCnt;
     }
+
+    /**
+     * @return Update plan.
+     */
+    public UpdatePlan plan() {
+        return plan;
+    }
 }