You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/08 10:14:50 UTC
[ignite] branch master updated: IGNITE-11209: SQL: Merged
IgniteH2Indexing and DmlStatemntsProcessor. This closes #6035.
This is an automated email from the ASF dual-hosted git repository.
vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5ca26ce1 IGNITE-11209: SQL: Merged IgniteH2Indexing and DmlStatemntsProcessor. This closes #6035.
5ca26ce1 is described below
commit 5ca26ce1e3fd0c299dfc9e9ce195f54ac74382ae
Author: devozerov <vo...@gridgain.com>
AuthorDate: Fri Feb 8 13:14:04 2019 +0300
IGNITE-11209: SQL: Merged IgniteH2Indexing and DmlStatemntsProcessor. This closes #6035.
---
.../dht/GridDhtTxQueryEnlistFuture.java | 16 +-
.../processors/query/GridQueryIndexing.java | 33 +-
.../processors/query/GridQueryProcessor.java | 34 +-
.../apache/ignite/internal/util/IgniteUtils.java | 11 +
.../IgniteClientCacheInitializationFailTest.java | 9 +-
.../query/h2/DmlStatementsProcessor.java | 1457 +-------------------
.../processors/query/h2/H2CachedStatementKey.java | 17 +-
.../processors/query/h2/IgniteH2Indexing.java | 928 +++++++++++--
.../internal/processors/query/h2/UpdateResult.java | 22 +
.../query/h2/ddl/DdlStatementsProcessor.java | 13 +
.../processors/query/h2/dml/DmlAstUtils.java | 47 +-
.../query/h2/dml/DmlBulkLoadDataConverter.java | 52 +
.../query/h2/dml/DmlUpdateResultsIterator.java | 71 +
.../query/h2/dml/DmlUpdateSingleEntryIterator.java | 66 +
.../internal/processors/query/h2/dml/DmlUtils.java | 420 +++++-
.../processors/query/h2/dml/UpdatePlanBuilder.java | 39 +-
.../query/h2/sql/GridSqlQueryParser.java | 10 +
.../query/h2/twostep/GridMapQueryExecutor.java | 2 +-
.../query/h2/GridH2IndexingInMemSelfTest.java | 25 -
.../query/h2/GridH2IndexingOffheapSelfTest.java | 28 -
.../query/h2/GridIndexingSpiAbstractSelfTest.java | 174 ---
.../IgniteBinaryCacheQueryTestSuite.java | 7 -
22 files changed, 1594 insertions(+), 1887 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
index cea50d5..ee49c6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistFuture.java
@@ -121,8 +121,20 @@ public final class GridDhtTxQueryEnlistFuture extends GridDhtTxQueryAbstractEnli
@Override protected UpdateSourceIterator<?> createIterator() throws IgniteCheckedException {
checkPartitions(parts);
- return cctx.kernalContext().query().prepareDistributedUpdate(cctx, cacheIds, parts, schema, qry,
- params, flags, pageSize, 0, tx.topologyVersionSnapshot(), mvccSnapshot, new GridQueryCancel());
+ return cctx.kernalContext().query().executeUpdateOnDataNodeTransactional(
+ cctx,
+ cacheIds,
+ parts,
+ schema,
+ qry,
+ params,
+ flags,
+ pageSize,
+ 0,
+ tx.topologyVersionSnapshot(),
+ mvccSnapshot,
+ new GridQueryCancel()
+ );
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 7ee8069..f6215b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -121,21 +121,6 @@ public interface GridQueryIndexing {
SqlClientContext cliCtx) throws IgniteCheckedException;
/**
- * Queries individual fields (generally used by JDBC drivers).
- *
- * @param schemaName Schema name.
- * @param qry Query.
- * @param keepBinary Keep binary flag.
- * @param filter Cache name and key filter.
- * @param cancel Query cancel.
- * @param qryId Running query id. {@code null} in case query is not registered.
- * @return Cursor.
- */
- public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
- boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel,
- @Nullable Long qryId) throws IgniteCheckedException;
-
- /**
* Executes text query.
*
* @param schemaName Schema name.
@@ -236,10 +221,20 @@ public interface GridQueryIndexing {
* @return Cursor over entries which are going to be changed.
* @throws IgniteCheckedException If failed.
*/
- public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] ids, int[] parts,
- String schema, String qry, Object[] params, int flags,
- int pageSize, int timeout, AffinityTopologyVersion topVer,
- MvccSnapshot mvccSnapshot, GridQueryCancel cancel) throws IgniteCheckedException;
+ public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
+ GridCacheContext<?, ?> cctx,
+ int[] ids,
+ int[] parts,
+ String schema,
+ String qry,
+ Object[] params,
+ int flags,
+ int pageSize,
+ int timeout,
+ AffinityTopologyVersion topVer,
+ MvccSnapshot mvccSnapshot,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException;
/**
* Registers type if it was not known before or updates it otherwise.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index e5f323d..e099a1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2046,6 +2046,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * Execute update on DHT node (i.e. when it is possible to execute and update on all nodes independently).
*
* @param cctx Cache context.
* @param cacheIds Involved cache ids.
@@ -2062,13 +2063,36 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @return Cursor over entries which are going to be changed.
* @throws IgniteCheckedException If failed.
*/
- public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] cacheIds,
- int[] parts, String schema, String qry, Object[] params, int flags, int pageSize, int timeout,
- AffinityTopologyVersion topVer, MvccSnapshot mvccSnapshot,
- GridQueryCancel cancel) throws IgniteCheckedException {
+ public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
+ GridCacheContext<?, ?> cctx,
+ int[] cacheIds,
+ int[] parts,
+ String schema,
+ String qry,
+ Object[] params,
+ int flags,
+ int pageSize,
+ int timeout,
+ AffinityTopologyVersion topVer,
+ MvccSnapshot mvccSnapshot,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
checkxEnabled();
- return idx.prepareDistributedUpdate(cctx, cacheIds, parts, schema, qry, params, flags, pageSize, timeout, topVer, mvccSnapshot, cancel);
+ return idx.executeUpdateOnDataNodeTransactional(
+ cctx,
+ cacheIds,
+ parts,
+ schema,
+ qry,
+ params,
+ flags,
+ pageSize,
+ timeout,
+ topVer,
+ mvccSnapshot,
+ cancel
+ );
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index c57c8bb..51feab6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -11226,4 +11226,15 @@ public abstract class IgniteUtils {
return Math.abs((Math.abs(grpId) + partId)) % stripes;
}
+
+ /**
+ * Check if flag set.
+ *
+ * @param flags Flags.
+ * @param flag Flag.
+ * @return {@code True} if set.
+ */
+ public static boolean isFlagSet(int flags, int flag) {
+ return (flags & flag) == flag;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index 98f296a..5313c09 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -304,13 +304,6 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
}
/** {@inheritDoc} */
- @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
- boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel,
- Long qryId) throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String spaceName,
String cacheName, String qry, String typeName, IndexingQueryFilter filter) throws IgniteCheckedException {
return null;
@@ -355,7 +348,7 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
}
/** {@inheritDoc} */
- @Override public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] ids, int[] parts,
+ @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(GridCacheContext<?, ?> cctx, int[] ids, int[] parts,
String schema, String qry, Object[] params, int flags, int pageSize, int timeout,
AffinityTopologyVersion topVer,
MvccSnapshot mvccVer, GridQueryCancel cancel) throws IgniteCheckedException {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 569c8b8..937e383 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -17,1278 +17,26 @@
package org.apache.ignite.internal.processors.query.h2;
-import java.sql.BatchUpdateException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.cache.CacheServerNotFoundException;
-import org.apache.ignite.cache.query.BulkLoadContextCursor;
-import org.apache.ignite.cache.query.FieldsQueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
-import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
-import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
-import org.apache.ignite.internal.processors.odbc.SqlStateCode;
-import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
-import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
-import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
-import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
-import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
-import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
-import org.apache.ignite.internal.sql.command.SqlCommand;
-import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.lang.IgniteClosureX;
-import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T3;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.spi.indexing.IndexingQueryFilter;
-import org.apache.ignite.transactions.TransactionDuplicateKeyException;
-import org.h2.command.Prepared;
-import org.h2.command.dml.Delete;
-import org.h2.command.dml.Insert;
-import org.h2.command.dml.Merge;
-import org.h2.command.dml.Update;
-import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccTracker;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.requestSnapshot;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
-import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
-import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
-import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
/**
- *
+ * Contains entry processors for DML. Should be modified very carefully to maintain binary compatibility due to
+ * seializable anonymous classes.
*/
+@SuppressWarnings({"Anonymous2MethodRef", "PublicInnerClass", "unused"})
public class DmlStatementsProcessor {
- /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
- private static final int DFLT_DML_RERUN_ATTEMPTS = 4;
-
- /** Kernal context. */
- private final GridKernalContext ctx;
-
- /** Indexing. */
- private final IgniteH2Indexing idx;
-
- /** Object value context. */
- private final CacheObjectValueContext coCtx;
-
- /** Connection manager. */
- private final ConnectionManager connMgr;
-
- /** Schema manager. */
- private final SchemaManager schemaMgr;
-
- /** Logger. */
- private final IgniteLogger log;
-
- /** Default size for update plan cache. */
- private static final int PLAN_CACHE_SIZE = 1024;
-
- /** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
- private final boolean isDmlAllowedOverride;
-
- /** Update plans cache. */
- private final ConcurrentMap<H2CachedStatementKey, UpdatePlan> planCache =
- new GridBoundedConcurrentLinkedHashMap<>(PLAN_CACHE_SIZE);
-
- /**
- * Default constructor.
- */
- public DmlStatementsProcessor(GridKernalContext ctx, IgniteH2Indexing idx) {
- this.ctx = ctx;
- this.idx = idx;
-
- coCtx = idx.objectContext();
- connMgr = idx.connections();
- schemaMgr = idx.schemaManager();
-
- log = ctx.log(DmlStatementsProcessor.class);
-
- isDmlAllowedOverride = Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
- }
-
- /**
- * Handle cache stop.
- *
- * @param cacheName Cache name.
- */
- public void onCacheStop(String cacheName) {
- Iterator<Map.Entry<H2CachedStatementKey, UpdatePlan>> iter = planCache.entrySet().iterator();
-
- while (iter.hasNext()) {
- UpdatePlan plan = iter.next().getValue();
-
- if (F.eq(cacheName, plan.cacheContext().name()))
- iter.remove();
- }
- }
-
- /**
- * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
- *
- * @param schemaName Schema.
- * @param conn Connection.
- * @param prepared Prepared statement.
- * @param fieldsQry Original query.
- * @param loc Query locality flag.
- * @param filters Cache name and key filter.
- * @param cancel Cancel.
- * @return Update result (modified items count and failed keys).
- * @throws IgniteCheckedException if failed.
- */
- private UpdateResult updateSqlFields(String schemaName, Connection conn, Prepared prepared,
- SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
- throws IgniteCheckedException {
- Object[] errKeys = null;
-
- long items = 0;
-
- UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
-
- GridCacheContext<?, ?> cctx = plan.cacheContext();
-
- for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
- CacheOperationContext opCtx = setKeepBinaryContext(cctx);
-
- UpdateResult r;
-
- try {
- r = executeUpdateStatement(schemaName, plan, fieldsQry, loc, filters, cancel);
- }
- finally {
- cctx.operationContextPerCall(opCtx);
- }
-
- items += r.counter();
- errKeys = r.errorKeys();
-
- if (F.isEmpty(errKeys))
- break;
- }
-
- if (F.isEmpty(errKeys)) {
- if (items == 1L)
- return UpdateResult.ONE;
- else if (items == 0L)
- return UpdateResult.ZERO;
- }
-
- return new UpdateResult(items, errKeys);
- }
-
- /**
- * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
- *
- * @param schemaName Schema.
- * @param conn Connection.
- * @param prepared Prepared statement.
- * @param fieldsQry Original query.
- * @param loc Query locality flag.
- * @param filters Cache name and key filter.
- * @param cancel Cancel.
- * @return Update result (modified items count and failed keys).
- * @throws IgniteCheckedException if failed.
- */
- private Collection<UpdateResult> updateSqlFieldsBatched(String schemaName, Connection conn, Prepared prepared,
- SqlFieldsQueryEx fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
- throws IgniteCheckedException {
- List<Object[]> argss = fieldsQry.batchedArguments();
-
- UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
-
- GridCacheContext<?, ?> cctx = plan.cacheContext();
-
- // For MVCC case, let's enlist batch elements one by one.
- if (plan.hasRows() && plan.mode() == UpdateMode.INSERT && !cctx.mvccEnabled()) {
- CacheOperationContext opCtx = setKeepBinaryContext(cctx);
-
- try {
- List<List<List<?>>> cur = plan.createRows(argss);
-
- return processDmlSelectResultBatched(plan, cur, fieldsQry.getPageSize());
- }
- finally {
- cctx.operationContextPerCall(opCtx);
- }
- }
- else {
- // Fallback to previous mode.
- Collection<UpdateResult> ress = new ArrayList<>(argss.size());
-
- SQLException batchException = null;
-
- int[] cntPerRow = new int[argss.size()];
-
- int cntr = 0;
-
- for (Object[] args : argss) {
- SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry.copy();
-
- qry0.clearBatchedArgs();
- qry0.setArgs(args);
-
- UpdateResult res;
-
- try {
- res = updateSqlFields(schemaName, conn, prepared, qry0, loc, filters, cancel);
-
- cntPerRow[cntr++] = (int)res.counter();
-
- ress.add(res);
- }
- catch (Exception e ) {
- SQLException sqlEx = QueryUtils.toSqlException(e);
-
- batchException = chainException(batchException, sqlEx);
-
- cntPerRow[cntr++] = Statement.EXECUTE_FAILED;
- }
- }
-
- if (batchException != null) {
- BatchUpdateException e = new BatchUpdateException(batchException.getMessage(),
- batchException.getSQLState(), batchException.getErrorCode(), cntPerRow, batchException);
-
- throw new IgniteCheckedException(e);
- }
-
- return ress;
- }
- }
-
- /**
- * Makes current operation context as keepBinary.
- *
- * @param cctx Cache context.
- * @return Old operation context.
- */
- private CacheOperationContext setKeepBinaryContext(GridCacheContext<?, ?> cctx) {
- CacheOperationContext opCtx = cctx.operationContextPerCall();
-
- // Force keepBinary for operation context to avoid binary deserialization inside entry processor
- if (cctx.binaryMarshaller()) {
- CacheOperationContext newOpCtx = null;
-
- if (opCtx == null)
- // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
- newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false, true);
- else if (!opCtx.isKeepBinary())
- newOpCtx = opCtx.keepBinary();
-
- if (newOpCtx != null)
- cctx.operationContextPerCall(newOpCtx);
- }
-
- return opCtx;
- }
-
- /**
- * @param schemaName Schema.
- * @param c Connection.
- * @param p Prepared statement.
- * @param fieldsQry Initial query
- * @param cancel Query cancel.
- * @return Update result wrapped into {@link GridQueryFieldsResult}
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings("unchecked")
- List<QueryCursorImpl<List<?>>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p,
- SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
- if (DmlUtils.isBatched(fieldsQry)) {
- Collection<UpdateResult> ress = updateSqlFieldsBatched(schemaName, c, p, (SqlFieldsQueryEx)fieldsQry,
- false, null, cancel);
-
- ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size());
-
- for (UpdateResult res : ress) {
- checkUpdateResult(res);
-
- QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
- (Collections.singletonList(res.counter())), cancel, false);
-
- resCur.fieldsMeta(UPDATE_RESULT_META);
-
- resCurs.add(resCur);
- }
-
- return resCurs;
- }
- else {
- UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel);
-
- checkUpdateResult(res);
-
- QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
- (Collections.singletonList(res.counter())), cancel, false);
-
- resCur.fieldsMeta(UPDATE_RESULT_META);
-
- return Collections.singletonList(resCur);
- }
- }
-
- /**
- * Execute DML statement on local cache.
- *
- * @param schemaName Schema.
- * @param conn Connection.
- * @param prepared H2 prepared command.
- * @param fieldsQry Fields query.
- * @param filters Cache name and key filter.
- * @param cancel Query cancel.
- * @return Update result wrapped into {@link GridQueryFieldsResult}
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings("unchecked")
- GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, Connection conn, Prepared prepared,
- SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel)
- throws IgniteCheckedException {
- UpdateResult res = updateSqlFields(schemaName, conn, prepared, fieldsQry, true,
- filters, cancel);
-
- return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
- new IgniteSingletonIterator(Collections.singletonList(res.counter())));
- }
-
- /**
- * Perform given statement against given data streamer. Only rows based INSERT is supported.
- *
- * @param qry Query.
- * @param schemaName Schema name.
- * @param streamer Streamer to feed data to.
- * @param stmt Statement.
- * @param args Statement arguments.
- * @return Number of rows in given INSERT statement.
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings({"unchecked"})
- long streamUpdateQuery(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt,
- final Object[] args) throws IgniteCheckedException {
- Long qryId = idx.runningQueryManager().register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
-
- boolean fail = false;
-
- try {
- idx.checkStatementStreamable(stmt);
-
- Prepared p = GridSqlQueryParser.prepared(stmt);
-
- assert p != null;
-
- final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null);
-
- assert plan.isLocalSubquery();
-
- final GridCacheContext cctx = plan.cacheContext();
-
- QueryCursorImpl<List<?>> cur;
-
- final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
-
- QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- try {
- Iterator<List<?>> it;
-
- if (!F.isEmpty(plan.selectQuery())) {
- GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
- plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
- null, false, false, 0, null, null);
-
- it = res.iterator();
- }
- else
- it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator();
-
- return new GridQueryCacheObjectsIterator(it, coCtx, cctx.keepBinary());
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }, null);
-
- data.addAll(stepCur.getAll());
-
- cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- return data.iterator();
- }
- }, null);
-
- if (plan.rowCount() == 1) {
- IgniteBiTuple t = plan.processRow(cur.iterator().next());
-
- streamer.addData(t.getKey(), t.getValue());
-
- return 1;
- }
-
- Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
-
- for (List<?> row : cur) {
- final IgniteBiTuple t = plan.processRow(row);
-
- rows.put(t.getKey(), t.getValue());
- }
-
- streamer.addData(rows);
-
- return rows.size();
- }
- catch (IgniteCheckedException e) {
- fail = true;
-
- throw e;
- }
- finally {
- idx.runningQueryManager().unregister(qryId, fail);
- }
- }
-
- /**
- * Actually perform SQL DML operation locally.
- *
- * @param schemaName Schema name.
- * @param plan Cache context.
- * @param fieldsQry Fields query.
- * @param loc Local query flag.
- * @param filters Cache name and key filter.
- * @param cancel Query cancel state holder.
- * @return Pair [number of successfully processed items; keys that have failed to be processed]
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings({"ConstantConditions"})
- private UpdateResult executeUpdateStatement(String schemaName, final UpdatePlan plan,
- SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
- GridQueryCancel cancel) throws IgniteCheckedException {
- GridCacheContext cctx = plan.cacheContext();
-
- if (cctx != null && cctx.mvccEnabled()) {
- assert cctx.transactional();
-
- DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
- GridNearTxLocal tx = tx(cctx.kernalContext());
-
- boolean implicit = (tx == null);
-
- boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) ||
- ((SqlFieldsQueryEx)fieldsQry).isAutoCommit());
-
- if (implicit)
- tx = txStart(cctx, fieldsQry.getTimeout());
-
- requestSnapshot(cctx, tx);
-
- try (GridNearTxLocal toCommit = commit ? tx : null) {
- long timeout = implicit
- ? tx.remainingTime()
- : IgniteH2Indexing.operationTimeout(fieldsQry.getTimeout(), tx);
-
- if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
- || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
-
- boolean sequential = true;
-
- UpdateSourceIterator<?> it;
-
- if (plan.fastResult()) {
- IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs());
-
- EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
-
- it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
- }
- else if (plan.hasRows())
- it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan, plan.createRows(fieldsQry.getArgs()));
- else {
- // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false).
- SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
- .setArgs(fieldsQry.getArgs())
- .setDistributedJoins(fieldsQry.isDistributedJoins())
- .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
- .setLocal(fieldsQry.isLocal())
- .setPageSize(fieldsQry.getPageSize())
- .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
- .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
- FieldsQueryCursor<List<?>> cur = idx.querySqlFields(schemaName, newFieldsQry, null,
- true, true, mvccTracker(cctx, tx), cancel, false).get(0);
-
- it = plan.iteratorForTransaction(connMgr, cur);
- }
-
- IgniteInternalFuture<Long> fut = tx.updateAsync(cctx, it,
- fieldsQry.getPageSize(), timeout, sequential);
-
- UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
- if (commit)
- toCommit.commit();
-
- return res;
- }
-
- int[] ids = U.toIntArray(distributedPlan.getCacheIds());
-
- int flags = 0;
-
- if (fieldsQry.isEnforceJoinOrder())
- flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
-
- if (distributedPlan.isReplicatedOnly())
- flags |= GridH2QueryRequest.FLAG_REPLICATED;
-
- flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
- fieldsQry.isDataPageScanEnabled());
-
- int[] parts = fieldsQry.getPartitions();
-
- IgniteInternalFuture<Long> fut = tx.updateAsync(
- cctx,
- ids,
- parts,
- schemaName,
- fieldsQry.getSql(),
- fieldsQry.getArgs(),
- flags,
- fieldsQry.getPageSize(),
- timeout);
-
- UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
-
- if (commit)
- toCommit.commit();
-
- return res;
- }
- catch (ClusterTopologyServerNotFoundException e) {
- throw new CacheServerNotFoundException(e.getMessage(), e);
- }
- catch (IgniteCheckedException e) {
- checkSqlException(e);
-
- Exception ex = IgniteUtils.convertExceptionNoWrap(e);
-
- if (ex instanceof IgniteException)
- throw (IgniteException)ex;
-
- U.error(log, "Error during update [localNodeId=" + cctx.localNodeId() + "]", ex);
-
- throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
- }
- finally {
- if (commit)
- cctx.tm().resetContext();
- }
- }
-
- UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs());
-
- if (fastUpdateRes != null)
- return fastUpdateRes;
-
- if (plan.distributedPlan() != null) {
- UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel);
-
- // null is returned in case not all nodes support distributed DML.
- if (result != null)
- return result;
- }
-
- Iterable<List<?>> cur;
-
- // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
- // sub-query and not some dummy stuff like "select 1, 2, 3;"
- if (!loc && !plan.isLocalSubquery()) {
- assert !F.isEmpty(plan.selectQuery());
-
- SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
- .setArgs(fieldsQry.getArgs())
- .setDistributedJoins(fieldsQry.isDistributedJoins())
- .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
- .setLocal(fieldsQry.isLocal())
- .setPageSize(fieldsQry.getPageSize())
- .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS)
- .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
-
- cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true,
- null, cancel, false).get(0);
- }
- else if (plan.hasRows())
- cur = plan.createRows(fieldsQry.getArgs());
- else {
- final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(),
- F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), false, fieldsQry.getTimeout(),
- cancel, null);
-
- cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- try {
- return new GridQueryCacheObjectsIterator(res.iterator(), coCtx, true);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }, cancel);
- }
-
- int pageSize = loc ? 0 : fieldsQry.getPageSize();
-
- return processDmlSelectResult(plan, cur, pageSize);
- }
-
- /**
- * @param e Exception.
- */
- private void checkSqlException(IgniteCheckedException e) {
- IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
-
- if(sqlEx != null)
- throw sqlEx;
- }
-
- /**
- * Performs the planned update.
- * @param plan Update plan.
- * @param rows Rows to update.
- * @param pageSize Page size.
- * @return {@link List} of update results.
- * @throws IgniteCheckedException If failed.
- */
- private List<UpdateResult> processDmlSelectResultBatched(UpdatePlan plan, List<List<List<?>>> rows, int pageSize)
- throws IgniteCheckedException {
- switch (plan.mode()) {
- case MERGE:
- // TODO
- throw new IgniteCheckedException("Unsupported, fix");
-
- case INSERT:
- return doInsertBatched(plan, rows, pageSize);
-
- default:
- throw new IgniteSQLException("Unexpected batched DML operation [mode=" + plan.mode() + ']',
- IgniteQueryErrorCode.UNEXPECTED_OPERATION);
- }
- }
-
- /**
- * @param plan Update plan.
- * @param cursor Cursor over select results.
- * @param pageSize Page size.
- * @return Pair [number of successfully processed items; keys that have failed to be processed]
- * @throws IgniteCheckedException if failed.
- */
- private UpdateResult processDmlSelectResult(UpdatePlan plan, Iterable<List<?>> cursor,
- int pageSize) throws IgniteCheckedException {
- switch (plan.mode()) {
- case MERGE:
- return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
-
- case INSERT:
- return new UpdateResult(doInsert(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
-
- case UPDATE:
- return doUpdate(plan, cursor, pageSize);
-
- case DELETE:
- return doDelete(plan.cacheContext(), cursor, pageSize);
-
- default:
- throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']',
- IgniteQueryErrorCode.UNEXPECTED_OPERATION);
- }
- }
-
- /**
- * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
- * if available.
- *
- * @param schema Schema.
- * @param conn Connection.
- * @param p Prepared statement.
- * @param fieldsQry Original fields query.
- * @param loc Local query flag.
- * @return Update plan.
- */
- UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry,
- boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException {
- isDmlOnSchemaSupported(schema);
-
- H2CachedStatementKey planKey = H2CachedStatementKey.forDmlStatement(schema, p.getSQL(), fieldsQry, loc);
-
- UpdatePlan res = (errKeysPos == null ? planCache.get(planKey) : null);
-
- if (res != null)
- return res;
-
- res = UpdatePlanBuilder.planForStatement(p, loc, idx, conn, fieldsQry, errKeysPos, isDmlAllowedOverride);
-
- // Don't cache re-runs
- if (errKeysPos == null)
- return U.firstNotNull(planCache.putIfAbsent(planKey, res), res);
- else
- return res;
- }
-
- /**
- * @param schemaName Schema name.
- * @param fieldsQry Initial query.
- * @param plan Update plan.
- * @param cancel Cancel state.
- * @return Update result.
- * @throws IgniteCheckedException if failed.
- */
- private UpdateResult doDistributedUpdate(String schemaName, SqlFieldsQuery fieldsQry, UpdatePlan plan,
- GridQueryCancel cancel) throws IgniteCheckedException {
- DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
-
- assert distributedPlan != null;
-
- if (cancel == null)
- cancel = new GridQueryCancel();
-
- return idx.runDistributedUpdate(schemaName, fieldsQry, distributedPlan.getCacheIds(),
- distributedPlan.isReplicatedOnly(), cancel);
- }
-
- /**
- * Perform DELETE operation on top of results of SELECT.
- * @param cctx Cache context.
- * @param cursor SELECT results.
- * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
- * @return Results of DELETE (number of items affected AND keys that failed to be updated).
- */
- private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
- throws IgniteCheckedException {
- DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
-
- for (List<?> row : cursor) {
- if (row.size() != 2) {
- U.warn(log, "Invalid row size on DELETE - expected 2, got " + row.size());
-
- continue;
- }
-
- sender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV), 0);
- }
-
- sender.flush();
-
- SQLException resEx = sender.error();
-
- if (resEx != null) {
- if (!F.isEmpty(sender.failedKeys())) {
- // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
- // had been modified concurrently right away.
- String msg = "Failed to DELETE some keys because they had been modified concurrently " +
- "[keys=" + sender.failedKeys() + ']';
-
- SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
-
- conEx.setNextException(resEx);
-
- resEx = conEx;
- }
-
- throw new IgniteSQLException(resEx);
- }
-
- return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
- }
-
- /**
- * Perform UPDATE operation on top of results of SELECT.
- * @param cursor SELECT results.
- * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
- * @return Pair [cursor corresponding to results of UPDATE (contains number of items affected); keys whose values
- * had been modified concurrently (arguments for a re-run)].
- */
- private UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
- throws IgniteCheckedException {
- GridCacheContext cctx = plan.cacheContext();
-
- DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
-
- for (List<?> row : cursor) {
- T3<Object, Object, Object> row0 = plan.processRowForUpdate(row);
-
- Object key = row0.get1();
- Object oldVal = row0.get2();
- Object newVal = row0.get3();
-
- sender.add(key, new ModifyingEntryProcessor(oldVal, new EntryValueUpdater(newVal)), 0);
- }
-
- sender.flush();
-
- SQLException resEx = sender.error();
-
- if (resEx != null) {
- if (!F.isEmpty(sender.failedKeys())) {
- // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
- // had been modified concurrently right away.
- String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
- "[keys=" + sender.failedKeys() + ']';
-
- SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
-
- dupEx.setNextException(resEx);
-
- resEx = dupEx;
- }
-
- throw new IgniteSQLException(resEx);
- }
-
- return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
- }
-
- /**
- * Execute MERGE statement plan.
- * @param cursor Cursor to take inserted data from.
- * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
- * @return Number of items affected.
- * @throws IgniteCheckedException if failed.
- */
- @SuppressWarnings("unchecked")
- private long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
- GridCacheContext cctx = plan.cacheContext();
-
- // If we have just one item to put, just do so
- if (plan.rowCount() == 1) {
- IgniteBiTuple t = plan.processRow(cursor.iterator().next());
-
- cctx.cache().put(t.getKey(), t.getValue());
-
- return 1;
- }
- else {
- int resCnt = 0;
-
- Map<Object, Object> rows = new LinkedHashMap<>();
-
- for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
- List<?> row = it.next();
-
- IgniteBiTuple t = plan.processRow(row);
-
- rows.put(t.getKey(), t.getValue());
-
- if ((pageSize > 0 && rows.size() == pageSize) || !it.hasNext()) {
- cctx.cache().putAll(rows);
-
- resCnt += rows.size();
-
- if (it.hasNext())
- rows.clear();
- }
- }
-
- return resCnt;
- }
- }
-
- /**
- * Execute INSERT statement plan.
- * @param cursor Cursor to take inserted data from.
- * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
- * @return Number of items affected.
- * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
- */
- @SuppressWarnings({"unchecked"})
- private long doInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
- GridCacheContext cctx = plan.cacheContext();
-
- // If we have just one item to put, just do so
- if (plan.rowCount() == 1) {
- IgniteBiTuple t = plan.processRow(cursor.iterator().next());
-
- if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
- return 1;
- else
- throw new TransactionDuplicateKeyException("Duplicate key during INSERT [key=" + t.getKey() + ']');
- }
- else {
- // Keys that failed to INSERT due to duplication.
- DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
-
- for (List<?> row : cursor) {
- final IgniteBiTuple keyValPair = plan.processRow(row);
-
- sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()), 0);
- }
-
- sender.flush();
-
- SQLException resEx = sender.error();
-
- if (!F.isEmpty(sender.failedKeys())) {
- String msg = "Failed to INSERT some keys because they are already in cache " +
- "[keys=" + sender.failedKeys() + ']';
-
- SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION);
-
- if (resEx == null)
- resEx = dupEx;
- else
- resEx.setNextException(dupEx);
- }
-
- if (resEx != null)
- throw new IgniteSQLException(resEx);
-
- return sender.updateCount();
- }
- }
-
- /**
- * Execute INSERT statement plan.
- *
- * @param plan Plan to execute.
- * @param cursor Cursor to take inserted data from. I.e. list of batch arguments for each query.
- * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
- * @return Number of items affected.
- * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
- */
- private List<UpdateResult> doInsertBatched(UpdatePlan plan, List<List<List<?>>> cursor, int pageSize)
- throws IgniteCheckedException {
- GridCacheContext cctx = plan.cacheContext();
-
- DmlBatchSender snd = new DmlBatchSender(cctx, pageSize, cursor.size());
-
- int rowNum = 0;
-
- SQLException resEx = null;
-
- for (List<List<?>> qryRow : cursor) {
- for (List<?> row : qryRow) {
- try {
- final IgniteBiTuple keyValPair = plan.processRow(row);
-
- snd.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()), rowNum);
- }
- catch (Exception e) {
- String sqlState;
-
- int code;
-
- if (e instanceof IgniteSQLException) {
- sqlState = ((IgniteSQLException)e).sqlState();
-
- code = ((IgniteSQLException)e).statusCode();
- } else {
- sqlState = SqlStateCode.INTERNAL_ERROR;
-
- code = IgniteQueryErrorCode.UNKNOWN;
- }
-
- resEx = chainException(resEx, new SQLException(e.getMessage(), sqlState, code, e));
-
- snd.setFailed(rowNum);
- }
- }
-
- rowNum++;
- }
-
- try {
- snd.flush();
- }
- catch (Exception e) {
- resEx = chainException(resEx, new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR,
- IgniteQueryErrorCode.UNKNOWN, e));
- }
-
- resEx = chainException(resEx, snd.error());
-
- if (!F.isEmpty(snd.failedKeys())) {
- SQLException e = new SQLException("Failed to INSERT some keys because they are already in cache [keys=" +
- snd.failedKeys() + ']', SqlStateCode.CONSTRAINT_VIOLATION, DUPLICATE_KEY);
-
- resEx = chainException(resEx, e);
- }
-
- if (resEx != null) {
- BatchUpdateException e = new BatchUpdateException(resEx.getMessage(), resEx.getSQLState(),
- resEx.getErrorCode(), snd.perRowCounterAsArray(), resEx);
-
- throw new IgniteCheckedException(e);
- }
-
- int[] cntPerRow = snd.perRowCounterAsArray();
-
- List<UpdateResult> res = new ArrayList<>(cntPerRow.length);
-
- for (int i = 0; i < cntPerRow.length; i++ ) {
- int cnt = cntPerRow[i];
-
- res.add(new UpdateResult(cnt , X.EMPTY_OBJECT_ARRAY));
- }
-
- return res;
- }
-
- /**
- * Adds exception to the chain.
- *
- * @param main Exception to add another exception to.
- * @param add Exception which should be added to chain.
- * @return Chained exception.
- */
- private SQLException chainException(SQLException main, SQLException add) {
- if (main == null) {
- if (add != null) {
- main = add;
-
- return main;
- }
- else
- return null;
- }
- else {
- main.setNextException(add);
-
- return main;
- }
- }
-
- /**
- *
- * @param schemaName Schema name.
- * @param stmt Prepared statement.
- * @param fldsQry Query.
- * @param filter Filter.
- * @param cancel Cancel state.
- * @param local Locality flag.
- * @return Update result.
- * @throws IgniteCheckedException if failed.
- */
- UpdateResult mapDistributedUpdate(String schemaName, PreparedStatement stmt, SqlFieldsQuery fldsQry,
- IndexingQueryFilter filter, GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
- Connection c;
-
- try {
- c = stmt.getConnection();
- }
- catch (SQLException e) {
- throw new IgniteCheckedException(e);
- }
-
- return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel);
- }
-
- /**
- * @param schema Schema name.
- * @param conn Connection.
- * @param stmt Prepared statement.
- * @param qry Sql fields query
- * @param filter Backup filter.
- * @param cancel Query cancel object.
- * @param local {@code true} if should be executed locally.
- * @param topVer Topology version.
- * @param mvccSnapshot MVCC snapshot.
- * @return Iterator upon updated values.
- * @throws IgniteCheckedException If failed.
- */
- public UpdateSourceIterator<?> prepareDistributedUpdate(String schema, Connection conn,
- PreparedStatement stmt, SqlFieldsQuery qry,
- IndexingQueryFilter filter, GridQueryCancel cancel, boolean local,
- AffinityTopologyVersion topVer, MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
-
- Prepared prepared = GridSqlQueryParser.prepared(stmt);
-
- UpdatePlan plan = getPlanForStatement(schema, conn, prepared, qry, local, null);
-
- GridCacheContext cctx = plan.cacheContext();
-
- CacheOperationContext opCtx = cctx.operationContextPerCall();
-
- // Force keepBinary for operation context to avoid binary deserialization inside entry processor
- if (cctx.binaryMarshaller()) {
- CacheOperationContext newOpCtx = null;
-
- if (opCtx == null)
- newOpCtx = new CacheOperationContext().keepBinary();
- else if (!opCtx.isKeepBinary())
- newOpCtx = opCtx.keepBinary();
-
- if (newOpCtx != null)
- cctx.operationContextPerCall(newOpCtx);
- }
-
- QueryCursorImpl<List<?>> cur;
-
- // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
- // sub-query and not some dummy stuff like "select 1, 2, 3;"
- if (!local && !plan.isLocalSubquery()) {
- SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), qry.isCollocated())
- .setArgs(qry.getArgs())
- .setDistributedJoins(qry.isDistributedJoins())
- .setEnforceJoinOrder(qry.isEnforceJoinOrder())
- .setLocal(qry.isLocal())
- .setPageSize(qry.getPageSize())
- .setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS)
- .setDataPageScanEnabled(qry.isDataPageScanEnabled());
-
- cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schema, newFieldsQry, null, true, true,
- new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel, false).get(0);
- }
- else {
- final GridQueryFieldsResult res = idx.queryLocalSqlFields(schema, plan.selectQuery(),
- F.asList(qry.getArgs()), filter, qry.isEnforceJoinOrder(), false, qry.getTimeout(), cancel,
- new StaticMvccQueryTracker(cctx, mvccSnapshot), null);
-
- cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
- @Override public Iterator<List<?>> iterator() {
- try {
- return res.iterator();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }, cancel);
- }
-
- return plan.iteratorForTransaction(connMgr, cur);
- }
-
- /**
- * Runs a DML statement for which we have internal command executor.
- *
- * @param schemaName Schema name.
- * @param sql The SQL command text to execute.
- * @param cmd The command to execute.
- * @return The cursor returned by the statement.
- * @throws IgniteSQLException If failed.
- */
- public FieldsQueryCursor<List<?>> runNativeDmlStatement(String schemaName, String sql, SqlCommand cmd) {
- Long qryId = idx.runningQueryManager().register(sql, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
-
- try {
- if (cmd instanceof SqlBulkLoadCommand)
- return processBulkLoadCommand((SqlBulkLoadCommand)cmd, qryId);
- else
- throw new IgniteSQLException("Unsupported DML operation: " + sql,
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
- }
- catch (IgniteSQLException e) {
- idx.runningQueryManager().unregister(qryId, true);
-
- throw e;
- }
- catch (Exception e) {
- idx.runningQueryManager().unregister(qryId, true);
-
- throw new IgniteSQLException("Unexpected DML operation failure: " + e.getMessage(), e);
- }
- }
-
- /**
- * Process bulk load COPY command.
- *
- * @param cmd The command.
- * @param qryId Query id.
- * @return The context (which is the result of the first request/response).
- * @throws IgniteCheckedException If something failed.
- */
- public FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd,
- Long qryId) throws IgniteCheckedException {
- if (cmd.packetSize() == null)
- cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
-
- GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName());
-
- if (tbl == null) {
- throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
- IgniteQueryErrorCode.TABLE_NOT_FOUND);
- }
-
- H2Utils.checkAndStartNotStartedCache(ctx, tbl);
-
- UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
-
- IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new BulkLoadDataConverter(plan);
-
- IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName());
-
- BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
-
- BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
-
- BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter,
- idx.runningQueryManager(), qryId);
-
- BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
-
- return new BulkLoadContextCursor(processor, params);
- }
-
/** */
- private static final class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+ public static final class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
/** Value to set. */
private final Object val;
/** */
- private InsertEntryProcessor(Object val) {
+ public InsertEntryProcessor(Object val) {
this.val = val;
}
@@ -1306,7 +54,7 @@ public class DmlStatementsProcessor {
/**
* Entry processor invoked by UPDATE and DELETE operations.
*/
- private static final class ModifyingEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+ public static final class ModifyingEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
/** Value to expect. */
private final Object val;
@@ -1314,7 +62,7 @@ public class DmlStatementsProcessor {
private final IgniteInClosure<MutableEntry<Object, Object>> entryModifier;
/** */
- private ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object, Object>> entryModifier) {
+ public ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object, Object>> entryModifier) {
assert val != null;
this.val = val;
@@ -1342,174 +90,63 @@ public class DmlStatementsProcessor {
}
}
- /** */
- private static IgniteInClosure<MutableEntry<Object, Object>> RMV = new IgniteInClosure<MutableEntry<Object, Object>>() {
- /** {@inheritDoc} */
- @Override public void apply(MutableEntry<Object, Object> e) {
- e.remove();
+ /** Dummy anonymous class to advance RMV anonymous value to 5. */
+ private static final Runnable DUMMY_1 = new Runnable() {
+ @Override public void run() {
+ // No-op.
}
};
- /**
- *
- */
- private static final class EntryValueUpdater implements IgniteInClosure<MutableEntry<Object, Object>> {
- /** Value to set. */
- private final Object val;
-
- /** */
- private EntryValueUpdater(Object val) {
- assert val != null;
-
- this.val = val;
+ /** Dummy anonymous class to advance RMV anonymous value to 5. */
+ private static final Runnable DUMMY_2 = new Runnable() {
+ @Override public void run() {
+ // No-op.
}
+ };
- /** {@inheritDoc} */
- @Override public void apply(MutableEntry<Object, Object> e) {
- e.setValue(val);
+ /** Dummy anonymous class to advance RMV anonymous value to 5. */
+ private static final Runnable DUMMY_3 = new Runnable() {
+ @Override public void run() {
+ // No-op.
}
- }
-
- /**
- * Check whether statement is DML statement.
- *
- * @param stmt Statement.
- * @return {@code True} if this is DML.
- */
- static boolean isDmlStatement(Prepared stmt) {
- return stmt instanceof Merge || stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete;
- }
-
- /**
- * Check if schema supports DDL statement.
- *
- * @param schemaName Schema name.
- */
- private static void isDmlOnSchemaSupported(String schemaName) {
- if (F.eq(QueryUtils.SCHEMA_SYS, schemaName))
- throw new IgniteSQLException("DML statements are not supported on " + schemaName + " schema",
- IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
- }
-
- /**
- * Check update result for erroneous keys and throws concurrent update exception if necessary.
- *
- * @param r Update result.
- */
- static void checkUpdateResult(UpdateResult r) {
- if (!F.isEmpty(r.errorKeys())) {
- String msg = "Failed to update some keys because they had been modified concurrently " +
- "[keys=" + Arrays.toString(r.errorKeys()) + ']';
+ };
- SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+ /** Dummy anonymous class to advance RMV anonymous value to 5. */
+ private static final Runnable DUMMY_4 = new Runnable() {
+ @Override public void run() {
+ // No-op.
+ }
+ };
- throw new IgniteSQLException(conEx);
+ /** Remove updater. Must not be moved around to keep at anonymous position 5. */
+ public static final IgniteInClosure<MutableEntry<Object, Object>> RMV =
+ new IgniteInClosure<MutableEntry<Object, Object>>() {
+ @Override public void apply(MutableEntry<Object, Object> e) {
+ e.remove();
}
- }
+ };
/**
- * Converts a row of values to actual key+value using {@link UpdatePlan#processRow(List)}.
+ * Entry value updater.
*/
- private static class BulkLoadDataConverter extends IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> {
- /** Update plan to convert incoming rows. */
- private final UpdatePlan plan;
-
- /**
- * Creates the converter with the given update plan.
- *
- * @param plan The update plan to use.
- */
- private BulkLoadDataConverter(UpdatePlan plan) {
- this.plan = plan;
- }
+ public static final class EntryValueUpdater implements IgniteInClosure<MutableEntry<Object, Object>> {
+ /** Value to set. */
+ private final Object val;
/**
- * Converts the record to a key+value.
+ * Constructor.
*
- * @param record The record to convert.
- * @return The key+value.
- * @throws IgniteCheckedException If conversion failed for some reason.
+ * @param val Value.
*/
- @Override public IgniteBiTuple<?, ?> applyx(List<?> record) throws IgniteCheckedException {
- return plan.processRow(record);
- }
- }
-
- /** */
- private static class DmlUpdateResultsIterator
- implements UpdateSourceIterator<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private EnlistOperation op;
-
- /** */
- private UpdatePlan plan;
-
- /** */
- private Iterator<List<?>> it;
-
- /** */
- DmlUpdateResultsIterator(EnlistOperation op, UpdatePlan plan, Iterable<List<?>> rows) {
- this.op = op;
- this.plan = plan;
- this.it = rows.iterator();
- }
-
- /** {@inheritDoc} */
- @Override public EnlistOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNextX() {
- return it.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public Object nextX() throws IgniteCheckedException {
- return plan.processRowForTx(it.next());
- }
- }
-
- /** */
- private static class DmlUpdateSingleEntryIterator<T> implements UpdateSourceIterator<T> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private EnlistOperation op;
-
- /** */
- private boolean first = true;
-
- /** */
- private T entry;
-
- /** */
- DmlUpdateSingleEntryIterator(EnlistOperation op, T entry) {
- this.op = op;
- this.entry = entry;
- }
-
- /** {@inheritDoc} */
- @Override public EnlistOperation operation() {
- return op;
- }
+ public EntryValueUpdater(Object val) {
+ assert val != null;
- /** {@inheritDoc} */
- @Override public boolean hasNextX() {
- return first;
+ this.val = val;
}
/** {@inheritDoc} */
- @Override public T nextX() {
- T res = first ? entry : null;
-
- first = false;
-
- return res;
+ @Override public void apply(MutableEntry<Object, Object> e) {
+ e.setValue(val);
}
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
index 7b43f52..300ed6c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
@@ -46,21 +46,6 @@ class H2CachedStatementKey {
}
/**
- * Build key with details relevant to DML plans cache.
- *
- * @param schemaName Schema name.
- * @param sql SQL.
- * @param fieldsQry Query with flags.
- * @param loc DML {@code SELECT} Locality flag.
- * @return Statement key.
- * @see UpdatePlanBuilder
- * @see DmlStatementsProcessor#getPlanForStatement
- */
- static H2CachedStatementKey forDmlStatement(String schemaName, String sql, SqlFieldsQuery fieldsQry, boolean loc) {
- return new H2CachedStatementKey(schemaName, sql, fieldsQry, loc);
- }
-
- /**
* Full-fledged constructor.
*
* @param schemaName Schema name.
@@ -68,7 +53,7 @@ class H2CachedStatementKey {
* @param fieldsQry Query with flags.
* @param loc DML {@code SELECT} Locality flag.
*/
- private H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery fieldsQry, boolean loc) {
+ public H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery fieldsQry, boolean loc) {
this.schemaName = schemaName;
this.sql = sql;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 6d346c0..2b60286 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -17,19 +17,23 @@
package org.apache.ignite.internal.processors.query.h2;
+import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.cache.CacheException;
@@ -38,6 +42,8 @@ import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheServerNotFoundException;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -46,9 +52,16 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
@@ -61,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVe
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
+import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
@@ -74,6 +88,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -89,10 +104,21 @@ import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryHistoryMetrics;
import org.apache.ignite.internal.processors.query.QueryHistoryMetricsKey;
import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlBulkLoadDataConverter;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateResultsIterator;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIterator;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
+import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
+import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeClientIndex;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
@@ -107,8 +133,6 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery;
@@ -119,7 +143,6 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
-import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
@@ -139,15 +162,17 @@ import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
import org.apache.ignite.internal.sql.command.SqlDropUserCommand;
import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand;
import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
-import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -178,6 +203,7 @@ import org.jetbrains.annotations.Nullable;
import static java.lang.Boolean.FALSE;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.requestSnapshot;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
@@ -217,6 +243,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private static final int TWO_STEP_QRY_CACHE_SIZE = 1024;
+ /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
+ private static final int DFLT_UPDATE_RERUN_ATTEMPTS = 4;
+
+ /** Default size for update plan cache. */
+ private static final int UPDATE_PLAN_CACHE_SIZE = 1024;
+
+ /** Cached value of {@code IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION}. */
+ private final boolean updateInTxAllowed =
+ Boolean.getBoolean(IgniteSystemProperties.IGNITE_ALLOW_DML_INSIDE_TRANSACTION);
+
+ /** Update plans cache. */
+ private final ConcurrentMap<H2CachedStatementKey, UpdatePlan> updatePlanCache =
+ new GridBoundedConcurrentLinkedHashMap<>(UPDATE_PLAN_CACHE_SIZE);
+
/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -249,9 +289,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final QueryContextRegistry qryCtxRegistry = new QueryContextRegistry();
/** */
- private DmlStatementsProcessor dmlProc;
-
- /** */
private DdlStatementsProcessor ddlProc;
/** Partition reservation manager. */
@@ -467,35 +504,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param filter Cache name and key filter.
* @param enforceJoinOrder Enforce join order of tables in the query.
* @param startTx Start transaction flag.
- * @param timeout Query timeout in milliseconds.
- * @param cancel Query cancel.
- * @param dataPageScanEnabled If data page scan is enabled.
- * @return Query result.
- * @throws IgniteCheckedException If failed.
- */
- public GridQueryFieldsResult queryLocalSqlFields(
- String schemaName,
- String qry,
- @Nullable Collection<Object> params,
- IndexingQueryFilter filter,
- boolean enforceJoinOrder,
- boolean startTx,
- int timeout,
- GridQueryCancel cancel,
- Boolean dataPageScanEnabled
- ) throws IgniteCheckedException {
- return queryLocalSqlFields(schemaName, qry, params, filter, enforceJoinOrder, startTx, timeout, cancel, null, dataPageScanEnabled);
- }
-
- /**
- * Queries individual fields (generally used by JDBC drivers).
- *
- * @param schemaName Schema name.
- * @param qry Query.
- * @param params Query parameters.
- * @param filter Cache name and key filter.
- * @param enforceJoinOrder Enforce join order of tables in the query.
- * @param startTx Start transaction flag.
* @param qryTimeout Query timeout in milliseconds.
* @param cancel Query cancel.
* @param mvccTracker Query tracker.
@@ -503,7 +511,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
- GridQueryFieldsResult queryLocalSqlFields(
+ private GridQueryFieldsResult executeQueryLocal0(
final String schemaName,
String qry,
@Nullable final Collection<Object> params,
@@ -533,7 +541,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Prepared p = GridSqlQueryParser.prepared(stmt);
- if (DmlStatementsProcessor.isDmlStatement(p)) {
+ if (GridSqlQueryParser.isDml(p)) {
SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
if (params != null)
@@ -543,7 +551,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
fldsQry.setDataPageScanEnabled(dataPageScanEnabled);
- return dmlProc.updateSqlFieldsLocal(schemaName, conn, p, fldsQry, filter, cancel);
+ UpdateResult updRes = executeUpdate(schemaName, conn, p, fldsQry, true, filter, cancel);
+
+ List<?> updResRow = Collections.singletonList(updRes.counter());
+
+ return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator<>(updResRow));
}
else if (DdlStatementsProcessor.isDdlStatement(p)) {
throw new IgniteSQLException("DDL statements are supported for the whole cluster only.",
@@ -761,7 +773,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteSQLException(e);
}
- return dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, params);
+ return streamQuery0(qry, schemaName, streamer, stmt, params);
}
/** {@inheritDoc} */
@@ -786,7 +798,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Prepared p = GridSqlQueryParser.prepared(stmt);
- UpdatePlan plan = dmlProc.getPlanForStatement(schemaName, conn, p, null, true, null);
+ UpdatePlan plan = updatePlan(schemaName, conn, p, null, true);
IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(plan.cacheContext().name());
@@ -795,12 +807,119 @@ public class IgniteH2Indexing implements GridQueryIndexing {
List<Long> res = new ArrayList<>(params.size());
for (int i = 0; i < params.size(); i++)
- res.add(dmlProc.streamUpdateQuery(qry, schemaName, streamer, stmt, params.get(i)));
+ res.add(streamQuery0(qry, schemaName, streamer, stmt, params.get(i)));
return res;
}
/**
+ * Perform given statement against given data streamer. Only rows based INSERT is supported.
+ *
+ * @param qry Query.
+ * @param schemaName Schema name.
+ * @param streamer Streamer to feed data to.
+ * @param stmt Statement.
+ * @param args Statement arguments.
+ * @return Number of rows in given INSERT statement.
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings({"unchecked", "Anonymous2MethodRef"})
+ private long streamQuery0(String qry, String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt,
+ final Object[] args) throws IgniteCheckedException {
+ Long qryId = runningQueryMgr.register(qry, GridCacheQueryType.SQL_FIELDS, schemaName, true, null);
+
+ boolean fail = false;
+
+ try {
+ checkStatementStreamable(stmt);
+
+ Prepared p = GridSqlQueryParser.prepared(stmt);
+
+ assert p != null;
+
+ final UpdatePlan plan = updatePlan(schemaName, null, p, null, true);
+
+ assert plan.isLocalSubquery();
+
+ final GridCacheContext cctx = plan.cacheContext();
+
+ QueryCursorImpl<List<?>> cur;
+
+ final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
+
+ QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ try {
+ Object[] params = args != null ? args : X.EMPTY_OBJECT_ARRAY;
+
+ Iterator<List<?>> it;
+
+ if (!F.isEmpty(plan.selectQuery())) {
+ GridQueryFieldsResult res = executeQueryLocal0(
+ schema(cctx.name()),
+ plan.selectQuery(),
+ F.asList(params),
+ null,
+ false,
+ false,
+ 0,
+ null,
+ null,
+ null
+ );
+
+ it = res.iterator();
+ }
+ else
+ it = plan.createRows(params).iterator();
+
+ return new GridQueryCacheObjectsIterator(it, objectContext(), cctx.keepBinary());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, null);
+
+ data.addAll(stepCur.getAll());
+
+ cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ return data.iterator();
+ }
+ }, null);
+
+ if (plan.rowCount() == 1) {
+ IgniteBiTuple t = plan.processRow(cur.iterator().next());
+
+ streamer.addData(t.getKey(), t.getValue());
+
+ return 1;
+ }
+
+ Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+
+ for (List<?> row : cur) {
+ final IgniteBiTuple t = plan.processRow(row);
+
+ rows.put(t.getKey(), t.getValue());
+ }
+
+ streamer.addData(rows);
+
+ return rows.size();
+ }
+ catch (IgniteCheckedException e) {
+ fail = true;
+
+ throw e;
+ }
+ finally {
+ runningQueryMgr.unregister(qryId, fail);
+ }
+ }
+
+ /**
* @param size Result size.
* @return List of given size filled with 0Ls.
*/
@@ -1000,17 +1119,39 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- /** {@inheritDoc} */
- @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
- final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel,
- Long qryId) throws IgniteCheckedException {
- String sql = qry.getSql();
- List<Object> params = F.asList(qry.getArgs());
- boolean enforceJoinOrder = qry.isEnforceJoinOrder(), startTx = autoStartTx(qry);
- int timeout = qry.getTimeout();
+ /**
+ * Queries individual fields (generally used by JDBC drivers).
+ *
+ * @param schemaName Schema name.
+ * @param qry Query.
+ * @param keepBinary Keep binary flag.
+ * @param filter Cache name and key filter.
+ * @param cancel Query cancel.
+ * @param qryId Running query id. {@code null} in case query is not registered.
+ * @return Cursor.
+ */
+ private FieldsQueryCursor<List<?>> executeQueryLocal(
+ String schemaName,
+ SqlFieldsQuery qry,
+ final boolean keepBinary,
+ IndexingQueryFilter filter,
+ GridQueryCancel cancel,
+ Long qryId
+ ) throws IgniteCheckedException {
+ boolean startTx = autoStartTx(qry);
- final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, params, filter,
- enforceJoinOrder, startTx, timeout, cancel, qry.isDataPageScanEnabled());
+ final GridQueryFieldsResult res = executeQueryLocal0(
+ schemaName,
+ qry.getSql(),
+ F.asList(qry.getArgs()),
+ filter,
+ qry.isEnforceJoinOrder(),
+ startTx,
+ qry.getTimeout(),
+ cancel,
+ null,
+ qry.isDataPageScanEnabled()
+ );
Iterable<List<?>> iter = () -> {
try {
@@ -1323,38 +1464,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private List<FieldsQueryCursor<List<?>>> queryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry,
SqlCommand cmd, @Nullable SqlClientContext cliCtx) {
boolean fail = false;
+ boolean unregister = true;
- // Execute.
- if (cmd instanceof SqlBulkLoadCommand)
- return Collections.singletonList(dmlProc.runNativeDmlStatement(schemaName, qry.getSql(), cmd));
-
- //Always registry new running query for native commands except COPY. Currently such operations don't support cancellation.
Long qryId = registerRunningQuery(schemaName, null, qry.getSql(), qry.isLocal(), true);
try {
- if (cmd instanceof SqlCreateIndexCommand
- || cmd instanceof SqlDropIndexCommand
- || cmd instanceof SqlAlterTableCommand
- || cmd instanceof SqlCreateUserCommand
- || cmd instanceof SqlAlterUserCommand
- || cmd instanceof SqlDropUserCommand)
- return Collections.singletonList(ddlProc.runDdlStatement(qry.getSql(), cmd));
- else if (cmd instanceof SqlSetStreamingCommand) {
- if (cliCtx == null)
- throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver.");
-
- SqlSetStreamingCommand setCmd = (SqlSetStreamingCommand)cmd;
-
- if (setCmd.isTurnOn())
- cliCtx.enableStreaming(setCmd.allowOverwrite(), setCmd.flushFrequency(),
- setCmd.perNodeBufferSize(), setCmd.perNodeParallelOperations(), setCmd.isOrdered());
- else
- cliCtx.disableStreaming();
+ FieldsQueryCursor<List<?>> cur;
+
+ if (DdlStatementsProcessor.isDdlCommand(cmd))
+ cur = ddlProc.runDdlStatement(qry.getSql(), cmd);
+ else if (cmd instanceof SqlBulkLoadCommand) {
+ // Query will be unregistered when cursor is closed.
+ unregister = false;
+
+ cur = processBulkLoadCommand((SqlBulkLoadCommand) cmd, qryId);
}
+ else if (cmd instanceof SqlSetStreamingCommand)
+ cur = processSetStreamingCommand((SqlSetStreamingCommand)cmd, cliCtx);
else
- processTxCommand(cmd, qry);
+ cur = processTxCommand(cmd, qry);
- return Collections.singletonList(H2Utils.zeroCursor());
+ return Collections.singletonList(cur);
}
catch (IgniteCheckedException e) {
fail = true;
@@ -1363,17 +1493,44 @@ public class IgniteH2Indexing implements GridQueryIndexing {
", err=" + e.getMessage() + ']', e);
}
finally {
- runningQueryMgr.unregister(qryId, fail);
+ if (unregister || fail)
+ runningQueryMgr.unregister(qryId, fail);
}
}
/**
+ * Process SET STREAMING command.
+ *
+ * @param cmd Command.
+ * @param cliCtx Client context.
+ * @return Cursor.
+ */
+ private FieldsQueryCursor<List<?>> processSetStreamingCommand(SqlSetStreamingCommand cmd,
+ @Nullable SqlClientContext cliCtx) {
+ if (cliCtx == null)
+ throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver.");
+
+ if (cmd.isTurnOn())
+ cliCtx.enableStreaming(
+ cmd.allowOverwrite(),
+ cmd.flushFrequency(),
+ cmd.perNodeBufferSize(),
+ cmd.perNodeParallelOperations(),
+ cmd.isOrdered()
+ );
+ else
+ cliCtx.disableStreaming();
+
+ return H2Utils.zeroCursor();
+ }
+
+ /**
* Check expected statement type (when it is set by JDBC) and given statement type.
*
* @param qry Query.
* @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}.
*/
- private void checkQueryType(SqlFieldsQuery qry, boolean isQry) {
+ private static void checkQueryType(SqlFieldsQuery qry, boolean isQry) {
Boolean qryFlag = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx) qry).isQuery() : null;
if (qryFlag != null && qryFlag != isQry)
@@ -1387,7 +1544,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param qry Query.
* @throws IgniteCheckedException if failed.
*/
- private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry) throws IgniteCheckedException {
+ private FieldsQueryCursor<List<?>> processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
+ throws IgniteCheckedException {
NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).getNestedTxMode() :
NestedTxMode.DEFAULT;
@@ -1439,6 +1597,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (tx != null)
doRollback(tx);
}
+
+ return H2Utils.zeroCursor();
}
/**
@@ -1448,9 +1608,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private void doCommit(@NotNull GridNearTxLocal tx) throws IgniteCheckedException {
try {
- // TODO: Why checking for rollback only?
- //if (!tx.isRollbackOnly())
- tx.commit();
+ tx.commit();
}
finally {
closeTx(tx);
@@ -1485,6 +1643,46 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
+ /**
+ * Process bulk load COPY command.
+ *
+ * @param cmd The command.
+ * @param qryId Query id.
+ * @return The context (which is the result of the first request/response).
+ * @throws IgniteCheckedException If something failed.
+ */
+ private FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd, Long qryId)
+ throws IgniteCheckedException {
+ if (cmd.packetSize() == null)
+ cmd.packetSize(BulkLoadAckClientParameters.DFLT_PACKET_SIZE);
+
+ GridH2Table tbl = schemaMgr.dataTable(cmd.schemaName(), cmd.tableName());
+
+ if (tbl == null) {
+ throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
+ IgniteQueryErrorCode.TABLE_NOT_FOUND);
+ }
+
+ H2Utils.checkAndStartNotStartedCache(ctx, tbl);
+
+ UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
+
+ IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new DmlBulkLoadDataConverter(plan);
+
+ IgniteDataStreamer<Object, Object> streamer = ctx.grid().dataStreamer(tbl.cacheName());
+
+ BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
+
+ BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
+
+ BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter,
+ runningQueryMgr, qryId);
+
+ BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.packetSize());
+
+ return new BulkLoadContextCursor(processor, params);
+ }
+
/** {@inheritDoc} */
@SuppressWarnings({"StringEquality", "unchecked"})
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
@@ -1625,14 +1823,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param meta Metadata for {@code twoStepQry}.
* @param keepBinary Whether binary objects must not be deserialized automatically.
* @param startTx Start transaction flag.
- * @param tracker MVCC tracker.
+ * @param mvccTracker MVCC tracker.
* @param cancel Query cancel state holder.
* @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
* @return Query result.
*/
private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared,
SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
- boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
+ boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, boolean registerAsNewQry) {
String sqlQry = qry.getSql();
boolean loc = qry.isLocal();
@@ -1645,26 +1843,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean fail = false;
try {
- if (DmlStatementsProcessor.isDmlStatement(prepared)) {
+ if (GridSqlQueryParser.isDml(prepared)) {
try {
Connection conn = connMgr.connectionForThread().connection(schemaName);
if (!loc)
- return dmlProc.updateSqlFieldsDistributed(schemaName, conn, prepared, qry, cancel);
+ return executeUpdateDistributed(schemaName, conn, prepared, qry, cancel);
else {
- final GridQueryFieldsResult updRes =
- dmlProc.updateSqlFieldsLocal(schemaName, conn, prepared, qry, filter, cancel);
+ UpdateResult updRes = executeUpdate(schemaName, conn, prepared, qry, true, filter, cancel);
return Collections.singletonList(new QueryCursorImpl<>(new Iterable<List<?>>() {
@SuppressWarnings("NullableProblems")
@Override public Iterator<List<?>> iterator() {
- try {
- return new GridQueryCacheObjectsIterator(updRes.iterator(), objectContext(),
- true);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ return new IgniteSingletonIterator<>(Collections.singletonList(updRes.counter()));
}
}, cancel));
}
@@ -1710,16 +1901,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (ctx.security().enabled())
checkSecurity(twoStepQry.cacheIds());
- return Collections.singletonList(doRunDistributedQuery(schemaName, qry, twoStepQry, meta, keepBinary,
- startTx, tracker, cancel, registerAsNewQry));
-
+ return Collections.singletonList(executeQuery(schemaName, qry, twoStepQry, meta, keepBinary,
+ startTx, mvccTracker, cancel, registerAsNewQry));
}
// We've encountered a local query, let's just run it.
Long qryId = registerRunningQuery(schemaName, cancel, sqlQry, loc, registerAsNewQry);
try {
- return Collections.singletonList(queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel, qryId));
+ return Collections.singletonList(executeQueryLocal(schemaName, qry, keepBinary, filter, cancel, qryId));
}
catch (IgniteCheckedException e) {
runningQueryMgr.unregister(qryId, true);
@@ -1963,17 +2153,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/** {@inheritDoc} */
- @Override public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] ids,
- int[] parts, String schema, String qry, Object[] params, int flags,
- int pageSize, int timeout, AffinityTopologyVersion topVer,
- MvccSnapshot mvccSnapshot, GridQueryCancel cancel) throws IgniteCheckedException {
-
+ @Override public UpdateSourceIterator<?> executeUpdateOnDataNodeTransactional(
+ GridCacheContext<?, ?> cctx,
+ int[] ids,
+ int[] parts,
+ String schema,
+ String qry,
+ Object[] params,
+ int flags,
+ int pageSize,
+ int timeout,
+ AffinityTopologyVersion topVer,
+ MvccSnapshot mvccSnapshot,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
if (params != null)
fldsQry.setArgs(params);
- fldsQry.setEnforceJoinOrder(isFlagSet(flags, GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
+ fldsQry.setEnforceJoinOrder(U.isFlagSet(flags, GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
fldsQry.setPageSize(pageSize);
fldsQry.setLocal(true);
@@ -1981,7 +2180,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
boolean loc = true;
- final boolean replicated = isFlagSet(flags, GridH2QueryRequest.FLAG_REPLICATED);
+ final boolean replicated = U.isFlagSet(flags, GridH2QueryRequest.FLAG_REPLICATED);
GridCacheContext<?, ?> cctx0;
@@ -2001,19 +2200,52 @@ public class IgniteH2Indexing implements GridQueryIndexing {
PreparedStatement stmt = preparedStatementWithParams(conn, fldsQry.getSql(),
F.asList(fldsQry.getArgs()), true);
- return dmlProc.prepareDistributedUpdate(schema, conn, stmt, fldsQry, backupFilter(topVer, parts), cancel, loc,
- topVer, mvccSnapshot);
- }
+ IndexingQueryFilter filter = backupFilter(topVer, parts);
- /**
- * Check if flag set.
- *
- * @param flags Flags.
- * @param flag Flag.
- * @return {@code True} if set.
- */
- private boolean isFlagSet(int flags, int flag) {
- return (flags & flag) == flag;
+ Prepared prepared = GridSqlQueryParser.prepared(stmt);
+
+ UpdatePlan plan = updatePlan(schema, conn, prepared, fldsQry, loc);
+
+ GridCacheContext planCctx = plan.cacheContext();
+
+ // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+ DmlUtils.setKeepBinaryContext(planCctx);
+
+ QueryCursorImpl<List<?>> cur;
+
+ // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
+ // sub-query and not some dummy stuff like "select 1, 2, 3;"
+ if (!loc && !plan.isLocalSubquery()) {
+ SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fldsQry.isCollocated())
+ .setArgs(fldsQry.getArgs())
+ .setDistributedJoins(fldsQry.isDistributedJoins())
+ .setEnforceJoinOrder(fldsQry.isEnforceJoinOrder())
+ .setLocal(fldsQry.isLocal())
+ .setPageSize(fldsQry.getPageSize())
+ .setTimeout(fldsQry.getTimeout(), TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(fldsQry.isDataPageScanEnabled());
+
+ cur = (QueryCursorImpl<List<?>>)querySqlFields(schema, newFieldsQry, null, true, true,
+ new StaticMvccQueryTracker(planCctx, mvccSnapshot), cancel, false).get(0);
+ }
+ else {
+ final GridQueryFieldsResult res = executeQueryLocal0(schema, plan.selectQuery(),
+ F.asList(fldsQry.getArgs()), filter, fldsQry.isEnforceJoinOrder(), false, fldsQry.getTimeout(), cancel,
+ new StaticMvccQueryTracker(planCctx, mvccSnapshot), null);
+
+ cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ try {
+ return res.iterator();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, cancel);
+ }
+
+ return plan.iteratorForTransaction(connMgr, cur);
}
/**
@@ -2029,7 +2261,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
* @return Cursor representing distributed query result.
*/
- private FieldsQueryCursor<List<?>> doRunDistributedQuery(String schemaName, SqlFieldsQuery qry,
+ private FieldsQueryCursor<List<?>> executeQuery(String schemaName, SqlFieldsQuery qry,
GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
boolean startTx, MvccQueryTracker mvccTracker, GridQueryCancel cancel, boolean registerAsNewQry) {
if (log.isDebugEnabled())
@@ -2159,26 +2391,40 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * Run DML request from other node.
+ * Executes DML request on map node. Happens only for "skip reducer" mode.
*
* @param schemaName Schema name.
- * @param fldsQry Query.
+ * @param qry Query.
* @param filter Filter.
* @param cancel Cancel state.
- * @param local Locality flag.
+ * @param loc Locality flag.
* @return Update result.
* @throws IgniteCheckedException if failed.
*/
- public UpdateResult mapDistributedUpdate(String schemaName, SqlFieldsQuery fldsQry, IndexingQueryFilter filter,
- GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
+ public UpdateResult executeUpdateOnDataNode(
+ String schemaName,
+ SqlFieldsQuery qry,
+ IndexingQueryFilter filter,
+ GridQueryCancel cancel,
+ boolean loc
+ ) throws IgniteCheckedException {
Connection conn = connMgr.connectionForThread().connection(schemaName);
- H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
+ H2Utils.setupConnection(conn, false, qry.isEnforceJoinOrder());
- PreparedStatement stmt = preparedStatementWithParams(conn, fldsQry.getSql(),
- Arrays.asList(fldsQry.getArgs()), true);
+ PreparedStatement stmt = preparedStatementWithParams(conn, qry.getSql(),
+ Arrays.asList(qry.getArgs()), true);
+
+ Connection c;
+
+ try {
+ c = stmt.getConnection();
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException(e);
+ }
- return dmlProc.mapDistributedUpdate(schemaName, stmt, fldsQry, filter, cancel, local);
+ return executeUpdate(schemaName, c, GridSqlQueryParser.prepared(stmt), qry, loc, filter, cancel);
}
/**
@@ -2438,7 +2684,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
mapQryExec.start(ctx, this);
rdcQryExec.start(ctx, this);
- dmlProc = new DmlStatementsProcessor(ctx, this);
ddlProc = new DdlStatementsProcessor(ctx, schemaMgr);
partExtractor = new PartitionExtractor(new H2PartitionResolver(this));
@@ -2630,7 +2875,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
String cacheName = cacheInfo.name();
partReservationMgr.onCacheStop(cacheName);
- dmlProc.onCacheStop(cacheName);
+
+ // Remove cached DML plans.
+ Iterator<Map.Entry<H2CachedStatementKey, UpdatePlan>> iter = updatePlanCache.entrySet().iterator();
+
+ while (iter.hasNext()) {
+ UpdatePlan plan = iter.next().getValue();
+
+ if (F.eq(cacheName, plan.cacheContext().name()))
+ iter.remove();
+ }
// Drop schema (needs to be called after callback to DML processor because the latter depends on schema).
schemaMgr.onCacheDestroyed(cacheName, rmvIdx);
@@ -2824,4 +3078,416 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return cacheIds;
}
}
+
+ /**
+ * @param schemaName Schema.
+ * @param conn Connection.
+ * @param prepared Prepared statement.
+ * @param fieldsQry Initial query
+ * @param cancel Query cancel.
+ * @return Update result wrapped into {@link GridQueryFieldsResult}
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings("unchecked")
+ private List<QueryCursorImpl<List<?>>> executeUpdateDistributed(
+ String schemaName,
+ Connection conn,
+ Prepared prepared,
+ SqlFieldsQuery fieldsQry,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ if (DmlUtils.isBatched(fieldsQry)) {
+ SqlFieldsQueryEx fieldsQry0 = (SqlFieldsQueryEx)fieldsQry;
+
+ Collection<UpdateResult> ress;
+
+ List<Object[]> argss = fieldsQry0.batchedArguments();
+
+ UpdatePlan plan = updatePlan(schemaName, conn, prepared, fieldsQry0, false);
+
+ GridCacheContext<?, ?> cctx = plan.cacheContext();
+
+ // For MVCC case, let's enlist batch elements one by one.
+ if (plan.hasRows() && plan.mode() == UpdateMode.INSERT && !cctx.mvccEnabled()) {
+ CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
+
+ try {
+ List<List<List<?>>> cur = plan.createRows(argss);
+
+ ress = DmlUtils.processSelectResultBatched(plan, cur, fieldsQry0.getPageSize());
+ }
+ finally {
+ DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
+ }
+ }
+ else {
+ // Fallback to previous mode.
+ ress = new ArrayList<>(argss.size());
+
+ SQLException batchException = null;
+
+ int[] cntPerRow = new int[argss.size()];
+
+ int cntr = 0;
+
+ for (Object[] args : argss) {
+ SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry0.copy();
+
+ qry0.clearBatchedArgs();
+ qry0.setArgs(args);
+
+ UpdateResult res;
+
+ try {
+ res = executeUpdate(schemaName, conn, prepared, qry0, false, null, cancel);
+
+ cntPerRow[cntr++] = (int)res.counter();
+
+ ress.add(res);
+ }
+ catch (Exception e ) {
+ SQLException sqlEx = QueryUtils.toSqlException(e);
+
+ batchException = DmlUtils.chainException(batchException, sqlEx);
+
+ cntPerRow[cntr++] = Statement.EXECUTE_FAILED;
+ }
+ }
+
+ if (batchException != null) {
+ BatchUpdateException e = new BatchUpdateException(batchException.getMessage(),
+ batchException.getSQLState(), batchException.getErrorCode(), cntPerRow, batchException);
+
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size());
+
+ for (UpdateResult res : ress) {
+ res.throwIfError();
+
+ QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+ (Collections.singletonList(res.counter())), cancel, false);
+
+ resCur.fieldsMeta(UPDATE_RESULT_META);
+
+ resCurs.add(resCur);
+ }
+
+ return resCurs;
+ }
+ else {
+ UpdateResult res = executeUpdate(schemaName, conn, prepared, fieldsQry, false, null, cancel);
+
+ res.throwIfError();
+
+ QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+ (Collections.singletonList(res.counter())), cancel, false);
+
+ resCur.fieldsMeta(UPDATE_RESULT_META);
+
+ return Collections.singletonList(resCur);
+ }
+ }
+
+ /**
+ * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
+ *
+ * @param schemaName Schema.
+ * @param conn Connection.
+ * @param prepared Prepared statement.
+ * @param fieldsQry Original query.
+ * @param loc Query locality flag.
+ * @param filters Cache name and key filter.
+ * @param cancel Cancel.
+ * @return Update result (modified items count and failed keys).
+ * @throws IgniteCheckedException if failed.
+ */
+ private UpdateResult executeUpdate(String schemaName, Connection conn, Prepared prepared,
+ SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
+ throws IgniteCheckedException {
+ Object[] errKeys = null;
+
+ long items = 0;
+
+ UpdatePlan plan = updatePlan(schemaName, conn, prepared, fieldsQry, loc);
+
+ GridCacheContext<?, ?> cctx = plan.cacheContext();
+
+ for (int i = 0; i < DFLT_UPDATE_RERUN_ATTEMPTS; i++) {
+ CacheOperationContext opCtx = DmlUtils.setKeepBinaryContext(cctx);
+
+ UpdateResult r;
+
+ try {
+ r = executeUpdate0(schemaName, plan, fieldsQry, loc, filters, cancel);
+ }
+ finally {
+ DmlUtils.restoreKeepBinaryContext(cctx, opCtx);
+ }
+
+ items += r.counter();
+ errKeys = r.errorKeys();
+
+ if (F.isEmpty(errKeys))
+ break;
+ }
+
+ if (F.isEmpty(errKeys)) {
+ if (items == 1L)
+ return UpdateResult.ONE;
+ else if (items == 0L)
+ return UpdateResult.ZERO;
+ }
+
+ return new UpdateResult(items, errKeys);
+ }
+
+ /**
+ * Actually perform SQL DML operation locally.
+ *
+ * @param schemaName Schema name.
+ * @param plan Cache context.
+ * @param fieldsQry Fields query.
+ * @param loc Local query flag.
+ * @param filters Cache name and key filter.
+ * @param cancel Query cancel state holder.
+ * @return Pair [number of successfully processed items; keys that have failed to be processed]
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings({"ConstantConditions"})
+ private UpdateResult executeUpdate0(
+ String schemaName,
+ final UpdatePlan plan,
+ SqlFieldsQuery fieldsQry,
+ boolean loc,
+ IndexingQueryFilter filters,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ GridCacheContext cctx = plan.cacheContext();
+
+ if (cctx != null && cctx.mvccEnabled()) {
+ assert cctx.transactional();
+
+ DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
+
+ GridNearTxLocal tx = tx(cctx.kernalContext());
+
+ boolean implicit = (tx == null);
+
+ boolean commit = implicit && (!(fieldsQry instanceof SqlFieldsQueryEx) ||
+ ((SqlFieldsQueryEx)fieldsQry).isAutoCommit());
+
+ if (implicit)
+ tx = txStart(cctx, fieldsQry.getTimeout());
+
+ requestSnapshot(cctx, tx);
+
+ try (GridNearTxLocal toCommit = commit ? tx : null) {
+ long timeout = implicit
+ ? tx.remainingTime()
+ : operationTimeout(fieldsQry.getTimeout(), tx);
+
+ if (cctx.isReplicated() || distributedPlan == null || ((plan.mode() == UpdateMode.INSERT
+ || plan.mode() == UpdateMode.MERGE) && !plan.isLocalSubquery())) {
+
+ boolean sequential = true;
+
+ UpdateSourceIterator<?> it;
+
+ if (plan.fastResult()) {
+ IgniteBiTuple row = plan.getFastRow(fieldsQry.getArgs());
+
+ EnlistOperation op = UpdatePlan.enlistOperation(plan.mode());
+
+ it = new DmlUpdateSingleEntryIterator<>(op, op.isDeleteOrLock() ? row.getKey() : row);
+ }
+ else if (plan.hasRows())
+ it = new DmlUpdateResultsIterator(UpdatePlan.enlistOperation(plan.mode()), plan, plan.createRows(fieldsQry.getArgs()));
+ else {
+ // TODO IGNITE-8865 if there is no ORDER BY statement it's no use to retain entries order on locking (sequential = false).
+ SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
+ .setArgs(fieldsQry.getArgs())
+ .setDistributedJoins(fieldsQry.isDistributedJoins())
+ .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
+ .setLocal(fieldsQry.isLocal())
+ .setPageSize(fieldsQry.getPageSize())
+ .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
+
+ FieldsQueryCursor<List<?>> cur = querySqlFields(schemaName, newFieldsQry, null,
+ true, true, MvccUtils.mvccTracker(cctx, tx), cancel, false).get(0);
+
+ it = plan.iteratorForTransaction(connMgr, cur);
+ }
+
+ IgniteInternalFuture<Long> fut = tx.updateAsync(cctx, it,
+ fieldsQry.getPageSize(), timeout, sequential);
+
+ UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+ if (commit)
+ toCommit.commit();
+
+ return res;
+ }
+
+ int[] ids = U.toIntArray(distributedPlan.getCacheIds());
+
+ int flags = 0;
+
+ if (fieldsQry.isEnforceJoinOrder())
+ flags |= GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
+
+ if (distributedPlan.isReplicatedOnly())
+ flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
+ flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
+ fieldsQry.isDataPageScanEnabled());
+
+ int[] parts = fieldsQry.getPartitions();
+
+ IgniteInternalFuture<Long> fut = tx.updateAsync(
+ cctx,
+ ids,
+ parts,
+ schemaName,
+ fieldsQry.getSql(),
+ fieldsQry.getArgs(),
+ flags,
+ fieldsQry.getPageSize(),
+ timeout);
+
+ UpdateResult res = new UpdateResult(fut.get(), X.EMPTY_OBJECT_ARRAY);
+
+ if (commit)
+ toCommit.commit();
+
+ return res;
+ }
+ catch (ClusterTopologyServerNotFoundException e) {
+ throw new CacheServerNotFoundException(e.getMessage(), e);
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSQLException sqlEx = X.cause(e, IgniteSQLException.class);
+
+ if(sqlEx != null)
+ throw sqlEx;
+
+ Exception ex = IgniteUtils.convertExceptionNoWrap(e);
+
+ if (ex instanceof IgniteException)
+ throw (IgniteException)ex;
+
+ U.error(log, "Error during update [localNodeId=" + cctx.localNodeId() + "]", ex);
+
+ throw new IgniteSQLException("Failed to run update. " + ex.getMessage(), ex);
+ }
+ finally {
+ if (commit)
+ cctx.tm().resetContext();
+ }
+ }
+
+ UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs());
+
+ if (fastUpdateRes != null)
+ return fastUpdateRes;
+
+ if (plan.distributedPlan() != null) {
+ DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
+
+ assert distributedPlan != null;
+
+ if (cancel == null)
+ cancel = new GridQueryCancel();
+
+ UpdateResult result = runDistributedUpdate(schemaName, fieldsQry, distributedPlan.getCacheIds(),
+ distributedPlan.isReplicatedOnly(), cancel);
+
+ // null is returned in case not all nodes support distributed DML.
+ if (result != null)
+ return result;
+ }
+
+ Iterable<List<?>> cur;
+
+ // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
+ // sub-query and not some dummy stuff like "select 1, 2, 3;"
+ if (!loc && !plan.isLocalSubquery()) {
+ assert !F.isEmpty(plan.selectQuery());
+
+ SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
+ .setArgs(fieldsQry.getArgs())
+ .setDistributedJoins(fieldsQry.isDistributedJoins())
+ .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
+ .setLocal(fieldsQry.isLocal())
+ .setPageSize(fieldsQry.getPageSize())
+ .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
+
+ cur = querySqlFields(schemaName, newFieldsQry, null, true, true, null, cancel, false).get(0);
+ }
+ else if (plan.hasRows())
+ cur = plan.createRows(fieldsQry.getArgs());
+ else {
+ final GridQueryFieldsResult res = executeQueryLocal0(schemaName, plan.selectQuery(),
+ F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), false, fieldsQry.getTimeout(),
+ cancel, null, null);
+
+ cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+ @Override public Iterator<List<?>> iterator() {
+ try {
+ return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), true);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, cancel);
+ }
+
+ int pageSize = loc ? 0 : fieldsQry.getPageSize();
+
+ return DmlUtils.processSelectResult(plan, cur, pageSize);
+ }
+
+ /**
+ * Generate SELECT statements to retrieve data for modifications from and find fast UPDATE or DELETE args,
+ * if available.
+ *
+ * @param schema Schema.
+ * @param conn Connection.
+ * @param p Prepared statement.
+ * @param fieldsQry Original fields query.
+ * @param loc Local query flag.
+ * @return Update plan.
+ */
+ @SuppressWarnings("IfMayBeConditional")
+ private UpdatePlan updatePlan(
+ String schema,
+ Connection conn,
+ Prepared p,
+ SqlFieldsQuery fieldsQry,
+ boolean loc
+ ) throws IgniteCheckedException {
+ if (F.eq(QueryUtils.SCHEMA_SYS, schema))
+ throw new IgniteSQLException("DML statements are not supported on " + schema + " schema",
+ IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+ H2CachedStatementKey planKey = new H2CachedStatementKey(schema, p.getSQL(), fieldsQry, loc);
+
+ UpdatePlan res = updatePlanCache.get(planKey);
+
+ if (res != null)
+ return res;
+
+ res = UpdatePlanBuilder.planForStatement(p, loc, this, conn, fieldsQry, updateInTxAllowed);
+
+ // Don't cache re-runs
+ UpdatePlan oldRes = updatePlanCache.putIfAbsent(planKey, res);
+
+ return oldRes != null ? oldRes : res;
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
index 32d84e1..cac2d06 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
@@ -17,9 +17,17 @@
package org.apache.ignite.internal.processors.query.h2;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+
/**
* Update result - modifications count and keys to re-run query with, if needed.
*/
@@ -60,4 +68,18 @@ public final class UpdateResult {
public Object[] errorKeys() {
return errKeys;
}
+
+ /**
+ * Check update result for erroneous keys and throws concurrent update exception if necessary.
+ */
+ public void throwIfError() {
+ if (!F.isEmpty(errKeys)) {
+ String msg = "Failed to update some keys because they had been modified concurrently " +
+ "[keys=" + Arrays.toString(errKeys) + ']';
+
+ SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+ throw new IgniteSQLException(conEx);
+ }
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index f55abfa..84dfea2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -124,6 +124,19 @@ public class DdlStatementsProcessor {
}
/**
+ * @param cmd Command.
+ * @return {@code True} if this is supported DDL command.
+ */
+ public static boolean isDdlCommand(SqlCommand cmd) {
+ return cmd instanceof SqlCreateIndexCommand
+ || cmd instanceof SqlDropIndexCommand
+ || cmd instanceof SqlAlterTableCommand
+ || cmd instanceof SqlCreateUserCommand
+ || cmd instanceof SqlAlterUserCommand
+ || cmd instanceof SqlDropUserCommand;
+ }
+
+ /**
* Run DDL statement.
*
* @param sql Original SQL.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
index 7e7f224..c90d149 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
@@ -138,10 +138,9 @@ public final class DmlAstUtils {
* Generate SQL SELECT based on DELETE's WHERE, LIMIT, etc.
*
* @param del Delete statement.
- * @param keysParamIdx Index for .
* @return SELECT statement.
*/
- public static GridSqlSelect selectForDelete(GridSqlDelete del, @Nullable Integer keysParamIdx) {
+ public static GridSqlSelect selectForDelete(GridSqlDelete del) {
GridSqlSelect mapQry = new GridSqlSelect();
mapQry.from(del.from());
@@ -172,8 +171,6 @@ public final class DmlAstUtils {
mapQry.addColumn(valCol, true);
GridSqlElement where = del.where();
- if (keysParamIdx != null)
- where = injectKeysFilterParam(where, keyCol, keysParamIdx);
mapQry.where(where);
mapQry.limit(del.limit());
@@ -231,7 +228,7 @@ public final class DmlAstUtils {
*/
@SuppressWarnings("RedundantCast")
private static IgnitePair<GridSqlElement> findKeyValueEqualityCondition(GridSqlElement where) {
- if (where == null || !(where instanceof GridSqlOperation))
+ if (!(where instanceof GridSqlOperation))
return null;
GridSqlOperation whereOp = (GridSqlOperation) where;
@@ -319,10 +316,9 @@ public final class DmlAstUtils {
* Generate SQL SELECT based on UPDATE's WHERE, LIMIT, etc.
*
* @param update Update statement.
- * @param keysParamIdx Index of new param for the array of keys.
* @return SELECT statement.
*/
- public static GridSqlSelect selectForUpdate(GridSqlUpdate update, @Nullable Integer keysParamIdx) {
+ public static GridSqlSelect selectForUpdate(GridSqlUpdate update) {
GridSqlSelect mapQry = new GridSqlSelect();
mapQry.from(update.target());
@@ -362,8 +358,6 @@ public final class DmlAstUtils {
}
GridSqlElement where = update.where();
- if (keysParamIdx != null)
- where = injectKeysFilterParam(where, keyCol, keysParamIdx);
mapQry.where(where);
mapQry.limit(update.limit());
@@ -422,39 +416,6 @@ public final class DmlAstUtils {
}
/**
- * Append additional condition to WHERE for it to select only specific keys.
- *
- * @param where Initial condition.
- * @param keyCol Column to base the new condition on.
- * @return New condition.
- */
- private static GridSqlElement injectKeysFilterParam(GridSqlElement where, GridSqlColumn keyCol, int paramIdx) {
- // Yes, we need a subquery for "WHERE _key IN ?" to work with param being an array without dirty query rewriting.
- GridSqlSelect sel = new GridSqlSelect();
-
- GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE);
-
- sel.from(from);
-
- GridSqlColumn col = new GridSqlColumn(null, from, null, "TABLE", "_IGNITE_ERR_KEYS");
-
- sel.addColumn(col, true);
-
- GridSqlAlias alias = new GridSqlAlias("_IGNITE_ERR_KEYS", new GridSqlParameter(paramIdx));
-
- alias.resultType(keyCol.resultType());
-
- from.addChild(alias);
-
- GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlSubquery(sel));
-
- if (where == null)
- return e;
- else
- return new GridSqlOperation(GridSqlOperationType.AND, where, e);
- }
-
- /**
* @param qry Select.
* @param params Parameters.
* @param target Extracted parameters.
@@ -550,7 +511,7 @@ public final class DmlAstUtils {
* @param c Closure each found table and subquery will be passed to. If returns {@code true} the we need to stop.
* @return {@code true} If we have found.
*/
- @SuppressWarnings("RedundantCast")
+ @SuppressWarnings({"RedundantCast", "RedundantIfStatement"})
private static boolean findTablesInFrom(GridSqlElement from, IgnitePredicate<GridSqlElement> c) {
if (from == null)
return false;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBulkLoadDataConverter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBulkLoadDataConverter.java
new file mode 100644
index 0000000..e6084d4
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBulkLoadDataConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import java.util.List;
+
+/**
+ * Converts a row of values to actual key+value using {@link UpdatePlan#processRow(List)}.
+ */
+public class DmlBulkLoadDataConverter extends IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> {
+ /** Update plan to convert incoming rows. */
+ private final UpdatePlan plan;
+
+ /**
+ * Creates the converter with the given update plan.
+ *
+ * @param plan The update plan to use.
+ */
+ public DmlBulkLoadDataConverter(UpdatePlan plan) {
+ this.plan = plan;
+ }
+
+ /**
+ * Converts the record to a key+value.
+ *
+ * @param record The record to convert.
+ * @return The key+value.
+ * @throws IgniteCheckedException If conversion failed for some reason.
+ */
+ @Override public IgniteBiTuple<?, ?> applyx(List<?> record) throws IgniteCheckedException {
+ return plan.processRow(record);
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateResultsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateResultsIterator.java
new file mode 100644
index 0000000..b850c8a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateResultsIterator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * DML update results iterator.
+ */
+public class DmlUpdateResultsIterator implements UpdateSourceIterator<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final EnlistOperation op;
+
+ /** */
+ private final UpdatePlan plan;
+
+ /** */
+ private Iterator<List<?>> it;
+
+ /**
+ * Constructor.
+ *
+ * @param op Operation.
+ * @param plan Plan.
+ * @param rows Rows.
+ */
+ public DmlUpdateResultsIterator(EnlistOperation op, UpdatePlan plan, Iterable<List<?>> rows) {
+ this.op = op;
+ this.plan = plan;
+
+ it = rows.iterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public EnlistOperation operation() {
+ return op;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNextX() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object nextX() throws IgniteCheckedException {
+ return plan.processRowForTx(it.next());
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateSingleEntryIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateSingleEntryIterator.java
new file mode 100644
index 0000000..0266806
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUpdateSingleEntryIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.internal.processors.query.EnlistOperation;
+import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
+
+/** */
+public class DmlUpdateSingleEntryIterator<T> implements UpdateSourceIterator<T> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final EnlistOperation op;
+
+ /** */
+ private final T entry;
+
+ /** */
+ private boolean first = true;
+
+ /**
+ * Constructor.
+ *
+ * @param op Operation.
+ * @param entry Entry.
+ */
+ public DmlUpdateSingleEntryIterator(EnlistOperation op, T entry) {
+ this.op = op;
+ this.entry = entry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public EnlistOperation operation() {
+ return op;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNextX() {
+ return first;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T nextX() {
+ T res = first ? entry : null;
+
+ first = false;
+
+ return res;
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
index 7be823d..fab9869 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
@@ -18,17 +18,35 @@
package org.apache.ignite.internal.processors.query.h2.dml;
import java.lang.reflect.Array;
+import java.sql.BatchUpdateException;
+import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.h2.util.DateTimeUtils;
import org.h2.util.LocalDateTimeUtils;
import org.h2.value.Value;
@@ -36,6 +54,9 @@ import org.h2.value.ValueDate;
import org.h2.value.ValueTime;
import org.h2.value.ValueTimestamp;
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+
/**
* DML utility methods.
*/
@@ -48,11 +69,9 @@ public class DmlUtils {
* @param expCls Expected value class.
* @param type Expected column type to convert to.
* @return Converted object.
- * @throws IgniteCheckedException if failed.
*/
@SuppressWarnings({"ConstantConditions", "SuspiciousSystemArraycopy"})
- public static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type)
- throws IgniteCheckedException {
+ public static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type) {
if (val == null)
return null;
@@ -128,6 +147,401 @@ public class DmlUtils {
}
/**
+ * @param plan Update plan.
+ * @param cursor Cursor over select results.
+ * @param pageSize Page size.
+ * @return Pair [number of successfully processed items; keys that have failed to be processed]
+ * @throws IgniteCheckedException if failed.
+ */
+ public static UpdateResult processSelectResult(UpdatePlan plan, Iterable<List<?>> cursor,
+ int pageSize) throws IgniteCheckedException {
+ switch (plan.mode()) {
+ case MERGE:
+ return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+ case INSERT:
+ return new UpdateResult(dmlDoInsert(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+ case UPDATE:
+ return doUpdate(plan, cursor, pageSize);
+
+ case DELETE:
+ return doDelete(plan.cacheContext(), cursor, pageSize);
+
+ default:
+ throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']',
+ IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+ }
+ }
+
+ /**
+ * Execute INSERT statement plan.
+ * @param cursor Cursor to take inserted data from.
+ * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+ * @return Number of items affected.
+ * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
+ */
+ @SuppressWarnings({"unchecked"})
+ private static long dmlDoInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+ GridCacheContext cctx = plan.cacheContext();
+
+ // If we have just one item to put, just do so
+ if (plan.rowCount() == 1) {
+ IgniteBiTuple t = plan.processRow(cursor.iterator().next());
+
+ if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
+ return 1;
+ else
+ throw new TransactionDuplicateKeyException("Duplicate key during INSERT [key=" + t.getKey() + ']');
+ }
+ else {
+ // Keys that failed to INSERT due to duplication.
+ DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
+
+ for (List<?> row : cursor) {
+ final IgniteBiTuple keyValPair = plan.processRow(row);
+
+ sender.add(keyValPair.getKey(), new DmlStatementsProcessor.InsertEntryProcessor(keyValPair.getValue()), 0);
+ }
+
+ sender.flush();
+
+ SQLException resEx = sender.error();
+
+ if (!F.isEmpty(sender.failedKeys())) {
+ String msg = "Failed to INSERT some keys because they are already in cache " +
+ "[keys=" + sender.failedKeys() + ']';
+
+ SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION);
+
+ if (resEx == null)
+ resEx = dupEx;
+ else
+ resEx.setNextException(dupEx);
+ }
+
+ if (resEx != null)
+ throw new IgniteSQLException(resEx);
+
+ return sender.updateCount();
+ }
+ }
+
+ /**
+ * Perform UPDATE operation on top of results of SELECT.
+ * @param cursor SELECT results.
+ * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+ * @return Pair [cursor corresponding to results of UPDATE (contains number of items affected); keys whose values
+ * had been modified concurrently (arguments for a re-run)].
+ */
+ private static UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
+ throws IgniteCheckedException {
+ GridCacheContext cctx = plan.cacheContext();
+
+ DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
+
+ for (List<?> row : cursor) {
+ T3<Object, Object, Object> row0 = plan.processRowForUpdate(row);
+
+ Object key = row0.get1();
+ Object oldVal = row0.get2();
+ Object newVal = row0.get3();
+
+ sender.add(key, new DmlStatementsProcessor.ModifyingEntryProcessor(
+ oldVal,
+ new DmlStatementsProcessor.EntryValueUpdater(newVal)),
+ 0
+ );
+ }
+
+ sender.flush();
+
+ SQLException resEx = sender.error();
+
+ if (resEx != null) {
+ if (!F.isEmpty(sender.failedKeys())) {
+ // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+ // had been modified concurrently right away.
+ String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
+ "[keys=" + sender.failedKeys() + ']';
+
+ SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+ dupEx.setNextException(resEx);
+
+ resEx = dupEx;
+ }
+
+ throw new IgniteSQLException(resEx);
+ }
+
+ return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
+ }
+
+ /**
+ * Execute MERGE statement plan.
+ * @param cursor Cursor to take inserted data from.
+ * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
+ * @return Number of items affected.
+ * @throws IgniteCheckedException if failed.
+ */
+ @SuppressWarnings("unchecked")
+ private static long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+ GridCacheContext cctx = plan.cacheContext();
+
+ // If we have just one item to put, just do so
+ if (plan.rowCount() == 1) {
+ IgniteBiTuple t = plan.processRow(cursor.iterator().next());
+
+ cctx.cache().put(t.getKey(), t.getValue());
+
+ return 1;
+ }
+ else {
+ int resCnt = 0;
+
+ Map<Object, Object> rows = new LinkedHashMap<>();
+
+ for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
+ List<?> row = it.next();
+
+ IgniteBiTuple t = plan.processRow(row);
+
+ rows.put(t.getKey(), t.getValue());
+
+ if ((pageSize > 0 && rows.size() == pageSize) || !it.hasNext()) {
+ cctx.cache().putAll(rows);
+
+ resCnt += rows.size();
+
+ if (it.hasNext())
+ rows.clear();
+ }
+ }
+
+ return resCnt;
+ }
+ }
+
+ /**
+ * Perform DELETE operation on top of results of SELECT.
+ *
+ * @param cctx Cache context.
+ * @param cursor SELECT results.
+ * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+ * @return Results of DELETE (number of items affected AND keys that failed to be updated).
+ */
+ private static UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
+ throws IgniteCheckedException {
+ DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
+
+ for (List<?> row : cursor) {
+ if (row.size() != 2)
+ continue;
+
+ sender.add(
+ row.get(0),
+ new DmlStatementsProcessor.ModifyingEntryProcessor(row.get(1), DmlStatementsProcessor.RMV),
+ 0
+ );
+ }
+
+ sender.flush();
+
+ SQLException resEx = sender.error();
+
+ if (resEx != null) {
+ if (!F.isEmpty(sender.failedKeys())) {
+ // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+ // had been modified concurrently right away.
+ String msg = "Failed to DELETE some keys because they had been modified concurrently " +
+ "[keys=" + sender.failedKeys() + ']';
+
+ SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+ conEx.setNextException(resEx);
+
+ resEx = conEx;
+ }
+
+ throw new IgniteSQLException(resEx);
+ }
+
+ return new UpdateResult(sender.updateCount(), sender.failedKeys().toArray());
+ }
+
+ /**
+ * Performs the planned update.
+ * @param plan Update plan.
+ * @param rows Rows to update.
+ * @param pageSize Page size.
+ * @return {@link List} of update results.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static List<UpdateResult> processSelectResultBatched(UpdatePlan plan, List<List<List<?>>> rows, int pageSize)
+ throws IgniteCheckedException {
+ switch (plan.mode()) {
+ case MERGE:
+ // TODO
+ throw new IgniteCheckedException("Unsupported, fix");
+
+ case INSERT:
+ return doInsertBatched(plan, rows, pageSize);
+
+ default:
+ throw new IgniteSQLException("Unexpected batched DML operation [mode=" + plan.mode() + ']',
+ IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+ }
+ }
+
+ /**
+ * Execute INSERT statement plan.
+ *
+ * @param plan Plan to execute.
+ * @param cursor Cursor to take inserted data from. I.e. list of batch arguments for each query.
+ * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+ * @return Number of items affected.
+ * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
+ */
+ private static List<UpdateResult> doInsertBatched(UpdatePlan plan, List<List<List<?>>> cursor, int pageSize)
+ throws IgniteCheckedException {
+ GridCacheContext cctx = plan.cacheContext();
+
+ DmlBatchSender snd = new DmlBatchSender(cctx, pageSize, cursor.size());
+
+ int rowNum = 0;
+
+ SQLException resEx = null;
+
+ for (List<List<?>> qryRow : cursor) {
+ for (List<?> row : qryRow) {
+ try {
+ final IgniteBiTuple keyValPair = plan.processRow(row);
+
+ snd.add(keyValPair.getKey(), new DmlStatementsProcessor.InsertEntryProcessor(keyValPair.getValue()), rowNum);
+ }
+ catch (Exception e) {
+ String sqlState;
+
+ int code;
+
+ if (e instanceof IgniteSQLException) {
+ sqlState = ((IgniteSQLException)e).sqlState();
+
+ code = ((IgniteSQLException)e).statusCode();
+ } else {
+ sqlState = SqlStateCode.INTERNAL_ERROR;
+
+ code = IgniteQueryErrorCode.UNKNOWN;
+ }
+
+ resEx = chainException(resEx, new SQLException(e.getMessage(), sqlState, code, e));
+
+ snd.setFailed(rowNum);
+ }
+ }
+
+ rowNum++;
+ }
+
+ try {
+ snd.flush();
+ }
+ catch (Exception e) {
+ resEx = chainException(resEx, new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR,
+ IgniteQueryErrorCode.UNKNOWN, e));
+ }
+
+ resEx = chainException(resEx, snd.error());
+
+ if (!F.isEmpty(snd.failedKeys())) {
+ SQLException e = new SQLException("Failed to INSERT some keys because they are already in cache [keys=" +
+ snd.failedKeys() + ']', SqlStateCode.CONSTRAINT_VIOLATION, DUPLICATE_KEY);
+
+ resEx = chainException(resEx, e);
+ }
+
+ if (resEx != null) {
+ BatchUpdateException e = new BatchUpdateException(resEx.getMessage(), resEx.getSQLState(),
+ resEx.getErrorCode(), snd.perRowCounterAsArray(), resEx);
+
+ throw new IgniteCheckedException(e);
+ }
+
+ int[] cntPerRow = snd.perRowCounterAsArray();
+
+ List<UpdateResult> res = new ArrayList<>(cntPerRow.length);
+
+ for (int i = 0; i < cntPerRow.length; i++ ) {
+ int cnt = cntPerRow[i];
+
+ res.add(new UpdateResult(cnt , X.EMPTY_OBJECT_ARRAY));
+ }
+
+ return res;
+ }
+
+ /**
+ * Adds exception to the chain.
+ *
+ * @param main Exception to add another exception to.
+ * @param add Exception which should be added to chain.
+ * @return Chained exception.
+ */
+ public static SQLException chainException(SQLException main, SQLException add) {
+ if (main == null) {
+ if (add != null) {
+ main = add;
+
+ return main;
+ }
+ else
+ return null;
+ }
+ else {
+ main.setNextException(add);
+
+ return main;
+ }
+ }
+
+ /**
+ * Makes current operation context as keepBinary.
+ *
+ * @param cctx Cache context.
+ * @return Old operation context.
+ */
+ public static CacheOperationContext setKeepBinaryContext(GridCacheContext<?, ?> cctx) {
+ CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+ // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+ if (cctx.binaryMarshaller()) {
+ CacheOperationContext newOpCtx = null;
+
+ if (opCtx == null)
+ // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+ newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false, true);
+ else if (!opCtx.isKeepBinary())
+ newOpCtx = opCtx.keepBinary();
+
+ if (newOpCtx != null)
+ cctx.operationContextPerCall(newOpCtx);
+ }
+
+ return opCtx;
+ }
+
+ /**
+ * Restore previous binary context.
+ *
+ * @param cctx Cache context.
+ * @param oldOpCtx Old operation context.
+ */
+ public static void restoreKeepBinaryContext(GridCacheContext<?, ?> cctx, CacheOperationContext oldOpCtx) {
+ cctx.operationContextPerCall(oldOpCtx);
+ }
+
+ /**
* Private constructor.
*/
private DmlUtils() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 068e3b7..d87e7da 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -100,9 +100,14 @@ public final class UpdatePlanBuilder {
* @return Update plan.
*/
@SuppressWarnings("ConstantConditions")
- public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx,
- @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQry, @Nullable Integer errKeysPos,
- boolean dmlInsideTxAllowed)
+ public static UpdatePlan planForStatement(
+ Prepared prepared,
+ boolean loc,
+ IgniteH2Indexing idx,
+ @Nullable Connection conn,
+ @Nullable SqlFieldsQuery fieldsQry,
+ boolean dmlInsideTxAllowed
+ )
throws IgniteCheckedException {
assert !prepared.isQuery();
@@ -140,7 +145,7 @@ public final class UpdatePlanBuilder {
if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
return planForInsert(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete)
- return planForUpdate(stmt, loc, idx, mvccEnabled, conn, fieldsQry, errKeysPos);
+ return planForUpdate(stmt, loc, idx, mvccEnabled, conn, fieldsQry);
else
throw new IgniteSQLException("Unsupported operation: " + prepared.getSQL(),
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
@@ -402,13 +407,17 @@ public final class UpdatePlanBuilder {
* @param mvccEnabled Mvcc flag.
* @param conn Connection.
* @param fieldsQuery Original query.
- * @param errKeysPos index to inject param for re-run keys at. Null if it's not a re-run plan.
* @return Update plan.
* @throws IgniteCheckedException if failed.
*/
- private static UpdatePlan planForUpdate(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx,
- boolean mvccEnabled, @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery,
- @Nullable Integer errKeysPos) throws IgniteCheckedException {
+ private static UpdatePlan planForUpdate(
+ GridSqlStatement stmt,
+ boolean loc,
+ IgniteH2Indexing idx,
+ boolean mvccEnabled,
+ @Nullable Connection conn,
+ @Nullable SqlFieldsQuery fieldsQuery
+ ) throws IgniteCheckedException {
GridSqlElement target;
FastUpdate fastUpdate;
@@ -493,7 +502,7 @@ public final class UpdatePlanBuilder {
KeyValueSupplier valSupplier = createSupplier(desc.context(), desc.type(), newValColIdx, hasProps,
false, true);
- sel = DmlAstUtils.selectForUpdate((GridSqlUpdate) stmt, errKeysPos);
+ sel = DmlAstUtils.selectForUpdate((GridSqlUpdate)stmt);
String selectSql = sel.getSQL();
@@ -519,7 +528,7 @@ public final class UpdatePlanBuilder {
);
}
else {
- sel = DmlAstUtils.selectForDelete((GridSqlDelete) stmt, errKeysPos);
+ sel = DmlAstUtils.selectForDelete((GridSqlDelete)stmt);
String selectSql = sel.getSQL();
@@ -641,7 +650,7 @@ public final class UpdatePlanBuilder {
* @return Closure returning key or value.
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings({"ConstantConditions", "unchecked"})
+ @SuppressWarnings({"ConstantConditions", "unchecked", "IfMayBeConditional"})
private static KeyValueSupplier createSupplier(final GridCacheContext<?, ?> cctx, GridQueryTypeDescriptor desc,
final int colIdx, boolean hasProps, final boolean key, boolean forUpdate) throws IgniteCheckedException {
final String typeName = key ? desc.keyTypeName() : desc.valueTypeName();
@@ -666,7 +675,7 @@ public final class UpdatePlanBuilder {
// If we have key or value explicitly present in query, create new builder upon them...
return new KeyValueSupplier() {
/** {@inheritDoc} */
- @Override public Object apply(List<?> arg) throws IgniteCheckedException {
+ @Override public Object apply(List<?> arg) {
Object obj = arg.get(colIdx);
if (obj == null)
@@ -686,7 +695,7 @@ public final class UpdatePlanBuilder {
// ...and if we don't, just create a new builder.
return new KeyValueSupplier() {
/** {@inheritDoc} */
- @Override public Object apply(List<?> arg) throws IgniteCheckedException {
+ @Override public Object apply(List<?> arg) {
BinaryObjectBuilder builder = cctx.grid().binary().builder(typeName);
cctx.prepareAffinityField(builder);
@@ -776,7 +785,7 @@ public final class UpdatePlanBuilder {
* @param statement Statement.
*/
private static void verifyUpdateColumns(GridSqlStatement statement) {
- if (statement == null || !(statement instanceof GridSqlUpdate))
+ if (!(statement instanceof GridSqlUpdate))
return;
GridSqlUpdate update = (GridSqlUpdate) statement;
@@ -974,7 +983,7 @@ public final class UpdatePlanBuilder {
}
/** {@inheritDoc} */
- @Override public Object apply(List<?> arg) throws IgniteCheckedException {
+ @Override public Object apply(List<?> arg) {
return arg.get(colIdx);
}
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index f66a289..5b1609d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -1688,6 +1688,16 @@ public class GridSqlQueryParser {
}
/**
+ * Check whether statement is DML statement.
+ *
+ * @param stmt Statement.
+ * @return {@code True} if this is DML.
+ */
+ public static boolean isDml(Prepared stmt) {
+ return stmt instanceof Merge || stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete;
+ }
+
+ /**
* @param stmt Prepared.
* @return Target table.
*/
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 9b35f04..7a104b0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -913,7 +913,7 @@ public class GridMapQueryExecutor {
local = false;
}
- UpdateResult updRes = h2.mapDistributedUpdate(req.schemaName(), fldsQry, filter, cancel, local);
+ UpdateResult updRes = h2.executeUpdateOnDataNode(req.schemaName(), fldsQry, filter, cancel, local);
GridCacheContext<?, ?> mainCctx =
!F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingInMemSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingInMemSelfTest.java
deleted file mode 100644
index abf1f11..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingInMemSelfTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-/**
- * Tests for H2 indexing SPI.
- */
-public class GridH2IndexingInMemSelfTest extends GridIndexingSpiAbstractSelfTest {
- // No-op.
-}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingOffheapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingOffheapSelfTest.java
deleted file mode 100644
index 4722d76..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridH2IndexingOffheapSelfTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-/**
- * Tests for H2 indexing SPI.
- */
-public class GridH2IndexingOffheapSelfTest extends GridIndexingSpiAbstractSelfTest {
- /** {@inheritDoc} */
- @Override protected boolean offheap() {
- return true;
- }
-}
\ No newline at end of file
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
deleted file mode 100644
index 7367245..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.QueryEntity;
-import org.apache.ignite.cache.QueryIndex;
-import org.apache.ignite.cache.QueryIndexType;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridStringLogger;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.junit.Test;
-
-/**
- * Tests for all SQL based indexing SPI implementations.
- */
-public abstract class GridIndexingSpiAbstractSelfTest extends AbstractIndexingCommonTest {
- /** */
- private static final LinkedHashMap<String, String> fieldsAA = new LinkedHashMap<>();
-
- /** */
- private static final LinkedHashMap<String, String> fieldsAB = new LinkedHashMap<>();
-
- /** */
- private IgniteEx ignite0;
-
- /** {@inheritDoc} */
- @SuppressWarnings("deprecation")
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- cfg.setMarshaller(new BinaryMarshaller());
-
- return cfg;
- }
-
- /*
- * Fields initialization.
- */
- static {
- fieldsAA.put("id", Long.class.getName());
- fieldsAA.put("name", String.class.getName());
- fieldsAA.put("age", Integer.class.getName());
-
- fieldsAB.putAll(fieldsAA);
- fieldsAB.put("txt", String.class.getName());
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- ignite0 = startGrid(0);
- }
-
- /**
- */
- private CacheConfiguration cacheACfg() {
- CacheConfiguration<?,?> cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
-
- cfg.setName("A");
-
- QueryEntity eA = new QueryEntity(Integer.class.getName(), "A");
- eA.setFields(fieldsAA);
-
- QueryEntity eB = new QueryEntity(Integer.class.getName(), "B");
- eB.setFields(fieldsAB);
-
- List<QueryEntity> list = new ArrayList<>(2);
-
- list.add(eA);
- list.add(eB);
-
- QueryIndex idx = new QueryIndex("txt");
- idx.setIndexType(QueryIndexType.FULLTEXT);
- eB.setIndexes(Collections.singleton(idx));
-
- cfg.setQueryEntities(list);
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
- /**
- * @return Indexing.
- */
- private IgniteH2Indexing getIndexing() {
- return U.field(ignite0.context().query(), "idx");
- }
-
- /**
- * @return {@code true} if OFF-HEAP mode should be tested.
- */
- protected boolean offheap() {
- return false;
- }
-
- /**
- * Test long queries write explain warnings into log.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings({"unchecked", "deprecation"})
- @Test
- public void testLongQueries() throws Exception {
- IgniteH2Indexing spi = getIndexing();
-
- ignite0.createCache(cacheACfg());
-
- long longQryExecTime = IgniteConfiguration.DFLT_LONG_QRY_WARN_TIMEOUT;
-
- GridStringLogger log = new GridStringLogger(false, this.log);
-
- IgniteLogger oldLog = GridTestUtils.getFieldValue(spi, "log");
-
- try {
- GridTestUtils.setFieldValue(spi, "log", log);
-
- String sql = "select sum(x) FROM SYSTEM_RANGE(?, ?)";
-
- long now = U.currentTimeMillis();
- long time = now;
-
- long range = 1000000L;
-
- while (now - time <= longQryExecTime * 3 / 2) {
- time = now;
- range *= 3;
-
- GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.<Object>asList(1,
- range), null, false, false, 0, null, null);
-
- assert res.iterator().hasNext();
-
- now = U.currentTimeMillis();
- }
-
- String res = log.toString();
-
- assertTrue(res.contains("/* PUBLIC.RANGE_INDEX */"));
- }
- finally {
- GridTestUtils.setFieldValue(spi, "log", oldLog);
- }
- }
-}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index e337fcc..095518c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -203,8 +203,6 @@ import org.apache.ignite.internal.processors.query.SqlQueryHistoryFromClientSelf
import org.apache.ignite.internal.processors.query.SqlQueryHistorySelfTest;
import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
import org.apache.ignite.internal.processors.query.SqlSystemViewsSelfTest;
-import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
-import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildSelfTest;
import org.apache.ignite.internal.processors.query.h2.H2ResultSetIteratorNullifyOnEndSelfTest;
import org.apache.ignite.internal.processors.query.h2.H2StatementCacheSelfTest;
@@ -292,11 +290,6 @@ import org.junit.runners.Suite;
DynamicIndexServerNodeFIlterBasicSelfTest.class,
DynamicIndexClientBasicSelfTest.class,
- // H2 tests.
-
- GridH2IndexingInMemSelfTest.class,
- GridH2IndexingOffheapSelfTest.class,
-
// Parsing
GridQueryParsingTest.class,
IgniteCacheSqlQueryErrorSelfTest.class,