You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/09 15:03:12 UTC
[ignite] branch master updated: IGNITE-12275: SQL: Moved
non-SELECT/DML commands processing to separate class. This closes #6071.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 88fcc92 IGNITE-12275: SQL: Moved non-SELECT/DML commands processing to separate class. This closes #6071.
88fcc92 is described below
commit 88fcc92243df12bff1d18742211062d5e1d7dee4
Author: devozerov <pp...@gmail.com>
AuthorDate: Sat Feb 9 18:02:37 2019 +0300
IGNITE-12275: SQL: Moved non-SELECT/DML commands processing to separate class. This closes #6071.
---
...tementsProcessor.java => CommandProcessor.java} | 323 ++++++++++++++++----
.../processors/query/h2/CommandResult.java | 58 ++++
.../processors/query/h2/IgniteH2Indexing.java | 324 +++++----------------
.../processors/query/h2/ParsingResult.java | 45 ++-
.../cache/index/H2DynamicTableSelfTest.java | 4 +-
5 files changed, 435 insertions(+), 319 deletions(-)
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
similarity index 72%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index 84dfea2..8c52296 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2.ddl;
+package org.apache.ignite.internal.processors.query.h2;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,28 +28,41 @@ import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.NestedTxMode;
import org.apache.ignite.internal.processors.query.QueryEntityEx;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.H2Utils;
-import org.apache.ignite.internal.processors.query.h2.SchemaManager;
+import org.apache.ignite.internal.processors.query.RunningQueryManager;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableAddColumn;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableDropColumn;
@@ -58,20 +71,26 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateIndex;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlCreateTable;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropIndex;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDropTable;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
import org.apache.ignite.internal.sql.command.SqlAlterUserCommand;
+import org.apache.ignite.internal.sql.command.SqlBeginTransactionCommand;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
import org.apache.ignite.internal.sql.command.SqlCommand;
+import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand;
import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
import org.apache.ignite.internal.sql.command.SqlCreateUserCommand;
import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
import org.apache.ignite.internal.sql.command.SqlDropUserCommand;
import org.apache.ignite.internal.sql.command.SqlIndexColumn;
+import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand;
+import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.h2.command.Prepared;
import org.h2.command.ddl.AlterTableAlterColumn;
@@ -82,20 +101,29 @@ import org.h2.command.ddl.DropTable;
import org.h2.table.Column;
import org.h2.value.DataType;
import org.h2.value.Value;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.PARAM_WRAP_VALUE;
/**
- * DDL statements processor.<p>
- * Contains higher level logic to handle operations as a whole and communicate with the client.
+ * Processor responsible for execution of all non-SELECT and non-DML commands.
*/
-public class DdlStatementsProcessor {
+public class CommandProcessor {
/** Kernal context. */
- private GridKernalContext ctx;
+ private final GridKernalContext ctx;
- /** Indexing. */
- private SchemaManager schemaMgr;
+ /** Schema manager. */
+ private final SchemaManager schemaMgr;
+
+ /** Running query manager. */
+ private final RunningQueryManager runningQryMgr;
+
+ /** Logger. */
+ private final IgniteLogger log;
/** Is backward compatible handling of UUID through DDL enabled. */
private static final boolean handleUuidAsByte =
@@ -107,27 +135,19 @@ public class DdlStatementsProcessor {
* @param ctx Kernal context.
* @param schemaMgr Schema manager.
*/
- public DdlStatementsProcessor(GridKernalContext ctx, SchemaManager schemaMgr) {
+ public CommandProcessor(GridKernalContext ctx, SchemaManager schemaMgr, RunningQueryManager runningQryMgr) {
this.ctx = ctx;
this.schemaMgr = schemaMgr;
- }
+ this.runningQryMgr = runningQryMgr;
- /**
- * Initialize message handlers and this' fields needed for further operation.
- *
- * @param ctx Kernal context.
- * @param schemaMgr Schema manager.
- */
- public void start(final GridKernalContext ctx, SchemaManager schemaMgr) {
- this.ctx = ctx;
- this.schemaMgr = schemaMgr;
+ log = ctx.log(CommandProcessor.class);
}
/**
* @param cmd Command.
* @return {@code True} if this is supported DDL command.
*/
- public static boolean isDdlCommand(SqlCommand cmd) {
+ private static boolean isDdl(SqlCommand cmd) {
return cmd instanceof SqlCreateIndexCommand
|| cmd instanceof SqlDropIndexCommand
|| cmd instanceof SqlAlterTableCommand
@@ -137,13 +157,54 @@ public class DdlStatementsProcessor {
}
/**
+ * Execute command.
+ *
+ * @param qry Query.
+ * @param cmdNative Native command (if any).
+ * @param cmdH2 H2 command (if any).
+ * @param cliCtx Client context.
+ * @param qryId Running query ID.
+ * @return Result.
+ */
+ public CommandResult runCommand(SqlFieldsQuery qry, SqlCommand cmdNative, GridSqlStatement cmdH2,
+ @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
+ assert cmdNative != null || cmdH2 != null;
+
+ // Do execute.
+ FieldsQueryCursor<List<?>> res = H2Utils.zeroCursor();
+ boolean unregister = true;
+
+ if (cmdNative != null) {
+ assert cmdH2 == null;
+
+ if (isDdl(cmdNative))
+ runCommandNativeDdl(qry.getSql(), cmdNative);
+ else if (cmdNative instanceof SqlBulkLoadCommand) {
+ res = processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, qryId);
+
+ unregister = false;
+ }
+ else if (cmdNative instanceof SqlSetStreamingCommand)
+ processSetStreamingCommand((SqlSetStreamingCommand)cmdNative, cliCtx);
+ else
+ processTxCommand(cmdNative, qry);
+ }
+ else {
+ assert cmdH2 != null;
+
+ runCommandH2(qry.getSql(), cmdH2);
+ }
+
+ return new CommandResult(res, unregister);
+ }
+
+ /**
* Run DDL statement.
*
* @param sql Original SQL.
* @param cmd Command.
- * @return Result.
*/
- public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) {
+ private void runCommandNativeDdl(String sql, SqlCommand cmd) {
IgniteInternalFuture fut = null;
try {
@@ -260,8 +321,6 @@ public class DdlStatementsProcessor {
if (fut != null)
fut.get();
-
- return H2Utils.zeroCursor();
}
catch (SchemaOperationException e) {
throw convert(e);
@@ -278,20 +337,17 @@ public class DdlStatementsProcessor {
* Execute DDL statement.
*
* @param sql SQL.
- * @param prepared Prepared.
- * @return Cursor on query results.
+ * @param cmdH2 Command.
*/
@SuppressWarnings({"unchecked"})
- public FieldsQueryCursor<List<?>> runDdlStatement(String sql, Prepared prepared) {
+ private void runCommandH2(String sql, GridSqlStatement cmdH2) {
IgniteInternalFuture fut = null;
try {
finishActiveTxIfNecessary();
- GridSqlStatement stmt0 = new GridSqlQueryParser(false).parse(prepared);
-
- if (stmt0 instanceof GridSqlCreateIndex) {
- GridSqlCreateIndex cmd = (GridSqlCreateIndex)stmt0;
+ if (cmdH2 instanceof GridSqlCreateIndex) {
+ GridSqlCreateIndex cmd = (GridSqlCreateIndex)cmdH2;
isDdlOnSchemaSupported(cmd.schemaName());
@@ -329,8 +385,8 @@ public class DdlStatementsProcessor {
fut = ctx.query().dynamicIndexCreate(tbl.cacheName(), cmd.schemaName(), typeDesc.tableName(),
newIdx, cmd.ifNotExists(), 0);
}
- else if (stmt0 instanceof GridSqlDropIndex) {
- GridSqlDropIndex cmd = (GridSqlDropIndex) stmt0;
+ else if (cmdH2 instanceof GridSqlDropIndex) {
+ GridSqlDropIndex cmd = (GridSqlDropIndex) cmdH2;
isDdlOnSchemaSupported(cmd.schemaName());
@@ -350,10 +406,10 @@ public class DdlStatementsProcessor {
cmd.indexName());
}
}
- else if (stmt0 instanceof GridSqlCreateTable) {
+ else if (cmdH2 instanceof GridSqlCreateTable) {
ctx.security().authorize(null, SecurityPermission.CACHE_CREATE, null);
- GridSqlCreateTable cmd = (GridSqlCreateTable)stmt0;
+ GridSqlCreateTable cmd = (GridSqlCreateTable)cmdH2;
isDdlOnSchemaSupported(cmd.schemaName());
@@ -383,10 +439,10 @@ public class DdlStatementsProcessor {
cmd.writeSynchronizationMode(), cmd.backups(), cmd.ifNotExists(), cmd.encrypted());
}
}
- else if (stmt0 instanceof GridSqlDropTable) {
+ else if (cmdH2 instanceof GridSqlDropTable) {
ctx.security().authorize(null, SecurityPermission.CACHE_DESTROY, null);
- GridSqlDropTable cmd = (GridSqlDropTable)stmt0;
+ GridSqlDropTable cmd = (GridSqlDropTable)cmdH2;
isDdlOnSchemaSupported(cmd.schemaName());
@@ -400,8 +456,8 @@ public class DdlStatementsProcessor {
else
ctx.query().dynamicTableDrop(tbl.cacheName(), cmd.tableName(), cmd.ifExists());
}
- else if (stmt0 instanceof GridSqlAlterTableAddColumn) {
- GridSqlAlterTableAddColumn cmd = (GridSqlAlterTableAddColumn)stmt0;
+ else if (cmdH2 instanceof GridSqlAlterTableAddColumn) {
+ GridSqlAlterTableAddColumn cmd = (GridSqlAlterTableAddColumn)cmdH2;
isDdlOnSchemaSupported(cmd.schemaName());
@@ -455,8 +511,8 @@ public class DdlStatementsProcessor {
}
}
}
- else if (stmt0 instanceof GridSqlAlterTableDropColumn) {
- GridSqlAlterTableDropColumn cmd = (GridSqlAlterTableDropColumn)stmt0;
+ else if (cmdH2 instanceof GridSqlAlterTableDropColumn) {
+ GridSqlAlterTableDropColumn cmd = (GridSqlAlterTableDropColumn)cmdH2;
isDdlOnSchemaSupported(cmd.schemaName());
@@ -519,13 +575,6 @@ public class DdlStatementsProcessor {
if (fut != null)
fut.get();
-
- QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
- (Collections.singletonList(0L)), null, false);
-
- resCur.fieldsMeta(UPDATE_RESULT_META);
-
- return resCur;
}
catch (SchemaOperationException e) {
U.error(null, "DDL operation failure", e);
@@ -742,7 +791,7 @@ public class DdlStatementsProcessor {
* @param cmd Statement.
* @return Whether {@code cmd} is a DDL statement we're able to handle.
*/
- public static boolean isDdlStatement(Prepared cmd) {
+ public static boolean isCommand(Prepared cmd) {
return cmd instanceof CreateIndex || cmd instanceof DropIndex || cmd instanceof CreateTable ||
cmd instanceof DropTable || cmd instanceof AlterTableAlterColumn;
}
@@ -765,4 +814,170 @@ public class DdlStatementsProcessor {
return DataType.getTypeClassName(type);
}
}
-}
+
+ /**
+ * Process transactional command.
+ * @param cmd Command.
+ * @param qry Query.
+ * @throws IgniteCheckedException if failed.
+ */
+ private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
+ throws IgniteCheckedException {
+ NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).getNestedTxMode() :
+ NestedTxMode.DEFAULT;
+
+ GridNearTxLocal tx = tx(ctx);
+
+ if (cmd instanceof SqlBeginTransactionCommand) {
+ if (!mvccEnabled(ctx))
+ throw new IgniteSQLException("MVCC must be enabled in order to start transaction.",
+ IgniteQueryErrorCode.MVCC_DISABLED);
+
+ if (tx != null) {
+ if (nestedTxMode == null)
+ nestedTxMode = NestedTxMode.DEFAULT;
+
+ switch (nestedTxMode) {
+ case COMMIT:
+ doCommit(tx);
+
+ txStart(ctx, qry.getTimeout());
+
+ break;
+
+ case IGNORE:
+ log.warning("Transaction has already been started, ignoring BEGIN command.");
+
+ break;
+
+ case ERROR:
+ throw new IgniteSQLException("Transaction has already been started.",
+ IgniteQueryErrorCode.TRANSACTION_EXISTS);
+
+ default:
+ throw new IgniteSQLException("Unexpected nested transaction handling mode: " +
+ nestedTxMode.name());
+ }
+ }
+ else
+ txStart(ctx, qry.getTimeout());
+ }
+ else if (cmd instanceof SqlCommitTransactionCommand) {
+ // Do nothing if there's no transaction.
+ if (tx != null)
+ doCommit(tx);
+ }
+ else {
+ assert cmd instanceof SqlRollbackTransactionCommand;
+
+ // Do nothing if there's no transaction.
+ if (tx != null)
+ doRollback(tx);
+ }
+ }
+
+ /**
+ * Commit and properly close transaction.
+ * @param tx Transaction.
+ * @throws IgniteCheckedException if failed.
+ */
+ private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
+ try {
+ tx.commit();
+ }
+ finally {
+ closeTx(tx);
+ }
+ }
+
+ /**
+ * Rollback and properly close transaction.
+ * @param tx Transaction.
+ * @throws IgniteCheckedException if failed.
+ */
+ public void doRollback(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
+ try {
+ tx.rollback();
+ }
+ finally {
+ closeTx(tx);
+ }
+ }
+
+ /**
+ * Properly close transaction.
+ * @param tx Transaction.
+ * @throws IgniteCheckedException if failed.
+ */
+ private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
+ try {
+ tx.close();
+ }
+ finally {
+ ctx.cache().context().tm().resetContext();
+ }
+ }
+
+ /**
+ * Process SET STREAMING command.
+ *
+ * @param cmd Command.
+ * @param cliCtx Client context.
+ */
+ private void processSetStreamingCommand(SqlSetStreamingCommand cmd,
+ @Nullable SqlClientContext cliCtx) {
+ if (cliCtx == null)
+ throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver.");
+
+ if (cmd.isTurnOn())
+ cliCtx.enableStreaming(
+ cmd.allowOverwrite(),
+ cmd.flushFrequency(),
+ cmd.perNodeBufferSize(),
+ cmd.perNodeParallelOperations(),
+ cmd.isOrdered()
+ );
+ else
+ cliCtx.disableStreaming();
+ }
+
+ /**
+ * Process bulk load COPY command.
+ *
+ * @param cmd The command.
+ * @param qryId Query id.
+ * @return The context (which is the result of the first request/response).
+ * @throws IgniteCheckedException If something failed.
+ */
+ private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
+ throws IgniteCheckedException {
+ if (cmd.packetSize() == null)
+ cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
+
+ GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName());
+
+ if (tbl == null) {
+ throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
+ IgniteQueryErrorCode.TABLE_NOT_FOUND);
+ }
+
+ H2Utils.checkAndStartNotStartedCache(ctx, tbl);
+
+ UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
+
+ IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new DmlBulkLoadDataConverter(plan);
+
+ IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName());
+
+ BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
+
+ BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
+
+ BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter,
+ runningQryMgr, qryId);
+
+ BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
+
+ return new BulkLoadContextCursor(processor, params);
+ }
+}
\ No newline at end of file
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java
new file mode 100644
index 0000000..a025bb1
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandResult.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+
+import java.util.List;
+
+/**
+ * Command execution result.
+ */
+public class CommandResult {
+ /** Cursor. */
+ private final FieldsQueryCursor<List<?>> cur;
+
+ /** Whether running query should be unregistered. */
+ private final boolean unregisterRunningQry;
+
+ /**
+ * Constructor.
+ *
+ * @param cur Cursor.
+ * @param unregisterRunningQry Whether running query should be unregistered.
+ */
+ public CommandResult(FieldsQueryCursor<List<?>> cur, boolean unregisterRunningQry) {
+ this.cur = cur;
+ this.unregisterRunningQry = unregisterRunningQry;
+ }
+
+ /**
+ * @return Cursor.
+ */
+ public FieldsQueryCursor<List<?>> cursor() {
+ return cur;
+ }
+
+ /**
+ * @return Whether running query should be unregistered.
+ */
+ public boolean unregisterRunningQuery() {
+ return unregisterRunningQry;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index afdd9e5..f6867d8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -42,7 +42,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheServerNotFoundException;
-import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -54,11 +53,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
@@ -96,7 +90,6 @@ import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.NestedTxMode;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryHistoryMetrics;
import org.apache.ignite.internal.processors.query.QueryHistoryMetricsKey;
@@ -106,7 +99,6 @@ import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter;
import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateResultsIterator;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIterator;
@@ -125,7 +117,6 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccInnerIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO;
-import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
@@ -165,7 +156,6 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
-import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
@@ -194,7 +184,6 @@ import org.h2.engine.Session;
import org.h2.engine.SysProperties;
import org.h2.table.IndexColumn;
import org.h2.util.JdbcUtils;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.lang.Boolean.FALSE;
@@ -251,7 +240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
/** Update plans cache. */
- private final ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache =
+ private volatile ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache =
new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
/** Logger. */
@@ -285,8 +274,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** Query context registry. */
private final QueryContextRegistry qryCtxRegistry = new QueryContextRegistry();
- /** */
- private DdlStatementsProcessor ddlProc;
+ /** Processor to execute commands which are neither SELECT nor DML. */
+ private CommandProcessor cmdProc;
/** Partition reservation manager. */
private PartitionReservationManager partReservationMgr;
@@ -295,7 +284,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private PartitionExtractor partExtractor;
/** Running query manager. */
- private RunningQueryManager runningQueryMgr;
+ private RunningQueryManager runningQryMgr;
/** */
private volatile GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery> twoStepCache =
@@ -414,7 +403,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean ifTblExists, boolean ifColNotExists) throws IgniteCheckedException {
schemaMgr.addColumn(schemaName, tblName, cols, ifTblExists, ifColNotExists);
- clearCachedQueries();
+ clearPlanCache();
}
/** {@inheritDoc} */
@@ -422,7 +411,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean ifColExists) throws IgniteCheckedException {
schemaMgr.dropColumn(schemaName, tblName, cols, ifTblExists, ifColExists);
- clearCachedQueries();
+ clearPlanCache();
}
/**
@@ -554,7 +543,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator<>(updResRow));
}
- else if (DdlStatementsProcessor.isDdlStatement(p)) {
+ else if (CommandProcessor.isCommand(p)) {
throw new IgniteSQLException("DDL statements are supported for the whole cluster only.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
@@ -823,7 +812,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@SuppressWarnings({"unchecked", "Anonymous2MethodRef"})
private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt,
final Object[] args) throws IgniteCheckedException {
- Long qryId = runningQueryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+ Long qryId = runningQryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
boolean fail = false;
@@ -912,7 +901,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw e;
}
finally {
- runningQueryMgr.unregister(qryId, fail);
+ runningQryMgr.unregister(qryId, fail);
}
}
@@ -1439,7 +1428,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(parser.lastCommandSql());
- return new ParsingResult(newQry, leadingCmd, parser.remainingSql());
+ return new ParsingResult(newQry, leadingCmd, null, parser.remainingSql());
}
catch (SqlStrictParseException e) {
throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.PARSING, e);
@@ -1463,38 +1452,37 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Executes a query natively.
+ * Execute command.
*
* @param schemaName Schema name.
* @param qry Query.
- * @param cmd Parsed command corresponding to query.
- * @param cliCtx Client context, or {@code null} if not applicable.
- * @return Result cursors.
+ * @param cliCtx Client context.
+ * @param cmdNative Command (native).
+ * @param cmdH2 Command (H2).
+ * @return Result.
*/
- private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry,
- SqlCommand cmd, @Nullable SqlClientContext cliCtx) {
- boolean fail = false;
- boolean unregister = true;
+ public FieldsQueryCursor<List<?>> executeCommand(
+ String schemaName,
+ SqlFieldsQuery qry,
+ @Nullable SqlClientContext cliCtx,
+ SqlCommand cmdNative,
+ GridSqlStatement cmdH2
+ ) {
+ if (qry.isLocal()) {
+ throw new IgniteSQLException("DDL statements are not supported for LOCAL caches",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+ }
Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true);
- try {
- FieldsQueryCursor<List<?>> cur;
+ boolean fail = false;
- if (DdlStatementsProcessor.isDdlCommand(cmd))
- cur = ddlProc.runDdlStatement(qry.getSql(), cmd);
- else if (cmd instanceof SqlBulkLoadCommand) {
- // Query will be unregistered when cursor is closed.
- unregister = false;
+ CommandResult res = null;
- cur = processBulkLoadCommand((SqlBulkLoadCommand) cmd, qryId);
- }
- else if (cmd instanceof SqlSetStreamingCommand)
- cur = processSetStreamingCommand((SqlSetStreamingCommand)cmd, cliCtx);
- else
- cur = processTxCommand(cmd, qry);
+ try {
+ res = cmdProc.runCommand(qry, cmdNative, cmdH2, cliCtx, qryId);
- return Collections.singletonList(cur);
+ return res.cursor();
}
catch (IgniteCheckedException e) {
fail = true;
@@ -1503,38 +1491,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
", err=" + e.getMessage() + ']', e);
}
finally {
- if (unregister || fail)
- runningQueryMgr.unregister(qryId, fail);
+ if (fail || (res != null && res.unregisterRunningQuery()))
+ runningQryMgr.unregister(qryId, fail);
}
}
/**
- * Process SET STREAMING command.
- *
- * @param cmd Command.
- * @param cliCtx Client context.
- * @return Cursor.
- */
- private FieldsQueryCursor<List<?>> processSetStreamingCommand(SqlSetStreamingCommand cmd,
- @Nullable SqlClientContext cliCtx) {
- if (cliCtx == null)
- throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver.");
-
- if (cmd.isTurnOn())
- cliCtx.enableStreaming(
- cmd.allowOverwrite(),
- cmd.flushFrequency(),
- cmd.perNodeBufferSize(),
- cmd.perNodeParallelOperations(),
- cmd.isOrdered()
- );
- else
- cliCtx.disableStreaming();
-
- return H2Utils.zeroCursor();
- }
-
- /**
* Check expected statement type (when it is set by JDBC) and given statement type.
*
* @param qry Query.
@@ -1548,151 +1510,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
}
- /**
- * Process transactional command.
- * @param cmd Command.
- * @param qry Query.
- * @throws IgniteCheckedException if failed.
- */
- private FieldsQueryCursor<List<?>> processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
- throws IgniteCheckedException {
- NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).getNestedTxMode() :
- NestedTxMode.DEFAULT;
-
- GridNearTxLocal tx = tx(ctx);
-
- if (cmd instanceof SqlBeginTransactionCommand) {
- if (!mvccEnabled(ctx))
- throw new IgniteSQLException("MVCC must be enabled in order to start transaction.",
- IgniteQueryErrorCode.MVCC_DISABLED);
-
- if (tx != null) {
- if (nestedTxMode == null)
- nestedTxMode = NestedTxMode.DEFAULT;
-
- switch (nestedTxMode) {
- case COMMIT:
- doCommit(tx);
-
- txStart(ctx, qry.getTimeout());
-
- break;
-
- case IGNORE:
- log.warning("Transaction has already been started, ignoring BEGIN command.");
-
- break;
-
- case ERROR:
- throw new IgniteSQLException("Transaction has already been started.",
- IgniteQueryErrorCode.TRANSACTION_EXISTS);
-
- default:
- throw new IgniteSQLException("Unexpected nested transaction handling mode: " +
- nestedTxMode.name());
- }
- }
- else
- txStart(ctx, qry.getTimeout());
- }
- else if (cmd instanceof SqlCommitTransactionCommand) {
- // Do nothing if there's no transaction.
- if (tx != null)
- doCommit(tx);
- }
- else {
- assert cmd instanceof SqlRollbackTransactionCommand;
-
- // Do nothing if there's no transaction.
- if (tx != null)
- doRollback(tx);
- }
-
- return H2Utils.zeroCursor();
- }
-
- /**
- * Commit and properly close transaction.
- * @param tx Transaction.
- * @throws IgniteCheckedException if failed.
- */
- private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
- try {
- tx.commit();
- }
- finally {
- closeTx(tx);
- }
- }
-
- /**
- * Rollback and properly close transaction.
- * @param tx Transaction.
- * @throws IgniteCheckedException if failed.
- */
- private void doRollback(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
- try {
- tx.rollback();
- }
- finally {
- closeTx(tx);
- }
- }
-
- /**
- * Properly close transaction.
- * @param tx Transaction.
- * @throws IgniteCheckedException if failed.
- */
- private void closeTx(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
- try {
- tx.close();
- }
- finally {
- ctx.cache().context().tm().resetContext();
- }
- }
-
- /**
- * Process bulk load COPY command.
- *
- * @param cmd The command.
- * @param qryId Query id.
- * @return The context (which is the result of the first request/response).
- * @throws IgniteCheckedException If something failed.
- */
- private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
- throws IgniteCheckedException {
- if (cmd.packetSize() == null)
- cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
-
- GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName());
-
- if (tbl == null) {
- throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
- IgniteQueryErrorCode.TABLE_NOT_FOUND);
- }
-
- H2Utils.checkAndStartNotStartedCache(ctx, tbl);
-
- UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
-
- IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new DmlBulkLoadDataConverter(plan);
-
- IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName());
-
- BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
-
- BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
-
- BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter,
- runningQueryMgr, qryId);
-
- BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
-
- return new BulkLoadContextCursor(processor, params);
- }
-
/** {@inheritDoc} */
@SuppressWarnings({"StringEquality", "unchecked"})
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
@@ -1712,8 +1529,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
ParsingResult parseRes = parse(schemaName, remainingQry, firstArg);
- SqlCommand nativeCmd = parseRes.nativeCommand();
-
remainingSql = parseRes.remainingSql();
if (remainingSql != null && failOnMultipleStmts)
@@ -1723,6 +1538,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert newQry.getSql() != null;
+ // Check if operation is performed on an active cluster.
+ SqlCommand nativeCmd = parseRes.commandNative();
+
if (!(nativeCmd instanceof SqlCommitTransactionCommand || nativeCmd instanceof SqlRollbackTransactionCommand)
&& !ctx.state().publicApiActiveState(true)) {
throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, " +
@@ -1730,9 +1548,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
"let all the nodes join the cluster. To activate the cluster call Ignite.active(true).");
}
- if (nativeCmd != null)
- res.addAll(queryDistributedSqlFieldsNative(schemaName, newQry, nativeCmd, cliCtx));
- else {
+ if (parseRes.isCommand()) {
+ // Execute command.
+ FieldsQueryCursor<List<?>> cmdRes = executeCommand(
+ schemaName,
+ newQry,
+ cliCtx,
+ parseRes.commandNative(),
+ parseRes.commandH2()
+ );
+
+ res.add(cmdRes);
+ }
+ else {
+ // Execute query or DML.
List<GridQueryFieldMetadata> meta = parseRes.meta();
GridCacheTwoStepQuery twoStepQry = parseRes.twoStepQuery();
@@ -1861,17 +1690,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- if (DdlStatementsProcessor.isDdlStatement(prepared)) {
- if (loc) {
- fail = true;
-
- throw new IgniteSQLException("DDL statements are not supported for LOCAL caches",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
-
- return Collections.singletonList(ddlProc.runDdlStatement(sqlQry, prepared));
- }
-
if (prepared instanceof NoOperation)
return Collections.singletonList(H2Utils.zeroCursor());
@@ -1881,7 +1699,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
finally {
- runningQueryMgr.unregister(qryId, fail);
+ runningQryMgr.unregister(qryId, fail);
}
}
@@ -1905,7 +1723,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return Collections.singletonList(executeQueryLocal(schemaName, qry, keepBinary, filter, cancel, qryId));
}
catch (IgniteCheckedException e) {
- runningQueryMgr.unregister(qryId, true);
+ runningQryMgr.unregister(qryId, true);
throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry +
", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
@@ -1923,7 +1741,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private Long registerRunningQuery(String schemaName, GridQueryCancel cancel, String qry, boolean loc,
boolean registerAsNewQry) {
if (registerAsNewQry)
- return runningQueryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
+ return runningQryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
return null;
}
@@ -1987,7 +1805,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int paramsCnt = prepared.getParameters().size();
Object[] argsOrig = qry.getArgs();
-
Object[] args = null;
if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
@@ -2042,6 +1859,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(prepared.getSQL()).setArgs(args);
+ if (CommandProcessor.isCommand(prepared)) {
+ GridSqlStatement cmdH2 = new GridSqlQueryParser(false).parse(prepared);
+
+ return new ParsingResult(newQry, null, cmdH2, remainingSql);
+ }
+
boolean hasTwoStep = !loc && prepared.isQuery();
// Let's not cache multiple statements and distributed queries as whole two step query will be cached later on.
@@ -2090,8 +1913,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- checkQueryType(qry, true);
-
return new ParsingResult(prepared, newQry, remainingSql, cachedQry.query(), cachedQryKey, cachedQry.meta());
}
@@ -2294,7 +2115,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
finally {
if (!cursorCreated)
- runningQueryMgr.unregister(qryId, failed);
+ runningQryMgr.unregister(qryId, failed);
}
}
@@ -2556,7 +2377,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Running query manager.
*/
public RunningQueryManager runningQueryManager() {
- return runningQueryMgr;
+ return runningQryMgr;
}
/** {@inheritDoc} */
@@ -2593,10 +2414,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
mapQryExec.start(ctx, this);
rdcQryExec.start(ctx, this);
- ddlProc = new DdlStatementsProcessor(ctx, schemaMgr);
-
+ runningQryMgr = new RunningQueryManager(ctx);
partExtractor = new PartitionExtractor(new H2PartitionResolver(this));
- runningQueryMgr = new RunningQueryManager(ctx);
+
+ cmdProc = new CommandProcessor(ctx, schemaMgr, runningQryMgr);
if (JdbcUtils.serializer != null)
U.warn(log, "Custom H2 serialization is already configured, will override.");
@@ -2733,7 +2554,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
qryCtxRegistry.clearSharedOnLocalNodeStop();
- runningQueryMgr.stop();
+ runningQryMgr.stop();
schemaMgr.stop();
connMgr.stop();
@@ -2749,7 +2570,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridNearTxLocal tx = tx(ctx);
if (tx != null)
- doRollback(tx);
+ cmdProc.doRollback(tx);
}
/** {@inheritDoc} */
@@ -2818,8 +2639,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Remove all cached queries from cached two-steps queries.
*/
- private void clearCachedQueries() {
+ private void clearPlanCache() {
twoStepCache = new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
+ updatePlanCache = new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
}
/** {@inheritDoc} */
@@ -2873,12 +2695,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return SQL running queries.
*/
public List<GridRunningQueryInfo> runningSqlQueries() {
- return runningQueryMgr.runningSqlQueries();
+ return runningQryMgr.runningSqlQueries();
}
/** {@inheritDoc} */
@Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
- return runningQueryMgr.longRunningQueries(duration);
+ return runningQryMgr.longRunningQueries(duration);
}
/**
@@ -2887,21 +2709,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Queries history metrics.
*/
public Map<QueryHistoryMetricsKey, QueryHistoryMetrics> queryHistoryMetrics() {
- return runningQueryMgr.queryHistoryMetrics();
+ return runningQryMgr.queryHistoryMetrics();
}
/**
* Reset query history metrics.
*/
public void resetQueryHistoryMetrics() {
- runningQueryMgr.resetQueryHistoryMetrics();
+ runningQryMgr.resetQueryHistoryMetrics();
}
/** {@inheritDoc} */
@Override public void cancelQueries(Collection<Long> queries) {
if (!F.isEmpty(queries)) {
for (Long qryId : queries)
- runningQueryMgr.cancel(qryId);
+ runningQryMgr.cancel(qryId);
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java
index de2a50e..0358f64 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ParsingResult.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
import org.apache.ignite.internal.sql.command.SqlCommand;
import org.h2.command.Prepared;
@@ -46,8 +47,11 @@ final class ParsingResult {
/** Metadata for two-step query, or {@code} null if this result is for local query. */
private final List<GridQueryFieldMetadata> meta;
- /** Parsed native command. */
- private final SqlCommand nativeCmd;
+ /** Command (native). */
+ private final SqlCommand cmdNative;
+
+ /** Command (H2). */
+ private final GridSqlStatement cmdH2;
/**
* Simple constructor.
@@ -59,7 +63,8 @@ final class ParsingResult {
GridCacheTwoStepQuery twoStepQry,
H2TwoStepCachedQueryKey twoStepQryKey,
List<GridQueryFieldMetadata> meta,
- SqlCommand nativeCmd
+ SqlCommand cmdNative,
+ GridSqlStatement cmdH2
) {
this.prepared = prepared;
this.newQry = newQry;
@@ -67,7 +72,8 @@ final class ParsingResult {
this.twoStepQry = twoStepQry;
this.twoStepQryKey = twoStepQryKey;
this.meta = meta;
- this.nativeCmd = nativeCmd;
+ this.cmdNative = cmdNative;
+ this.cmdH2 = cmdH2;
}
/**
@@ -81,18 +87,19 @@ final class ParsingResult {
H2TwoStepCachedQueryKey twoStepQryKey,
List<GridQueryFieldMetadata> meta
) {
- this(prepared, newQry, remainingSql, twoStepQry, twoStepQryKey, meta, null);
+ this(prepared, newQry, remainingSql, twoStepQry, twoStepQryKey, meta, null, null);
}
/**
* Construct parsing result in case of native parsing.
*
* @param newQry leading sql statement of the original multi-statement query.
- * @param nativeCmd parsed sql command. Represents newQry.
+ * @param cmdNative Command (native).
+ * @param cmdH2 Command (H2).
* @param remainingSql the rest of the original query.
*/
- public ParsingResult(SqlFieldsQuery newQry, SqlCommand nativeCmd, String remainingSql) {
- this(null, newQry, remainingSql, null, null, null, nativeCmd);
+ public ParsingResult(SqlFieldsQuery newQry, SqlCommand cmdNative, GridSqlStatement cmdH2, String remainingSql) {
+ this(null, newQry, remainingSql, null, null, null, cmdNative, cmdH2);
}
/**
@@ -103,7 +110,7 @@ final class ParsingResult {
* @param remainingSql the rest of the original query.
*/
public ParsingResult(Prepared prepared, SqlFieldsQuery newQry, String remainingSql) {
- this(prepared, newQry, remainingSql, null, null, null, null);
+ this(prepared, newQry, remainingSql, null, null, null, null, null);
}
/**
@@ -121,10 +128,17 @@ final class ParsingResult {
}
/**
- * Sql command produced by native sql parser.
+ * @return Command (native).
+ */
+ public SqlCommand commandNative() {
+ return cmdNative;
+ }
+
+ /**
+ * @return Command (H2).
*/
- public SqlCommand nativeCommand() {
- return nativeCmd;
+ public GridSqlStatement commandH2() {
+ return cmdH2;
}
/**
@@ -162,4 +176,11 @@ final class ParsingResult {
public int parametersCount() {
return prepared != null ? prepared.getParameters().size() : twoStepQry.parametersCount();
}
+
+ /**
+ * @return {@code true} if this result holds a command (native or H2), {@code false} otherwise.
+ */
+ public boolean isCommand() {
+ return cmdNative != null || cmdH2 != null;
+ }
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
index 2204c596..d97ef08 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicTableSelfTest.java
@@ -57,7 +57,7 @@ import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.H2TableDescriptor;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.CommandProcessor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.util.GridStringBuilder;
@@ -933,7 +933,7 @@ public class H2DynamicTableSelfTest extends AbstractSchemaSelfTest {
}
/**
- * Tests table name conflict check in {@link DdlStatementsProcessor}.
+ * Tests table name conflict check in {@link CommandProcessor}.
* @throws Exception if failed.
*/
@Test