You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/03/08 11:12:45 UTC
[ignite] branch master updated: IGNITE-11227: SQL: Decoupled query
execution entry point from DML. This closes #6246.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 455b56d IGNITE-11227: SQL: Decoupled query execution entry point from DML. This closes #6246.
455b56d is described below
commit 455b56d4166c9455a4b02532049de63e2e6090e3
Author: devozerov <pp...@gmail.com>
AuthorDate: Fri Mar 8 14:12:38 2019 +0300
IGNITE-11227: SQL: Decoupled query execution entry point from DML. This closes #6246.
---
.../thin/JdbcThinDataPageScanPropertySelfTest.java | 33 +-
.../thin/JdbcThinStreamingAbstractSelfTest.java | 22 +-
.../internal/processors/cache/mvcc/MvccUtils.java | 7 +-
.../processors/query/GridQueryIndexing.java | 15 +-
.../processors/query/GridQueryProcessor.java | 9 +-
.../IgniteClientCacheInitializationFailTest.java | 12 +-
.../processors/query/h2/CommandProcessor.java | 26 +-
.../processors/query/h2/IgniteH2Indexing.java | 1163 +++++++++++---------
...eryParserCacheKey.java => QueryDescriptor.java} | 110 +-
.../processors/query/h2/QueryParameters.java | 215 ++++
.../internal/processors/query/h2/QueryParser.java | 131 ++-
.../processors/query/h2/QueryParserResult.java | 29 +-
.../processors/query/h2/dml/UpdatePlanBuilder.java | 68 +-
.../processors/query/RunningQueriesTest.java | 27 +-
14 files changed, 1167 insertions(+), 700 deletions(-)
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java
index 40e1d78..9bd9064 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDataPageScanPropertySelfTest.java
@@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.SqlClientContext;
@@ -121,14 +120,19 @@ public class JdbcThinDataPageScanPropertySelfTest extends GridCommonAbstractTest
private void checkDataPageScanInBatch(String qryWithParam, @Nullable Boolean dps) throws Exception {
String params = (dps == null) ? null : "dataPageScanEnabled=" + dps;
+ int expCnt = 0;
+
try (Connection conn = GridTestUtils.connect(grid(0), params)) {
try (PreparedStatement upd = conn.prepareStatement(qryWithParam)) {
for (int i = 0; i < TOTAL_QUERIES_TO_EXECUTE; i++) {
upd.setInt(1, i);
upd.addBatch();
- if ((i + 1) % BATCH_SIZE == 0 || (i + 1) == TOTAL_QUERIES_TO_EXECUTE)
+ if ((i + 1) % BATCH_SIZE == 0 || (i + 1) == TOTAL_QUERIES_TO_EXECUTE) {
upd.executeBatch();
+
+ expCnt++;
+ }
}
}
}
@@ -146,10 +150,7 @@ public class JdbcThinDataPageScanPropertySelfTest extends GridCommonAbstractTest
int executed = IndexingWithQueries.queries.size();
- assertTrue(
- "Expected that there are executed at least " + TOTAL_QUERIES_TO_EXECUTE + " queries. " +
- "But executed only " + executed,
- executed >= TOTAL_QUERIES_TO_EXECUTE);
+ assertEquals(expCnt, executed);
IndexingWithQueries.queries.clear();
}
@@ -198,12 +199,24 @@ public class JdbcThinDataPageScanPropertySelfTest extends GridCommonAbstractTest
static final Queue<SqlFieldsQuery> queries = new LinkedBlockingQueue<>();
/** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
- @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
- MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
+ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+ String schemaName,
+ SqlFieldsQuery qry,
+ @Nullable SqlClientContext cliCtx,
+ boolean keepBinary,
+ boolean failOnMultipleStmts,
+ GridQueryCancel cancel
+ ) {
queries.add(qry);
- return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel, registerAsNewQry);
+ return super.querySqlFields(
+ schemaName,
+ qry,
+ cliCtx,
+ keepBinary,
+ failOnMultipleStmts,
+ cancel
+ );
}
}
}
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
index 5929839..71b5049 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingAbstractSelfTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.SqlClientContext;
@@ -517,13 +516,24 @@ public abstract class JdbcThinStreamingAbstractSelfTest extends JdbcStreamingSel
}
/** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
- @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
- GridQueryCancel cancel, boolean registerAsNewQry) {
+ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+ String schemaName,
+ SqlFieldsQuery qry,
+ @Nullable SqlClientContext cliCtx,
+ boolean keepBinary,
+ boolean failOnMultipleStmts,
+ GridQueryCancel cancel
+ ) {
IndexingWithContext.cliCtx = cliCtx;
- return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel,
- registerAsNewQry);
+ return super.querySqlFields(
+ schemaName,
+ qry,
+ cliCtx,
+ keepBinary,
+ failOnMultipleStmts,
+ cancel
+ );
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index cbc7023..c81b2da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -788,16 +788,17 @@ public class MvccUtils {
/**
* Initialises MVCC filter and returns MVCC query tracker if needed.
* @param cctx Cache context.
- * @param startTx Start transaction flag.
+ * @param autoStartTx Start transaction flag.
* @return MVCC query tracker.
* @throws IgniteCheckedException If failed.
*/
- @NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, boolean startTx) throws IgniteCheckedException {
+ @NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, boolean autoStartTx)
+ throws IgniteCheckedException {
assert cctx != null && cctx.mvccEnabled();
GridNearTxLocal tx = tx(cctx.kernalContext());
- if (tx == null && startTx)
+ if (tx == null && autoStartTx)
tx = txStart(cctx, 0);
return mvccTracker(cctx, tx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index f6215b7..557e3ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
@@ -85,14 +84,16 @@ public interface GridQueryIndexing {
* @param cliCtx Client context.
* @param keepBinary Keep binary flag.
* @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query.
- * @param tracker Query tracker.
- * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
- * {@code false} otherwise.
* @return Cursor.
*/
- public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
- SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
- GridQueryCancel cancel, boolean registerAsNewQry);
+ public List<FieldsQueryCursor<List<?>>> querySqlFields(
+ String schemaName,
+ SqlFieldsQuery qry,
+ SqlClientContext cliCtx,
+ boolean keepBinary,
+ boolean failOnMultipleStmts,
+ GridQueryCancel cancel
+ );
/**
* Execute an INSERT statement using data streamer as receiver.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index b6af8ca..c3d0b88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2231,6 +2231,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
throw new CacheException("Execution of local SqlFieldsQuery on client node disallowed.");
return executeQuerySafe(cctx, () -> {
+ assert idx != null;
+
final String schemaName = qry.getSchema() != null ? qry.getSchema()
: (cctx != null ? idx.schema(cctx.name()) : QueryUtils.DFLT_SCHEMA);
@@ -2239,16 +2241,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
@Override public List<FieldsQueryCursor<List<?>>> applyx() {
GridQueryCancel cancel0 = cancel != null ? cancel : new GridQueryCancel();
- List<FieldsQueryCursor<List<?>>> res =
- idx.querySqlFields(
+ List<FieldsQueryCursor<List<?>>> res = idx.querySqlFields(
schemaName,
qry,
cliCtx,
keepBinary,
failOnMultipleStmts,
- null,
- cancel0,
- true
+ cancel0
);
if (cctx != null)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index 5313c09..9e3af5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -285,9 +284,14 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
}
/** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
- SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
- GridQueryCancel cancel, boolean registerAsNewQry) {
+ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+ String schemaName,
+ SqlFieldsQuery qry,
+ SqlClientContext cliCtx,
+ boolean keepBinary,
+ boolean failOnMultipleStmts,
+ GridQueryCancel cancel
+ ) {
return null;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
index 0007fd8..c71a5c2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/CommandProcessor.java
@@ -37,7 +37,6 @@ import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -50,7 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -160,15 +158,16 @@ public class CommandProcessor {
/**
* Execute command.
*
- * @param qry Query.
+ * @param sql SQL.
* @param cmdNative Native command (if any).
* @param cmdH2 H2 command (if any).
+ * @param params Parameters.
* @param cliCtx Client context.
* @param qryId Running query ID.
* @return Result.
*/
- public CommandResult runCommand(SqlFieldsQuery qry, SqlCommand cmdNative, GridSqlStatement cmdH2,
- @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
+ public CommandResult runCommand(String sql, SqlCommand cmdNative, GridSqlStatement cmdH2,
+ QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
assert cmdNative != null || cmdH2 != null;
// Do execute.
@@ -179,7 +178,7 @@ public class CommandProcessor {
assert cmdH2 == null;
if (isDdl(cmdNative))
- runCommandNativeDdl(qry.getSql(), cmdNative);
+ runCommandNativeDdl(sql, cmdNative);
else if (cmdNative instanceof SqlBulkLoadCommand) {
res = processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, qryId);
@@ -188,12 +187,12 @@ public class CommandProcessor {
else if (cmdNative instanceof SqlSetStreamingCommand)
processSetStreamingCommand((SqlSetStreamingCommand)cmdNative, cliCtx);
else
- processTxCommand(cmdNative, qry);
+ processTxCommand(cmdNative, params);
}
else {
assert cmdH2 != null;
- runCommandH2(qry.getSql(), cmdH2);
+ runCommandH2(sql, cmdH2);
}
return new CommandResult(res, unregister);
@@ -839,13 +838,12 @@ public class CommandProcessor {
/**
* Process transactional command.
* @param cmd Command.
- * @param qry Query.
+ * @param params Parameters.
* @throws IgniteCheckedException if failed.
*/
- private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
+ private void processTxCommand(SqlCommand cmd, QueryParameters params)
throws IgniteCheckedException {
- NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).getNestedTxMode() :
- NestedTxMode.DEFAULT;
+ NestedTxMode nestedTxMode = params.nestedTxMode();
GridNearTxLocal tx = tx(ctx);
@@ -862,7 +860,7 @@ public class CommandProcessor {
case COMMIT:
doCommit(tx);
- txStart(ctx, qry.getTimeout());
+ txStart(ctx, params.timeout());
break;
@@ -881,7 +879,7 @@ public class CommandProcessor {
}
}
else
- txStart(ctx, qry.getTimeout());
+ txStart(ctx, params.timeout());
}
else if (cmd instanceof SqlCommitTransactionCommand) {
// Do nothing if there's no transaction.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 65a85ba..17e9b1d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,14 +71,12 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.RegisteredQueryCursor;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.query.EnlistOperation;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
@@ -420,32 +418,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Queries individual fields (generally used by JDBC drivers).
*
- * @param schemaName Schema name.
- * @param qry Query.
- * @param params Query parameters.
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
+ * @param select Select.
* @param filter Cache name and key filter.
- * @param enforceJoinOrder Enforce join order of tables in the query.
- * @param startTx Start transaction flag.
- * @param qryTimeout Query timeout in milliseconds.
- * @param cancel Query cancel.
+ * @param autoStartTx Start transaction flag.
* @param mvccTracker Query tracker.
- * @param dataPageScanEnabled If data page scan is enabled.
+ * @param cancel Query cancel.
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
- private GridQueryFieldsResult executeQueryLocal0(
- final String schemaName,
- String qry,
- @Nullable final Collection<Object> params,
- @Nullable List<GridQueryFieldMetadata> meta,
+ private GridQueryFieldsResult executeSelectLocal(
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
+ QueryParserResultSelect select,
final IndexingQueryFilter filter,
- boolean enforceJoinOrder,
- boolean startTx,
- int qryTimeout,
- final GridQueryCancel cancel,
+ boolean autoStartTx,
MvccQueryTracker mvccTracker,
- Boolean dataPageScanEnabled
+ GridQueryCancel cancel
) throws IgniteCheckedException {
+ String qry = qryDesc.sql();
+
GridNearTxLocal tx = null;
boolean mvccEnabled = mvccEnabled(kernalContext());
@@ -453,37 +446,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert mvccEnabled || mvccTracker == null;
try {
- SqlFieldsQuery fieldsQry = new SqlFieldsQuery(qry)
- .setLocal(true)
- .setEnforceJoinOrder(enforceJoinOrder)
- .setDataPageScanEnabled(dataPageScanEnabled)
- .setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
-
- if (params != null)
- fieldsQry.setArgs(params.toArray());
-
- QueryParserResult parseRes = parser.parse(schemaName, fieldsQry, false);
-
- if (parseRes.isDml()) {
- QueryParserResultDml dml = parseRes.dml();
-
- assert dml != null;
-
- UpdateResult updRes = executeUpdate(schemaName, dml, fieldsQry, true, filter, cancel);
-
- List<?> updResRow = Collections.singletonList(updRes.counter());
-
- return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator<>(updResRow));
- }
- else if (parseRes.isCommand()) {
- throw new IgniteSQLException("DDL statements are supported for the whole cluster only.",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
-
- assert parseRes.isSelect();
-
- QueryParserResultSelect select = parseRes.select();
-
assert select != null;
MvccSnapshot mvccSnapshot = null;
@@ -499,7 +461,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
GridNearTxSelectForUpdateFuture sfuFut = null;
- int opTimeout = qryTimeout;
+ int opTimeout = qryParams.timeout();
if (mvccEnabled) {
if (mvccTracker == null) {
@@ -512,7 +474,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteCheckedException("Cache has been stopped concurrently [cacheId=" +
mvccCacheId + ']');
- mvccTracker = MvccUtils.mvccTracker(mvccCacheCtx, startTx);
+ mvccTracker = MvccUtils.mvccTracker(mvccCacheCtx, autoStartTx);
}
}
@@ -563,7 +525,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
null
);
- return new GridQueryFieldsResultAdapter(meta, null) {
+ return new GridQueryFieldsResultAdapter(select.meta(), null) {
@Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
assert qryCtxRegistry.getThreadLocal() == null;
@@ -572,12 +534,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn = connMgr.detachThreadConnection();
try {
- Connection conn0 = conn.object().connection(schemaName);
+ Connection conn0 = conn.object().connection(qryDesc.schemaName());
+
+ List<Object> args = F.asList(qryParams.arguments());
PreparedStatement stmt = preparedStatementWithParams(
conn0,
qry0,
- params,
+ args,
true
);
@@ -585,10 +549,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
stmt,
conn0,
qry0,
- params,
+ args,
timeout0,
cancel,
- dataPageScanEnabled
+ qryParams.dataPageScanEnabled()
);
if (sfuFut0 != null) {
@@ -991,65 +955,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Queries individual fields (generally used by JDBC drivers).
- *
- * @param schemaName Schema name.
- * @param qry Query.
- * @param keepBinary Keep binary flag.
- * @param filter Cache name and key filter.
- * @param cancel Query cancel.
- * @param qryId Running query id. {@code null} in case query is not registered.
- * @return Cursor.
- */
- private FieldsQueryCursor<List<?>> executeQueryLocal(
- String schemaName,
- SqlFieldsQuery qry,
- List<GridQueryFieldMetadata> meta,
- final boolean keepBinary,
- IndexingQueryFilter filter,
- GridQueryCancel cancel,
- Long qryId
- ) throws IgniteCheckedException {
- boolean startTx = autoStartTx(qry);
-
- final GridQueryFieldsResult res = executeQueryLocal0(
- schemaName,
- qry.getSql(),
- F.asList(qry.getArgs()),
- meta,
- filter,
- qry.isEnforceJoinOrder(),
- startTx,
- qry.getTimeout(),
- cancel,
- null,
- qry.isDataPageScanEnabled()
- );
-
- Iterable<List<?>> iter = () -> {
- try {
- return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- };
-
- QueryCursorImpl<List<?>> cursor = qryId != null
- ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId)
- : new QueryCursorImpl<>(iter, cancel);
-
- cursor.fieldsMeta(res.metaData());
-
- return cursor;
- }
-
- /**
* @param schemaName Schema name.
* @param qry Query.
* @param keepCacheObj Flag to keep cache object.
* @param enforceJoinOrder Enforce join order of tables.
- * @param startTx Start transaction flag.
+ * @param autoStartTx Start transaction flag.
* @param qryTimeout Query timeout.
* @param cancel Cancel object.
* @param params Query parameters.
@@ -1060,12 +970,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param pageSize Page size.
* @return Iterable result.
*/
+ @SuppressWarnings("IfMayBeConditional")
private Iterable<List<?>> runQueryTwoStep(
final String schemaName,
final GridCacheTwoStepQuery qry,
final boolean keepCacheObj,
final boolean enforceJoinOrder,
- boolean startTx,
+ boolean autoStartTx,
final int qryTimeout,
final GridQueryCancel cancel,
final Object[] params,
@@ -1078,10 +989,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert !qry.mvccEnabled() || !F.isEmpty(qry.cacheIds());
try {
- final MvccQueryTracker tracker = mvccTracker == null && qry.mvccEnabled() ?
- MvccUtils.mvccTracker(ctx.cache().context().cacheContext(qry.cacheIds().get(0)), startTx) : mvccTracker;
+ final MvccQueryTracker mvccTracker0;
- GridNearTxLocal tx = tracker != null ? tx(ctx) : null;
+ if (mvccTracker == null && qry.mvccEnabled()) {
+ mvccTracker0 = MvccUtils.mvccTracker(
+ ctx.cache().context().cacheContext(qry.cacheIds().get(0)),
+ autoStartTx
+ );
+ }
+ else
+ mvccTracker0 = mvccTracker;
+
+ GridNearTxLocal tx = mvccTracker0 != null ? tx(ctx) : null;
// Locking has no meaning if SELECT FOR UPDATE is not executed in explicit transaction.
// So, we can reset forUpdate flag if there is no explicit transaction.
@@ -1103,15 +1022,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
params,
parts,
lazy,
- tracker,
+ mvccTracker0,
dataPageScanEnabled,
forUpdate,
pageSize
);
}
catch (Throwable e) {
- if (tracker != null)
- tracker.onDone();
+ if (mvccTracker0 != null)
+ mvccTracker0.onDone();
throw e;
}
@@ -1166,15 +1085,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute command.
*
- * @param schemaName Schema name.
- * @param qry Query.
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
* @param cliCtx Client context.
* @param cmd Command (native).
* @return Result.
*/
private FieldsQueryCursor<List<?>> executeCommand(
- String schemaName,
- SqlFieldsQuery qry,
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
@Nullable SqlClientContext cliCtx,
QueryParserResultCommand cmd
) {
@@ -1184,26 +1103,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
SqlCommand cmdNative = cmd.commandNative();
GridSqlStatement cmdH2 = cmd.commandH2();
- if (qry.isLocal()) {
+ if (qryDesc.local()) {
throw new IgniteSQLException("DDL statements are not supported for LOCAL caches",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
- Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true);
+ Long qryId = registerRunningQuery(qryDesc, null);
boolean fail = false;
CommandResult res = null;
try {
- res = cmdProc.runCommand(qry, cmdNative, cmdH2, cliCtx, qryId);
+ res = cmdProc.runCommand(qryDesc.sql(), cmdNative, cmdH2, qryParams, cliCtx, qryId);
return res.cursor();
}
catch (IgniteCheckedException e) {
fail = true;
- throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() +
+ throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qryDesc.sql() +
", err=" + e.getMessage() + ']', e);
}
finally {
@@ -1244,11 +1163,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Nullable SqlClientContext cliCtx,
boolean keepBinary,
boolean failOnMultipleStmts,
- MvccQueryTracker tracker,
- GridQueryCancel cancel,
- boolean registerAsNewQry
+ GridQueryCancel cancel
) {
- boolean mvccEnabled = mvccEnabled(ctx), startTx = autoStartTx(qry);
+ boolean mvccEnabled = mvccEnabled(ctx);
try {
List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1);
@@ -1262,12 +1179,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
remainingQry = parseRes.remainingQuery();
// Get next command.
- SqlFieldsQuery newQry = parseRes.query();
+ QueryDescriptor newQryDesc = parseRes.queryDescriptor();
+ QueryParameters newQryParams = parseRes.queryParameters();
// Check if there is enough parameters. Batched statements are not checked at this point
// since they pass parameters differently.
- if (!DmlUtils.isBatched(newQry)) {
- int qryParamsCnt = F.isEmpty(newQry.getArgs()) ? 0 : newQry.getArgs().length;
+ if (!newQryDesc.batched()) {
+ int qryParamsCnt = F.isEmpty(newQryParams.arguments()) ? 0 : newQryParams.arguments().length;
if (qryParamsCnt < parseRes.parametersCount())
throw new IgniteSQLException("Invalid number of query parameters [expected=" +
@@ -1283,28 +1201,42 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert cmd != null;
- // Execute command.
FieldsQueryCursor<List<?>> cmdRes = executeCommand(
- schemaName,
- newQry,
+ newQryDesc,
+ newQryParams,
cliCtx,
cmd
);
res.add(cmdRes);
}
+ else if (parseRes.isDml()) {
+ QueryParserResultDml dml = parseRes.dml();
+
+ assert dml != null;
+
+ List<? extends FieldsQueryCursor<List<?>>> dmlRes = executeDml(
+ newQryDesc,
+ newQryParams,
+ dml,
+ cancel
+ );
+
+ res.addAll(dmlRes);
+ }
else {
- // Execute query or DML.
- List<? extends FieldsQueryCursor<List<?>>> qryRes = executeSelectOrDml(
- schemaName,
- newQry,
- parseRes.select(),
- parseRes.dml(),
+ assert parseRes.isSelect();
+
+ QueryParserResultSelect select = parseRes.select();
+
+ assert select != null;
+
+ List<? extends FieldsQueryCursor<List<?>>> qryRes = executeSelect(
+ newQryDesc,
+ newQryParams,
+ select,
keepBinary,
- startTx,
- tracker,
- cancel,
- registerAsNewQry
+ cancel
);
res.addAll(qryRes);
@@ -1329,137 +1261,233 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute an all-ready {@link SqlFieldsQuery}.
- * @param schemaName Schema name.
- * @param qry Fields query with flags.
- * @param select Select.
+ *
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
* @param dml DML.
+ * @param cancel Query cancel state holder.
+ * @return Cursors holding the update result.
+ */
+ private List<? extends FieldsQueryCursor<List<?>>> executeDml(
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
+ QueryParserResultDml dml,
+ GridQueryCancel cancel
+ ) {
+ IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
+
+ Long qryId = registerRunningQuery(qryDesc, cancel);
+
+ boolean fail = false;
+
+ try {
+ if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
+ throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
+ "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
+ "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
+ "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
+ }
+
+ if (!qryDesc.local()) {
+ return executeUpdateDistributed(
+ qryDesc,
+ qryParams,
+ dml,
+ cancel
+ );
+ }
+ else {
+ UpdateResult updRes = executeUpdate(
+ qryDesc,
+ qryParams,
+ dml,
+ true,
+ filter,
+ cancel
+ );
+
+ return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @SuppressWarnings("NullableProblems")
+ @Override public Iterator<List<?>> iterator() {
+ return new IgniteSingletonIterator<>(Collections.singletonList(updRes.counter()));
+ }
+ }, cancel));
+ }
+ }
+ catch (IgniteCheckedException e) {
+ fail = true;
+
+ throw new IgniteSQLException("Failed to execute DML statement [stmt=" + qryDesc.sql() +
+ ", params=" + Arrays.deepToString(qryParams.arguments()) + "]", e);
+ }
+ finally {
+ runningQryMgr.unregister(qryId, fail);
+ }
+ }
+
+ /**
+ * Execute SELECT statement as a registered running query and wrap the result into a cursor.
+ *
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
+ * @param select Select.
* @param keepBinary Whether binary objects must not be deserialized automatically.
- * @param startTx Start transaction flag.
- * @param mvccTracker MVCC tracker.
* @param cancel Query cancel state holder.
- * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
* @return Query result.
*/
- private List<? extends FieldsQueryCursor<List<?>>> executeSelectOrDml(
- String schemaName,
- SqlFieldsQuery qry,
- @Nullable QueryParserResultSelect select,
- @Nullable QueryParserResultDml dml,
+ private List<? extends FieldsQueryCursor<List<?>>> executeSelect(
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
+ QueryParserResultSelect select,
boolean keepBinary,
- boolean startTx,
- MvccQueryTracker mvccTracker,
- GridQueryCancel cancel,
- boolean registerAsNewQry
+ GridQueryCancel cancel
) {
- String sqlQry = qry.getSql();
-
- boolean loc = qry.isLocal();
+ if (cancel == null)
+ cancel = new GridQueryCancel();
- IndexingQueryFilter filter = (loc ? backupFilter(null, qry.getPartitions()) : null);
+ // Check security.
+ if (ctx.security().enabled())
+ checkSecurity(select.cacheIds());
- if (dml != null) {
- Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry);
+ // Register query.
+ Long qryId = registerRunningQuery(qryDesc, cancel);
- boolean fail = false;
+ try {
+ Iterable<List<?>> iter = executeSelect0(qryDesc, qryParams, select, keepBinary, null, cancel);
- try {
- if (!dml.mvccEnabled() && !updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
- throw new IgniteSQLException("DML statements are not allowed inside a transaction over " +
- "cache(s) with TRANSACTIONAL atomicity mode (change atomicity mode to " +
- "TRANSACTIONAL_SNAPSHOT or disable this error message with system property " +
- "\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
- }
+ QueryCursorImpl<List<?>> cursor = qryId != null
+ ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId)
+ : new QueryCursorImpl<>(iter, cancel);
- if (!loc)
- return executeUpdateDistributed(schemaName, dml , qry, cancel);
- else {
- UpdateResult updRes = executeUpdate(schemaName, dml , qry, true, filter, cancel);
+ cursor.fieldsMeta(select.meta());
- return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
- @SuppressWarnings("NullableProblems")
- @Override public Iterator<List<?>> iterator() {
- return new IgniteSingletonIterator<>(Collections.singletonList(updRes.counter()));
- }
- }, cancel));
- }
- }
- catch (IgniteCheckedException e) {
- fail = true;
+ return Collections.singletonList(cursor);
+ }
+ catch (Exception e) {
+ runningQryMgr.unregister(qryId, true);
- throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry +
- ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
- }
- finally {
- runningQryMgr.unregister(qryId, fail);
- }
+ throw new IgniteSQLException("Failed to execute SELECT statement: " + qryDesc.sql(), e);
}
+ }
+
+ /**
+ * Execute SELECT statement for DML.
+ *
+ * @param schema Schema.
+ * @param selectQry Select query.
+ * @param mvccTracker MVCC tracker.
+ * @param cancel Cancel.
+ * @return Cursor over SELECT result.
+ */
+ private QueryCursorImpl<List<?>> executeSelectForDml(
+ String schema,
+ SqlFieldsQuery selectQry,
+ MvccQueryTracker mvccTracker,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ QueryParserResult parseRes = parser.parse(schema, selectQry, false);
+
+ QueryParserResultSelect select = parseRes.select();
- // Execute SQL.
assert select != null;
+ Iterable<List<?>> iter = executeSelect0(
+ parseRes.queryDescriptor(),
+ parseRes.queryParameters(),
+ select,
+ true,
+ mvccTracker,
+ cancel
+ );
+
+ QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(iter, cancel);
+
+ cursor.fieldsMeta(select.meta());
+
+ return cursor;
+ }
+
+ /**
+ * Execute SELECT statement in either distributed (split needed) or local mode.
+ *
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
+ * @param select Select.
+ * @param keepBinary Whether binary objects must not be deserialized automatically.
+ * @param mvccTracker MVCC tracker.
+ * @param cancel Query cancel state holder.
+ * @return Iterable over query result rows.
+ */
+ private Iterable<List<?>> executeSelect0(
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
+ QueryParserResultSelect select,
+ boolean keepBinary,
+ MvccQueryTracker mvccTracker,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ boolean autoStartTx = mvccEnabled(ctx) && !qryParams.autoCommit() && tx(ctx) == null;
+
+ Iterable<List<?>> iter;
+
if (select.splitNeeded()) {
// Distributed query.
GridCacheTwoStepQuery twoStepQry = select.twoStepQuery();
assert twoStepQry != null;
- if (ctx.security().enabled())
- checkSecurity(twoStepQry.cacheIds());
-
- FieldsQueryCursor<List<?>> res = executeQueryWithSplit(
- schemaName,
- qry,
+ iter = executeSelectDistributed(
+ qryDesc,
+ qryParams,
twoStepQry,
- select.meta(),
keepBinary,
- startTx,
+ autoStartTx,
mvccTracker,
- cancel,
- registerAsNewQry
+ cancel
);
-
- return Collections.singletonList(res);
}
else {
// Local query.
- Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry);
+ IndexingQueryFilter filter = (qryDesc.local() ? backupFilter(null, qryParams.partitions()) : null);
- try {
- FieldsQueryCursor<List<?>> res = executeQueryLocal(
- schemaName,
- qry,
- select.meta(),
- keepBinary,
- filter,
- cancel,
- qryId
- );
-
- return Collections.singletonList(res);
- }
- catch (IgniteCheckedException e) {
- runningQryMgr.unregister(qryId, true);
+ GridQueryFieldsResult res = executeSelectLocal(
+ qryDesc,
+ qryParams,
+ select,
+ filter,
+ autoStartTx,
+ mvccTracker,
+ cancel
+ );
- throw new IgniteSQLException("Failed to execute local statement [stmt=" + sqlQry +
- ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e);
- }
+ iter = () -> {
+ try {
+ return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ };
}
+
+ return iter;
}
/**
- * @param schemaName Schema name.
+ * Register running query.
+ *
+ * @param qryDesc Query descriptor.
* @param cancel Query cancel state holder.
- * @param qry Query.
- * @param loc {@code true} for local query.
- * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
* @return Id of registered query or {@code null} if query wasn't registered.
*/
- private Long registerRunningQuery(String schemaName, GridQueryCancel cancel, String qry, boolean loc,
- boolean registerAsNewQry) {
- if (registerAsNewQry)
- return runningQryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, loc, cancel);
-
- return null;
+ private Long registerRunningQuery(QueryDescriptor qryDesc, GridQueryCancel cancel) {
+ return runningQryMgr.register(
+ qryDesc.sql(),
+ GridCacheQueryType.SQL_FIELDS,
+ qryDesc.schemaName(),
+ qryDesc.local(),
+ cancel
+ );
}
/**
@@ -1479,18 +1507,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- /**
- * @param qry Sql fields query.autoStartTx(qry)
- * @return {@code True} if need to start transaction.
- */
- @SuppressWarnings("SimplifiableIfStatement")
- private boolean autoStartTx(SqlFieldsQuery qry) {
- if (!mvccEnabled(ctx))
- return false;
-
- return qry instanceof SqlFieldsQueryEx && !((SqlFieldsQueryEx)qry).isAutoCommit() && tx(ctx) == null;
- }
-
/** {@inheritDoc} */
@Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
GridCacheContext<?, ?> cctx,
@@ -1549,44 +1565,40 @@ public class IgniteH2Indexing implements GridQueryIndexing {
// Force keepBinary for operation context to avoid binary deserialization inside entry processor
DmlUtils.setKeepBinaryContext(planCctx);
+ SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fldsQry.isCollocated())
+ .setArgs(fldsQry.getArgs())
+ .setDistributedJoins(fldsQry.isDistributedJoins())
+ .setEnforceJoinOrder(fldsQry.isEnforceJoinOrder())
+ .setLocal(fldsQry.isLocal())
+ .setPageSize(fldsQry.getPageSize())
+ .setTimeout(fldsQry.getTimeout(), TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(fldsQry.isDataPageScanEnabled());
+
QueryCursorImpl<List<?>> cur;
// Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
// sub-query and not some dummy stuff like "select 1, 2, 3;"
if (!loc && !plan.isLocalSubquery()) {
- SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fldsQry.isCollocated())
- .setArgs(fldsQry.getArgs())
- .setDistributedJoins(fldsQry.isDistributedJoins())
- .setEnforceJoinOrder(fldsQry.isEnforceJoinOrder())
- .setLocal(fldsQry.isLocal())
- .setPageSize(fldsQry.getPageSize())
- .setTimeout(fldsQry.getTimeout(), TimeUnit.MILLISECONDS)
- .setDataPageScanEnabled(fldsQry.isDataPageScanEnabled());
-
- cur = (QueryCursorImpl<List<?>>)querySqlFields(
+ cur = executeSelectForDml(
schema,
- newFieldsQry,
- null,
- true,
- true,
+ selectFieldsQry,
new StaticMvccQueryTracker(planCctx, mvccSnapshot),
- cancel,
- false
- ).get(0);
+ cancel
+ );
}
else {
- GridQueryFieldsResult res = executeQueryLocal0(
- schema,
- plan.selectQuery(),
- F.asList(fldsQry.getArgs()),
- null,
+ selectFieldsQry.setLocal(true);
+
+ QueryParserResult selectParseRes = parser.parse(schema, selectFieldsQry, false);
+
+ GridQueryFieldsResult res = executeSelectLocal(
+ selectParseRes.queryDescriptor(),
+ selectParseRes.queryParameters(),
+ selectParseRes.select(),
filter,
- fldsQry.isEnforceJoinOrder(),
false,
- fldsQry.getTimeout(),
- cancel,
new StaticMvccQueryTracker(planCctx, mvccSnapshot),
- null
+ cancel
);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -1607,88 +1619,73 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Run distributed query on detected set of partitions.
- * @param schemaName Schema name.
- * @param qry Original query.
+ *
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
* @param twoStepQry Two-step query.
- * @param meta Metadata to set to cursor.
* @param keepBinary Keep binary flag.
- * @param startTx Start transaction flag.
+ * @param autoStartTx Start transaction flag.
* @param mvccTracker Query tracker.
* @param cancel Cancel handler.
- * @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
* @return Cursor representing distributed query result.
*/
- private FieldsQueryCursor<List<?>> executeQueryWithSplit(String schemaName, SqlFieldsQuery qry,
- GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
- boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, boolean registerAsNewQry) {
- if (log.isDebugEnabled())
- log.debug("Parsed query: `" + qry.getSql() + "` into two step query: " + twoStepQry);
-
- if (cancel == null)
- cancel = new GridQueryCancel();
-
- Long qryId = registerRunningQuery(schemaName, cancel, qry.getSql(), qry.isLocal(), registerAsNewQry);
-
- boolean cursorCreated = false;
- boolean failed = true;
-
- try {
- // When explicit partitions are set, there must be an owning cache they should be applied to.
- int explicitParts[] = qry.getPartitions();
- PartitionResult derivedParts = twoStepQry.derivedPartitions();
-
- int parts[] = PartitionResult.calculatePartitions(explicitParts, derivedParts, qry.getArgs());
+ @SuppressWarnings("IfMayBeConditional")
+ private Iterable<List<?>> executeSelectDistributed(
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
+ GridCacheTwoStepQuery twoStepQry,
+ boolean keepBinary,
+ boolean autoStartTx,
+ MvccQueryTracker mvccTracker,
+ GridQueryCancel cancel
+ ) {
+ // When explicit partitions are set, there must be an owning cache they should be applied to.
+ PartitionResult derivedParts = twoStepQry.derivedPartitions();
- if (parts != null && parts.length == 0) {
- failed = false;
+ int parts[] = PartitionResult.calculatePartitions(
+ qryParams.partitions(),
+ derivedParts,
+ qryParams.arguments()
+ );
- return new QueryCursorImpl<>(new Iterable<List<?>>() {
- @SuppressWarnings("NullableProblems")
- @Override public Iterator<List<?>> iterator() {
- return new Iterator<List<?>>() {
- @Override public boolean hasNext() {
- return false;
- }
+ Iterable<List<?>> iter;
- @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
- @Override public List<?> next() {
- return null;
- }
- };
- }
- });
- }
+ if (parts != null && parts.length == 0) {
+ iter = new Iterable<List<?>>() {
+ @SuppressWarnings("NullableProblems")
+ @Override public Iterator<List<?>> iterator() {
+ return new Iterator<List<?>>() {
+ @Override public boolean hasNext() {
+ return false;
+ }
- Iterable<List<?>> iter = runQueryTwoStep(
- schemaName,
+ @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException")
+ @Override public List<?> next() {
+ return null;
+ }
+ };
+ }
+ };
+ }
+ else {
+ iter = runQueryTwoStep(
+ qryDesc.schemaName(),
twoStepQry,
keepBinary,
- qry.isEnforceJoinOrder(),
- startTx,
- qry.getTimeout(),
+ qryDesc.enforceJoinOrder(),
+ autoStartTx,
+ qryParams.timeout(),
cancel,
- qry.getArgs(),
+ qryParams.arguments(),
parts,
- qry.isLazy(),
+ qryParams.lazy(),
mvccTracker,
- qry.isDataPageScanEnabled(),
- qry.getPageSize()
+ qryParams.dataPageScanEnabled(),
+ qryParams.pageSize()
);
-
- QueryCursorImpl<List<?>> cursor = registerAsNewQry
- ? new RegisteredQueryCursor<>(iter, cancel, runningQueryManager(), qryId)
- : new QueryCursorImpl<>(iter, cancel);
-
- cursor.fieldsMeta(meta);
-
- cursorCreated = true;
-
- return cursor;
- }
- finally {
- if (!cursorCreated)
- runningQryMgr.unregister(qryId, failed);
}
+
+ return iter;
}
/**
@@ -1733,7 +1730,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert dml != null;
- return executeUpdate(schemaName, dml, qry, loc, filter, cancel);
+ return executeUpdate(
+ parseRes.queryDescriptor(),
+ parseRes.queryParameters(),
+ dml,
+ loc,
+ filter,
+ cancel
+ );
}
/**
@@ -2260,26 +2264,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @param schemaName Schema.
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
* @param dml DML statement.
- * @param fieldsQry Initial query
* @param cancel Query cancel.
* @return Update result wrapped into {@link GridQueryFieldsResult}
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("unchecked")
private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
- String schemaName,
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
QueryParserResultDml dml,
- SqlFieldsQuery fieldsQry,
GridQueryCancel cancel
) throws IgniteCheckedException {
- if (DmlUtils.isBatched(fieldsQry)) {
- SqlFieldsQueryEx fieldsQry0 = (SqlFieldsQueryEx)fieldsQry;
-
+ if (qryDesc.batched()) {
Collection<UpdateResult> ress;
- List<Object[]> argss = fieldsQry0.batchedArguments();
+ List<Object[]> argss = qryParams.batchedArguments();
UpdatePlan plan = dml.plan();
@@ -2292,7 +2294,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
List<List<List<?>>> cur = plan.createRows(argss);
- ress = DmlUtils.processSelectResultBatched(plan, cur, fieldsQry0.getPageSize());
+ ress = DmlUtils.processSelectResultBatched(plan, cur, qryParams.pageSize());
}
finally {
DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
@@ -2309,15 +2311,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int cntr = 0;
for (Object[] args : argss) {
- SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry0.copy();
-
- qry0.clearBatchedArgs();
- qry0.setArgs(args);
-
UpdateResult res;
try {
- res = executeUpdate(schemaName, dml, qry0, false, null, cancel);
+ res = executeUpdate(
+ qryDesc,
+ qryParams.toSingleBatchedArguments(args),
+ dml,
+ false,
+ null,
+ cancel
+ );
cntPerRow[cntr++] = (int)res.counter();
@@ -2356,7 +2360,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return resCurs;
}
else {
- UpdateResult res = executeUpdate(schemaName, dml, fieldsQry, false, null, cancel);
+ UpdateResult res = executeUpdate(
+ qryDesc,
+ qryParams,
+ dml,
+ false,
+ null,
+ cancel
+ );
res.throwIfError();
@@ -2372,19 +2383,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
*
- * @param schemaName Schema.
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
* @param dml DML command.
- * @param fieldsQry Original query.
* @param loc Query locality flag.
* @param filters Cache name and key filter.
* @param cancel Cancel.
* @return Update result (modified items count and failed keys).
* @throws IgniteCheckedException if failed.
*/
+ @SuppressWarnings("IfMayBeConditional")
private UpdateResult executeUpdate(
- String schemaName,
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
QueryParserResultDml dml,
- SqlFieldsQuery fieldsQry,
boolean loc,
IndexingQueryFilter filters,
GridQueryCancel cancel
@@ -2393,20 +2405,39 @@ public class IgniteH2Indexing implements GridQueryIndexing {
long items = 0;
- UpdatePlan plan = dml.plan();
+ GridCacheContext<?, ?> cctx = dml.plan().cacheContext();
+
+ boolean transactional = cctx != null && cctx.mvccEnabled();
- GridCacheContext<?, ?> cctx = plan.cacheContext();
+ int maxRetryCnt = transactional ? 1 : DFLT_UPDATE_RERUN_ATTEMPTS;
- for (int i = 0; i < DFLT_UPDATE_RERUN_ATTEMPTS; i++) {
- CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
+ for (int i = 0; i < maxRetryCnt; i++) {
+ CacheOperationContext opCtx = cctx != null ? DmlUtils.setKeepBinaryContext(cctx) : null;
UpdateResult r;
try {
- r = executeUpdate0(schemaName, plan, fieldsQry, loc, filters, cancel);
+ if (transactional)
+ r = executeUpdateTransactional(
+ qryDesc,
+ qryParams,
+ dml,
+ loc,
+ cancel
+ );
+ else
+ r = executeUpdateNonTransactional(
+ qryDesc,
+ qryParams,
+ dml,
+ loc,
+ filters,
+ cancel
+ );
}
finally {
- DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
+ if (opCtx != null)
+ DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
}
items += r.counter();
@@ -2427,185 +2458,65 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Actually perform SQL DML operation locally.
+ * Execute update in non-transactional mode.
*
- * @param schemaName Schema name.
- * @param plan Cache context.
- * @param fieldsQry Fields query.
- * @param loc Local query flag.
- * @param filters Cache name and key filter.
- * @param cancel Query cancel state holder.
- * @return Pair [number of successfully processed items; keys that have failed to be processed]
- * @throws IgniteCheckedException if failed.
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
+ * @param dml DML statement.
+ * @param loc Local flag.
+ * @param filters Filters.
+ * @param cancel Cancel hook.
+ * @return Update result.
+ * @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings({"ConstantConditions"})
- private UpdateResult executeUpdate0(
- String schemaName,
- final UpdatePlan plan,
- SqlFieldsQuery fieldsQry,
+ private UpdateResult executeUpdateNonTransactional(
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
+ QueryParserResultDml dml,
boolean loc,
IndexingQueryFilter filters,
GridQueryCancel cancel
) throws IgniteCheckedException {
- GridCacheContext cctx = plan.cacheContext();
-
- DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
-
- if (cctx != null && cctx.mvccEnabled()) {
- assert cctx.transactional();
-
- GridNearTxLocal tx = tx(ctx);
-
- boolean implicit = (tx == null);
-
- boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) ||
- ((SqlFieldsQueryEx)fieldsQry).isAutoCommit());
-
- if (implicit)
- tx = txStart(cctx, fieldsQry.getTimeout());
-
- requestSnapshot(tx);
-
- try (GridNearTxLocal toCommit = commit ? tx : null) {
- long timeout = implicit
- ? tx.remainingTime()
- : operationTimeout(fieldsQry.getTimeout(), tx);
-
- if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
- || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
-
- boolean sequential = true;
-
- UpdateSourceIterator<?> it;
-
- if (plan.fastResult()) {
- IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs());
-
- EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
-
- it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
- }
- else if (plan.hasRows())
- it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan, plan.createRows(fieldsQry.getArgs()));
- else {
- // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false).
- SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
- .setArgs(fieldsQry.getArgs())
- .setDistributedJoins(fieldsQry.isDistributedJoins())
- .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
- .setLocal(fieldsQry.isLocal())
- .setPageSize(fieldsQry.getPageSize())
- .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
- .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
- FieldsQueryCursor<List<?>> cur = querySqlFields(schemaName, newFieldsQry, null,
- true, true, MvccUtils.mvccTracker(cctx, tx), cancel, false).get(0);
-
- it = plan.iteratorForTransaction(connMgr, cur);
- }
-
- IgniteInternalFuture<Long> fut = tx.updateAsync(cctx, it,
- fieldsQry.getPageSize(), timeout, sequential);
-
- UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
- if (commit)
- toCommit.commit();
-
- return res;
- }
-
- int[] ids = U.toIntArray(distributedPlan.getCacheIds());
-
- int flags = 0;
-
- if (fieldsQry.isEnforceJoinOrder())
- flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
-
- if (distributedPlan.isReplicatedOnly())
- flags |= GridH2QueryRequest.FLAG_REPLICATED;
-
- flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
- fieldsQry.isDataPageScanEnabled());
-
- int[] parts = PartitionResult.calculatePartitions(
- fieldsQry.getPartitions(),
- distributedPlan.derivedPartitions(),
- fieldsQry.getArgs());
-
- if (parts != null && parts.length == 0)
- return new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
- else {
- IgniteInternalFuture<Long> fut = tx.updateAsync(
- cctx,
- ids,
- parts,
- schemaName,
- fieldsQry.getSql(),
- fieldsQry.getArgs(),
- flags,
- fieldsQry.getPageSize(),
- timeout);
-
- UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
- if (commit)
- toCommit.commit();
-
- return res;
- }
- }
- catch (ClusterTopologyServerNotFoundException e) {
- throw new CacheServerNotFoundException(e.getMessage(), e);
- }
- catch (IgniteCheckedException e) {
- IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
-
- if(sqlEx != null)
- throw sqlEx;
-
- Exception ex = IgniteUtils.convertExceptionNoWrap(e);
-
- if (ex instanceof IgniteException)
- throw (IgniteException)ex;
-
- U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", ex);
-
- throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
- }
- finally {
- if (commit)
- cctx.tm().resetContext();
- }
- }
+ UpdatePlan plan = dml.plan();
- UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs());
+ UpdateResult fastUpdateRes = plan.processFast(qryParams.arguments());
if (fastUpdateRes != null)
return fastUpdateRes;
+ DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
+
if (distributedPlan != null) {
if (cancel == null)
cancel = new GridQueryCancel();
UpdateResult result = rdcQryExec.update(
- schemaName,
+ qryDesc.schemaName(),
distributedPlan.getCacheIds(),
- fieldsQry.getSql(),
- fieldsQry.getArgs(),
- fieldsQry.isEnforceJoinOrder(),
- fieldsQry.getPageSize(),
- fieldsQry.getTimeout(),
- fieldsQry.getPartitions(),
+ qryDesc.sql(),
+ qryParams.arguments(),
+ qryDesc.enforceJoinOrder(),
+ qryParams.pageSize(),
+ qryParams.timeout(),
+ qryParams.partitions(),
distributedPlan.isReplicatedOnly(),
cancel
);
- // null is returned in case not all nodes support distributed DML.
+ // Null is returned in case not all nodes support distributed DML.
if (result != null)
return result;
}
+ SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated())
+ .setArgs(qryParams.arguments())
+ .setDistributedJoins(qryDesc.distributedJoins())
+ .setEnforceJoinOrder(qryDesc.enforceJoinOrder())
+ .setLocal(qryDesc.local())
+ .setPageSize(qryParams.pageSize())
+ .setTimeout(qryParams.timeout(), TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(qryParams.dataPageScanEnabled());
+
Iterable<List<?>> cur;
// Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
@@ -2613,32 +2524,28 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (!loc && !plan.isLocalSubquery()) {
assert !F.isEmpty(plan.selectQuery());
- SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
- .setArgs(fieldsQry.getArgs())
- .setDistributedJoins(fieldsQry.isDistributedJoins())
- .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
- .setLocal(fieldsQry.isLocal())
- .setPageSize(fieldsQry.getPageSize())
- .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS)
- .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
- cur = querySqlFields(schemaName, newFieldsQry, null, true, true, null, cancel, false).get(0);
+ cur = executeSelectForDml(
+ qryDesc.schemaName(),
+ selectFieldsQry,
+ null,
+ cancel
+ );
}
else if (plan.hasRows())
- cur = plan.createRows(fieldsQry.getArgs());
+ cur = plan.createRows(qryParams.arguments());
else {
- final GridQueryFieldsResult res = executeQueryLocal0(
- schemaName,
- plan.selectQuery(),
- F.asList(fieldsQry.getArgs()),
- null,
+ selectFieldsQry.setLocal(true);
+
+ QueryParserResult selectParseRes = parser.parse(qryDesc.schemaName(), selectFieldsQry, false);
+
+ final GridQueryFieldsResult res = executeSelectLocal(
+ selectParseRes.queryDescriptor(),
+ selectParseRes.queryParameters(),
+ selectParseRes.select(),
filters,
- fieldsQry.isEnforceJoinOrder(),
false,
- fieldsQry.getTimeout(),
- cancel,
null,
- null
+ cancel
);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -2654,8 +2561,176 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}, cancel);
}
- int pageSize = loc ? 0 : fieldsQry.getPageSize();
+ int pageSize = loc ? 0 : qryParams.pageSize();
return DmlUtils.processSelectResult(plan, cur, pageSize);
}
+
+ /**
+ * Execute update in transactional mode.
+ *
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
+ * @param dml DML statement.
+ * @param loc Local flag.
+ * @param cancel Cancel hook.
+ * @return Update result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private UpdateResult executeUpdateTransactional(
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
+ QueryParserResultDml dml,
+ boolean loc,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ UpdatePlan plan = dml.plan();
+
+ GridCacheContext cctx = plan.cacheContext();
+
+ assert cctx != null;
+ assert cctx.transactional();
+
+ GridNearTxLocal tx = tx(ctx);
+
+ boolean implicit = (tx == null);
+
+ boolean commit = implicit && qryParams.autoCommit();
+
+ if (implicit)
+ tx = txStart(cctx, qryParams.timeout());
+
+ requestSnapshot(tx);
+
+ try (GridNearTxLocal toCommit = commit ? tx : null) {
+ DmlDistributedPlanInfo distributedPlan = loc ? null : plan.distributedPlan();
+
+ long timeout = implicit
+ ? tx.remainingTime()
+ : operationTimeout(qryParams.timeout(), tx);
+
+ if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
+ || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
+
+ boolean sequential = true;
+
+ UpdateSourceIterator<?> it;
+
+ if (plan.fastResult()) {
+ IgniteBiTuple row = plan.getFastRow(qryParams.arguments());
+
+ assert row != null;
+
+ EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
+
+ it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
+ }
+ else if (plan.hasRows()) {
+ it = new DmlUpdateResultsIterator(
+ UpdatePlan.enlistOperation(plan.mode()),
+ plan,
+ plan.createRows(qryParams.arguments())
+ );
+ }
+ else {
+ SqlFieldsQuery selectFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qryDesc.collocated())
+ .setArgs(qryParams.arguments())
+ .setDistributedJoins(qryDesc.distributedJoins())
+ .setEnforceJoinOrder(qryDesc.enforceJoinOrder())
+ .setLocal(qryDesc.local())
+ .setPageSize(qryParams.pageSize())
+ .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(qryParams.dataPageScanEnabled());
+
+ FieldsQueryCursor<List<?>> cur = executeSelectForDml(
+ qryDesc.schemaName(),
+ selectFieldsQry,
+ MvccUtils.mvccTracker(cctx, tx),
+ cancel
+ );
+
+ it = plan.iteratorForTransaction(connMgr, cur);
+ }
+
+ IgniteInternalFuture<Long> fut = tx.updateAsync(
+ cctx,
+ it,
+ qryParams.pageSize(),
+ timeout,
+ sequential
+ );
+
+ UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+ if (commit)
+ toCommit.commit();
+
+ return res;
+ }
+
+ int[] ids = U.toIntArray(distributedPlan.getCacheIds());
+
+ int flags = 0;
+
+ if (qryDesc.enforceJoinOrder())
+ flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
+
+ if (distributedPlan.isReplicatedOnly())
+ flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
+ flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
+ qryParams.dataPageScanEnabled());
+
+ int[] parts = PartitionResult.calculatePartitions(
+ qryParams.partitions(),
+ distributedPlan.derivedPartitions(),
+ qryParams.arguments()
+ );
+
+ if (parts != null && parts.length == 0)
+ return new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
+ else {
+ IgniteInternalFuture<Long> fut = tx.updateAsync(
+ cctx,
+ ids,
+ parts,
+ qryDesc.schemaName(),
+ qryDesc.sql(),
+ qryParams.arguments(),
+ flags,
+ qryParams.pageSize(),
+ timeout
+ );
+
+ UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+ if (commit)
+ toCommit.commit();
+
+ return res;
+ }
+ }
+ catch (ClusterTopologyServerNotFoundException e) {
+ throw new CacheServerNotFoundException(e.getMessage(), e);
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
+
+ if(sqlEx != null)
+ throw sqlEx;
+
+ Exception ex = IgniteUtils.convertExceptionNoWrap(e);
+
+ if (ex instanceof IgniteException)
+ throw (IgniteException)ex;
+
+ U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", ex);
+
+ throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
+ }
+ finally {
+ if (commit)
+ cctx.tm().resetContext();
+ }
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserCacheKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
similarity index 52%
rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserCacheKey.java
rename to modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
index 21bf405..2b1ceaf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserCacheKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryDescriptor.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.query.h2;
/**
* Key for cached two-step query.
*/
-public class QueryParserCacheKey {
+public class QueryDescriptor {
/** */
private final String schemaName;
@@ -28,7 +28,7 @@ public class QueryParserCacheKey {
private final String sql;
/** */
- private final boolean grpByCollocated;
+ private final boolean collocated;
/** */
private final boolean distributedJoins;
@@ -37,28 +37,97 @@ public class QueryParserCacheKey {
private final boolean enforceJoinOrder;
/** */
- private final boolean isLocal;
+ private final boolean loc;
+
+ /** Skip reducer on update flag. */
+ private final boolean skipReducerOnUpdate;
+
+ /** Batched flag. */
+ private final boolean batched;
/**
* @param schemaName Schema name.
* @param sql Sql.
- * @param grpByCollocated Collocated GROUP BY.
+ * @param collocated Collocated GROUP BY.
* @param distributedJoins Distributed joins enabled.
* @param enforceJoinOrder Enforce join order of tables.
- * @param isLocal Query is local flag.
+ * @param loc Query is local flag.
+ * @param skipReducerOnUpdate Skip reducer on update flag.
*/
- QueryParserCacheKey(String schemaName,
+ QueryDescriptor(
+ String schemaName,
String sql,
- boolean grpByCollocated,
+ boolean collocated,
boolean distributedJoins,
boolean enforceJoinOrder,
- boolean isLocal) {
+ boolean loc,
+ boolean skipReducerOnUpdate,
+ boolean batched
+ ) {
this.schemaName = schemaName;
this.sql = sql;
- this.grpByCollocated = grpByCollocated;
+ this.collocated = collocated;
this.distributedJoins = distributedJoins;
this.enforceJoinOrder = enforceJoinOrder;
- this.isLocal = isLocal;
+ this.loc = loc;
+ this.skipReducerOnUpdate = skipReducerOnUpdate;
+ this.batched = batched;
+ }
+
+ /**
+ * @return Schema name.
+ */
+ public String schemaName() {
+ return schemaName;
+ }
+
+ /**
+ * @return SQL.
+ */
+ public String sql() {
+ return sql;
+ }
+
+ /**
+ * @return Collocated GROUP BY flag.
+ */
+ public boolean collocated() {
+ return collocated;
+ }
+
+ /**
+ * @return Distributed joins flag.
+ */
+ public boolean distributedJoins() {
+ return distributedJoins;
+ }
+
+ /**
+ * @return Enforce join order flag.
+ */
+ public boolean enforceJoinOrder() {
+ return enforceJoinOrder;
+ }
+
+ /**
+ * @return Local flag.
+ */
+ public boolean local() {
+ return loc;
+ }
+
+ /**
+ * @return Skip reducer on update flag.
+ */
+ public boolean skipReducerOnUpdate() {
+ return skipReducerOnUpdate;
+ }
+
+ /**
+ * @return Batched flag.
+ */
+ public boolean batched() {
+ return batched;
}
/** {@inheritDoc} */
@@ -70,9 +139,9 @@ public class QueryParserCacheKey {
if (o == null || getClass() != o.getClass())
return false;
- QueryParserCacheKey that = (QueryParserCacheKey)o;
+ QueryDescriptor that = (QueryDescriptor)o;
- if (grpByCollocated != that.grpByCollocated)
+ if (collocated != that.collocated)
return false;
if (distributedJoins != that.distributedJoins)
@@ -81,20 +150,31 @@ public class QueryParserCacheKey {
if (enforceJoinOrder != that.enforceJoinOrder)
return false;
+ if (skipReducerOnUpdate != that.skipReducerOnUpdate)
+ return false;
+
+ if (batched != that.batched)
+ return false;
+
if (schemaName != null ? !schemaName.equals(that.schemaName) : that.schemaName != null)
return false;
- return isLocal == that.isLocal && sql.equals(that.sql);
+ return loc == that.loc && sql.equals(that.sql);
}
/** {@inheritDoc} */
+ @SuppressWarnings("AssignmentReplaceableWithOperatorAssignment")
@Override public int hashCode() {
int res = schemaName != null ? schemaName.hashCode() : 0;
+
res = 31 * res + sql.hashCode();
- res = 31 * res + (grpByCollocated ? 1 : 0);
+ res = 31 * res + (collocated ? 1 : 0);
+
res = res + (distributedJoins ? 2 : 0);
res = res + (enforceJoinOrder ? 4 : 0);
- res = res + (isLocal ? 8 : 0);
+ res = res + (loc ? 8 : 0);
+ res = res + (skipReducerOnUpdate ? 16 : 0);
+ res = res + (batched ? 32 : 0);
return res;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
new file mode 100644
index 0000000..72c0626
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParameters.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.NestedTxMode;
+
+import java.util.List;
+
+/**
+ * Query parameters which vary between requests having the same execution plan. Essentially, these are the arguments
+ * of original {@link org.apache.ignite.cache.query.SqlFieldsQuery} which are not part of {@link QueryDescriptor}.
+ */
+public class QueryParameters {
+ /** Arguments. */
+ private final Object[] args;
+
+ /** Partitions. */
+ private final int[] parts;
+
+ /** Timeout. */
+ private final int timeout;
+
+ /** Lazy flag. */
+ private final boolean lazy;
+
+ /** Page size. */
+ private final int pageSize;
+
+ /** Data page scan enabled flag. */
+ private final Boolean dataPageScanEnabled;
+
+ /** Nested transactional mode. */
+ private final NestedTxMode nestedTxMode;
+
+ /** Auto-commit flag. */
+ private final boolean autoCommit;
+
+ /** Batched arguments. */
+ private final List<Object[]> batchedArgs;
+
+ /**
+ * Create parameters from query.
+ *
+ * @param qry Query.
+ * @return Parameters.
+ */
+ public static QueryParameters fromQuery(SqlFieldsQuery qry) {
+ NestedTxMode nestedTxMode = NestedTxMode.DEFAULT;
+ boolean autoCommit = true;
+ List<Object[]> batchedArgs = null;
+
+ if (qry instanceof SqlFieldsQueryEx) {
+ SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)qry;
+
+ if (qry0.getNestedTxMode() != null)
+ nestedTxMode = qry0.getNestedTxMode();
+
+ autoCommit = qry0.isAutoCommit();
+
+ batchedArgs = qry0.batchedArguments();
+ }
+
+ return new QueryParameters(
+ qry.getArgs(),
+ qry.getPartitions(),
+ qry.getTimeout(),
+ qry.isLazy(),
+ qry.getPageSize(),
+ qry.isDataPageScanEnabled(),
+ nestedTxMode,
+ autoCommit,
+ batchedArgs
+ );
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param args Arguments.
+ * @param parts Partitions.
+ * @param timeout Timeout.
+ * @param lazy Lazy flag.
+ * @param pageSize Page size.
+ * @param dataPageScanEnabled Data page scan enabled flag.
+ * @param nestedTxMode Nested TX mode.
+ * @param autoCommit Auto-commit flag.
+ * @param batchedArgs Batched arguments.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ private QueryParameters(
+ Object[] args,
+ int[] parts,
+ int timeout,
+ boolean lazy,
+ int pageSize,
+ Boolean dataPageScanEnabled,
+ NestedTxMode nestedTxMode,
+ boolean autoCommit,
+ List<Object[]> batchedArgs
+ ) {
+ this.args = args;
+ this.parts = parts;
+ this.timeout = timeout;
+ this.lazy = lazy;
+ this.pageSize = pageSize;
+ this.dataPageScanEnabled = dataPageScanEnabled;
+ this.nestedTxMode = nestedTxMode;
+ this.autoCommit = autoCommit;
+ this.batchedArgs = batchedArgs;
+ }
+
+ /**
+ * @return Arguments.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public Object[] arguments() {
+ return args;
+ }
+
+ /**
+ * @return Partitions.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public int[] partitions() {
+ return parts;
+ }
+
+ /**
+ * @return Timeout.
+ */
+ public int timeout() {
+ return timeout;
+ }
+
+ /**
+ * @return Lazy flag.
+ */
+ public boolean lazy() {
+ return lazy;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @return Data page scan enabled flag.
+ */
+ public Boolean dataPageScanEnabled() {
+ return dataPageScanEnabled;
+ }
+
+ /**
+ * @return Nested TX mode.
+ */
+ public NestedTxMode nestedTxMode() {
+ return nestedTxMode;
+ }
+
+ /**
+ * @return Auto-commit flag.
+ */
+ public boolean autoCommit() {
+ return autoCommit;
+ }
+
+ /**
+ * @return Batched arguments.
+ */
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+ public List<Object[]> batchedArguments() {
+ return batchedArgs;
+ }
+
+ /**
+ * Create a copy of these parameters that carries the given single set of arguments and no batched arguments.
+ *
+ * @param args Arguments.
+ * @return Result.
+ */
+ public QueryParameters toSingleBatchedArguments(Object[] args) {
+ return new QueryParameters(
+ args,
+ this.parts,
+ this.timeout,
+ this.lazy,
+ this.pageSize,
+ this.dataPageScanEnabled,
+ this.nestedTxMode,
+ this.autoCommit,
+ null
+ );
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 6ddea41..cded1fe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -94,7 +93,7 @@ public class QueryParser {
private final IgniteLogger log;
/** */
- private volatile GridBoundedConcurrentLinkedHashMap<QueryParserCacheKey, QueryParserCacheEntry> cache =
+ private volatile GridBoundedConcurrentLinkedHashMap<QueryDescriptor, QueryParserCacheEntry> cache =
new GridBoundedConcurrentLinkedHashMap<>(CACHE_SIZE);
/**
@@ -135,33 +134,32 @@ public class QueryParser {
* @return Parsing result that contains Parsed leading query and remaining sql script.
*/
private QueryParserResult parse0(String schemaName, SqlFieldsQuery qry, boolean remainingAllowed) {
- // First, let's check if we already have a two-step query for this statement...
- QueryParserCacheKey cachedKey = new QueryParserCacheKey(
- schemaName,
- qry.getSql(),
- qry.isCollocated(),
- qry.isDistributedJoins(),
- qry.isEnforceJoinOrder(),
- qry.isLocal()
- );
+ QueryDescriptor qryDesc = queryDescriptor(schemaName, qry);
- QueryParserCacheEntry cached = cache.get(cachedKey);
+ QueryParserCacheEntry cached = cache.get(qryDesc);
if (cached != null)
- return new QueryParserResult(qry, null, cached.select(), cached.dml(), cached.command());
+ return new QueryParserResult(
+ qryDesc,
+ QueryParameters.fromQuery(qry),
+ null,
+ cached.select(),
+ cached.dml(),
+ cached.command()
+ );
// Try parting as native command.
QueryParserResult parseRes = parseNative(schemaName, qry, remainingAllowed);
// Otherwise parse with H2.
if (parseRes == null)
- parseRes = parseH2(schemaName, qry, remainingAllowed);
+ parseRes = parseH2(schemaName, qry, qryDesc.batched(), remainingAllowed);
// Add to cache if not multi-statement.
if (parseRes.remainingQuery() == null) {
cached = new QueryParserCacheEntry(parseRes.select(), parseRes.dml(), parseRes.command());
- cache.put(cachedKey, cached);
+ cache.put(qryDesc, cached);
}
// Done.
@@ -209,6 +207,8 @@ public class QueryParser {
SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(parser.lastCommandSql());
+ QueryDescriptor newPlanKey = queryDescriptor(schemaName, newQry);
+
SqlFieldsQuery remainingQry = null;
if (!F.isEmpty(parser.remainingSql())) {
@@ -219,7 +219,14 @@ public class QueryParser {
QueryParserResultCommand cmd = new QueryParserResultCommand(nativeCmd, null, false);
- return new QueryParserResult(newQry, remainingQry, null, null, cmd);
+ return new QueryParserResult(
+ newPlanKey,
+ QueryParameters.fromQuery(newQry),
+ remainingQry,
+ null,
+ null,
+ cmd
+ );
}
catch (SqlStrictParseException e) {
throw new IgniteSQLException(e.getMessage(), IgniteQueryErrorCode.PARSING, e);
@@ -246,11 +253,13 @@ public class QueryParser {
*
* @param schemaName Schema name.
* @param qry Query.
+ * @param batched Batched flag.
* @param remainingAllowed Whether multiple statements are allowed.
* @return Parsing result.
*/
@SuppressWarnings("IfMayBeConditional")
- private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry, boolean remainingAllowed) {
+ private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry, boolean batched,
+ boolean remainingAllowed) {
Connection c = connMgr.connectionForThread().connection(schemaName);
// For queries that are explicitly local, we rely on the flag specified in the query
@@ -303,7 +312,7 @@ public class QueryParser {
Object[] args = null;
Object[] remainingArgs = null;
- if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
+ if (!batched && paramsCnt > 0) {
if (argsOrig == null || argsOrig.length < paramsCnt)
// Not enough parameters, but we will handle this later on execution phase.
args = argsOrig;
@@ -319,6 +328,8 @@ public class QueryParser {
newQry.setArgs(args);
+ QueryDescriptor newQryDesc = queryDescriptor(schemaName, newQry);
+
if (remainingQry != null)
remainingQry.setArgs(remainingArgs);
@@ -328,17 +339,38 @@ public class QueryParser {
QueryParserResultCommand cmd = new QueryParserResultCommand(null, cmdH2, false);
- return new QueryParserResult(newQry, remainingQry, null, null, cmd);
+ return new QueryParserResult(
+ newQryDesc,
+ QueryParameters.fromQuery(newQry),
+ remainingQry,
+ null,
+ null,
+ cmd
+ );
}
else if (CommandProcessor.isCommandNoOp(prepared)) {
QueryParserResultCommand cmd = new QueryParserResultCommand(null, null, true);
- return new QueryParserResult(newQry, remainingQry, null, null, cmd);
+ return new QueryParserResult(
+ newQryDesc,
+ QueryParameters.fromQuery(newQry),
+ remainingQry,
+ null,
+ null,
+ cmd
+ );
}
else if (GridSqlQueryParser.isDml(prepared)) {
- QueryParserResultDml dml = prepareDmlStatement(schemaName, qry, prepared);
-
- return new QueryParserResult(newQry, remainingQry, null, dml, null);
+ QueryParserResultDml dml = prepareDmlStatement(newQryDesc, prepared);
+
+ return new QueryParserResult(
+ newQryDesc,
+ QueryParameters.fromQuery(newQry),
+ remainingQry,
+ null,
+ dml,
+ null
+ );
}
else if (!prepared.isQuery()) {
throw new IgniteSQLException("Unsupported statement: " + newQry.getSql(),
@@ -405,7 +437,14 @@ public class QueryParser {
forUpdate
);
- return new QueryParserResult(newQry, remainingQry, select, null, null);
+ return new QueryParserResult(
+ newQryDesc,
+ QueryParameters.fromQuery(newQry),
+ remainingQry,
+ select,
+ null,
+ null
+ );
}
catch (IgniteCheckedException e) {
throw new IgniteSQLException("Failed to parse query: " + newQry.getSql(), IgniteQueryErrorCode.PARSING,
@@ -478,14 +517,13 @@ public class QueryParser {
/**
* Prepare DML statement.
*
- * @param schemaName Schema name.
- * @param qry Query.
+ * @param planKey Plan key.
* @param prepared Prepared.
* @return Statement.
*/
- private QueryParserResultDml prepareDmlStatement(String schemaName, SqlFieldsQuery qry, Prepared prepared) {
- if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
- throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
+ private QueryParserResultDml prepareDmlStatement(QueryDescriptor planKey, Prepared prepared) {
+ if (F.eq(QueryUtils.SCHEMA_SYS, planKey.schemaName()))
+ throw new IgniteSQLException("DML statements are not supported on " + planKey.schemaName() + " schema",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
// Prepare AST.
@@ -532,11 +570,10 @@ public class QueryParser {
try {
plan = UpdatePlanBuilder.planForStatement(
- schemaName,
+ planKey,
stmt,
mvccEnabled,
- idx,
- qry
+ idx
);
}
catch (Exception e) {
@@ -585,4 +622,34 @@ public class QueryParser {
private static SqlFieldsQuery cloneFieldsQuery(SqlFieldsQuery oldQry) {
return oldQry.copy().setLocal(oldQry.isLocal()).setPageSize(oldQry.getPageSize());
}
+
+ /**
+ * Build the query descriptor for the given schema and query; it also serves as the parser cache (plan) key.
+ *
+ * @param schemaName Schema name.
+ * @param qry Query.
+ * @return Plan key.
+ */
+ private static QueryDescriptor queryDescriptor(String schemaName, SqlFieldsQuery qry) {
+ boolean skipReducerOnUpdate = false;
+ boolean batched = false;
+
+ if (qry instanceof SqlFieldsQueryEx) {
+ SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)qry;
+
+ skipReducerOnUpdate = !qry.isLocal() && qry0.isSkipReducerOnUpdate();
+ batched = qry0.isBatched();
+ }
+
+ return new QueryDescriptor(
+ schemaName,
+ qry.getSql(),
+ qry.isCollocated(),
+ qry.isDistributedJoins(),
+ qry.isEnforceJoinOrder(),
+ qry.isLocal(),
+ skipReducerOnUpdate,
+ batched
+ );
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java
index 75caff9..e7c66e7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserResult.java
@@ -24,8 +24,11 @@ import org.jetbrains.annotations.Nullable;
* Result of parsing and splitting SQL from {@link SqlFieldsQuery}.
*/
public class QueryParserResult {
- /** New fields query that may be executed right away. */
- private final SqlFieldsQuery qry;
+ /** Query descriptor. */
+ private final QueryDescriptor qryDesc;
+
+ /** Query parameters. */
+ private final QueryParameters qryParams;
/** Remaining query. */
private final SqlFieldsQuery remainingQry;
@@ -42,20 +45,23 @@ public class QueryParserResult {
/**
* Constructor.
*
- * @param qry New query.
+ * @param qryDesc Query descriptor.
+ * @param qryParams Query parameters.
* @param remainingQry Remaining query.
* @param select Select.
* @param dml DML.
* @param cmd Command.
*/
public QueryParserResult(
- SqlFieldsQuery qry,
+ QueryDescriptor qryDesc,
+ QueryParameters qryParams,
SqlFieldsQuery remainingQry,
@Nullable QueryParserResultSelect select,
@Nullable QueryParserResultDml dml,
@Nullable QueryParserResultCommand cmd
) {
- this.qry = qry;
+ this.qryDesc = qryDesc;
+ this.qryParams = qryParams;
this.remainingQry = remainingQry;
this.select = select;
this.dml = dml;
@@ -63,10 +69,17 @@ public class QueryParserResult {
}
/**
- * @return New fields query that may be executed right away.
+ * @return Query descriptor.
+ */
+ public QueryDescriptor queryDescriptor() {
+ return qryDesc;
+ }
+
+ /**
+ * @return Query parameters.
*/
- public SqlFieldsQuery query() {
- return qry;
+ public QueryParameters queryParameters() {
+ return qryParams;
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index aa47c1b..6af6955 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -30,11 +30,9 @@ import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -42,6 +40,7 @@ import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.QueryDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn;
@@ -67,7 +66,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.h2.table.Column;
-import org.jetbrains.annotations.Nullable;
/**
* Logic for building update plans performed by {@link DmlStatementsProcessor}.
@@ -88,25 +86,23 @@ public final class UpdatePlanBuilder {
* Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
* if available.
*
- * @param schemaName Schema name.
+ * @param planKey Plan key.
* @param stmt Statement.
* @param mvccEnabled MVCC enabled flag.
* @param idx Indexing.
- * @param fieldsQry Original query.
* @return Update plan.
*/
@SuppressWarnings("ConstantConditions")
public static UpdatePlan planForStatement(
- String schemaName,
+ QueryDescriptor planKey,
GridSqlStatement stmt,
boolean mvccEnabled,
- IgniteH2Indexing idx,
- @Nullable SqlFieldsQuery fieldsQry
+ IgniteH2Indexing idx
) throws IgniteCheckedException {
if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
- return planForInsert(schemaName, stmt, idx, mvccEnabled, fieldsQry);
+ return planForInsert(planKey, stmt, idx, mvccEnabled);
else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete)
- return planForUpdate(schemaName, stmt, idx, mvccEnabled, fieldsQry);
+ return planForUpdate(planKey, stmt, idx, mvccEnabled);
else
throw new IgniteSQLException("Unsupported operation: " + stmt.getSQL(),
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
@@ -115,21 +111,19 @@ public final class UpdatePlanBuilder {
/**
* Prepare update plan for INSERT or MERGE.
*
- * @param schemaName Schema name.
+ * @param planKey Plan key.
* @param stmt INSERT or MERGE statement.
* @param idx Indexing.
* @param mvccEnabled Mvcc flag.
- * @param fieldsQuery Original query.
* @return Update plan.
* @throws IgniteCheckedException if failed.
*/
@SuppressWarnings("ConstantConditions")
private static UpdatePlan planForInsert(
- String schemaName,
+ QueryDescriptor planKey,
GridSqlStatement stmt,
IgniteH2Indexing idx,
- boolean mvccEnabled,
- @Nullable SqlFieldsQuery fieldsQuery
+ boolean mvccEnabled
) throws IgniteCheckedException {
GridSqlQuery sel = null;
@@ -258,8 +252,7 @@ public final class UpdatePlanBuilder {
distributed = checkPlanCanBeDistributed(
idx,
mvccEnabled,
- schemaName,
- fieldsQuery,
+ planKey,
selectSql,
tbl.dataTable().cacheName()
);
@@ -333,20 +326,18 @@ public final class UpdatePlanBuilder {
/**
* Prepare update plan for UPDATE or DELETE.
*
- * @param schemaName Schema name.
+ * @param planKey Plan key.
* @param stmt UPDATE or DELETE statement.
* @param idx Indexing.
* @param mvccEnabled MVCC flag.
- * @param fieldsQuery Original query.
* @return Update plan.
* @throws IgniteCheckedException if failed.
*/
private static UpdatePlan planForUpdate(
- String schemaName,
+ QueryDescriptor planKey,
GridSqlStatement stmt,
IgniteH2Indexing idx,
- boolean mvccEnabled,
- @Nullable SqlFieldsQuery fieldsQuery
+ boolean mvccEnabled
) throws IgniteCheckedException {
GridSqlElement target;
@@ -442,8 +433,7 @@ public final class UpdatePlanBuilder {
distributed = checkPlanCanBeDistributed(
idx,
mvccEnabled,
- schemaName,
- fieldsQuery,
+ planKey,
selectSql,
tbl.dataTable().cacheName()
);
@@ -477,8 +467,7 @@ public final class UpdatePlanBuilder {
distributed = checkPlanCanBeDistributed(
idx,
mvccEnabled,
- schemaName,
- fieldsQuery,
+ planKey,
selectSql,
tbl.dataTable().cacheName()
);
@@ -862,8 +851,7 @@ public final class UpdatePlanBuilder {
*
* @param idx Indexing.
* @param mvccEnabled Mvcc flag.
- * @param schemaName Schema name.
- * @param fieldsQry Initial update query.
+ * @param planKey Plan key.
* @param selectQry Derived select query.
* @param cacheName Cache name.
* @return distributed update plan info, or {@code null} if cannot be distributed.
@@ -872,24 +860,23 @@ public final class UpdatePlanBuilder {
private static DmlDistributedPlanInfo checkPlanCanBeDistributed(
IgniteH2Indexing idx,
boolean mvccEnabled,
- String schemaName,
- SqlFieldsQuery fieldsQry,
+ QueryDescriptor planKey,
String selectQry,
String cacheName
)
throws IgniteCheckedException {
- if ((!mvccEnabled && !isSkipReducerOnUpdateQuery(fieldsQry)) || DmlUtils.isBatched(fieldsQry))
+ if ((!mvccEnabled && !planKey.skipReducerOnUpdate()) || planKey.batched())
return null;
- try (Connection conn = idx.connections().connectionNoCache(schemaName)) {
+ try (Connection conn = idx.connections().connectionNoCache(planKey.schemaName())) {
// Get a new prepared statement for derived select query.
try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(
conn,
GridSqlQueryParser.prepared(stmt),
- fieldsQry.isCollocated(),
- fieldsQry.isDistributedJoins(),
- fieldsQry.isEnforceJoinOrder(),
+ planKey.collocated(),
+ planKey.distributedJoins(),
+ planKey.enforceJoinOrder(),
false,
idx
);
@@ -917,17 +904,6 @@ public final class UpdatePlanBuilder {
}
/**
- * Checks whether query flags are compatible with server side update.
- *
- * @param qry Query.
- * @return {@code true} if update can be distributed.
- */
- private static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
- return qry != null && !qry.isLocal() &&
- qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isSkipReducerOnUpdate();
- }
-
- /**
* Simple supplier that just takes specified element of a given row.
*/
private static final class PlainValueSupplier implements KeyValueSupplier {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
index e7f3b8c..be90a5d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.typedef.G;
@@ -65,6 +64,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
@@ -312,6 +312,7 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
*
* @throws Exception Exception in case of failure.
*/
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
@Test
public void testQueryDmlDelete() throws Exception {
testQueryDML("DELETE FROM /* comment */ Integer");
@@ -322,6 +323,7 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
*
* @throws Exception Exception in case of failure.
*/
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
@Test
public void testQueryDmlInsert() throws Exception {
testQueryDML("INSERT INTO Integer(_key, _val) VALUES(1,1)");
@@ -332,6 +334,7 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
*
* @throws Exception Exception in case of failure.
*/
+ @Ignore("https://issues.apache.org/jira/browse/IGNITE-11510")
@Test
public void testQueryDmlUpdate() throws Exception {
testQueryDML("UPDATE Integer set _val = 1 where 1=1");
@@ -644,11 +647,23 @@ public class RunningQueriesTest extends AbstractIndexingCommonTest {
*/
private static class BlockingIndexing extends IgniteH2Indexing {
/** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
- @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
- MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
- List<FieldsQueryCursor<List<?>>> res = super.querySqlFields(schemaName, qry, cliCtx, keepBinary,
- failOnMultipleStmts, tracker, cancel, registerAsNewQry);
+ @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
+ String schemaName,
+ SqlFieldsQuery qry,
+ @Nullable SqlClientContext cliCtx,
+ boolean keepBinary,
+ boolean failOnMultipleStmts,
+ GridQueryCancel cancel
+ ) {
+ List<FieldsQueryCursor<List<?>>> res = super.querySqlFields(
+ schemaName,
+ qry,
+ cliCtx,
+ keepBinary,
+ failOnMultipleStmts,
+ cancel
+ );
+
try {
awaitTimeouted();
}