You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2019/01/22 05:33:45 UTC
[ignite] branch master updated: IGNITE-10798 Data page scan for
ScanQuery, SqlQuery and SqlFieldsQuery - Fixes #5639.
This is an automated email from the ASF dual-hosted git repository.
sergi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bc209d0 IGNITE-10798 Data page scan for ScanQuery, SqlQuery and SqlFieldsQuery - Fixes #5639.
bc209d0 is described below
commit bc209d082bd905f0e8e6fd5cf38306336129d4d0
Author: Sergi Vladykin <se...@gmail.com>
AuthorDate: Tue Jan 22 08:32:25 2019 +0300
IGNITE-10798 Data page scan for ScanQuery, SqlQuery and SqlFieldsQuery - Fixes #5639.
Signed-off-by: Sergi Vladykin <se...@gmail.com>
---
.../org/apache/ignite/cache/query/ScanQuery.java | 31 ++
.../apache/ignite/cache/query/SqlFieldsQuery.java | 32 ++
.../org/apache/ignite/cache/query/SqlQuery.java | 31 ++
.../ignite/configuration/CacheConfiguration.java | 2 +
.../processors/cache/GridCacheAdapter.java | 12 +-
.../cache/IgniteCacheOffheapManager.java | 25 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 79 ++-
.../processors/cache/IgniteCacheProxyImpl.java | 2 +-
.../processors/cache/IncompleteObject.java | 19 +-
.../internal/processors/cache/mvcc/MvccUtils.java | 67 ++-
.../cache/persistence/CacheDataRowAdapter.java | 239 +++++---
.../GridCacheDatabaseSharedManager.java | 10 +
.../cache/persistence/pagemem/PageMemoryEx.java | 11 +-
.../cache/persistence/pagemem/PageMemoryImpl.java | 24 +-
.../cache/persistence/tree/BPlusTree.java | 6 +-
.../persistence/tree/io/AbstractDataPageIO.java | 8 +
.../query/GridCacheDistributedQueryManager.java | 14 +-
.../cache/query/GridCacheQueryAdapter.java | 37 +-
.../cache/query/GridCacheQueryManager.java | 60 +-
.../cache/query/GridCacheQueryRequest.java | 130 +++--
.../continuous/CacheContinuousQueryManager.java | 4 +-
.../processors/cache/tree/CacheDataRowStore.java | 7 +
.../processors/cache/tree/CacheDataTree.java | 200 ++++++-
.../internal/processors/cache/tree/DataRow.java | 2 +-
.../cache/tree/mvcc/data/MvccDataRow.java | 10 +-
.../tree/mvcc/search/MvccDataPageClosure.java | 36 ++
.../datastructures/GridCacheSetImpl.java | 6 +-
.../processors/query/GridQueryTypeDescriptor.java | 7 +
.../processors/query/QueryTypeDescriptorImpl.java | 13 +
.../twostep/messages/GridQueryNextPageRequest.java | 44 +-
.../processors/service/GridServiceProcessor.java | 2 +-
.../cache/query/CacheDataPageScanQueryTest.java | 153 ++++++
.../query/h2/DmlStatementsProcessor.java | 18 +-
.../processors/query/h2/H2TableDescriptor.java | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 97 +++-
.../query/h2/database/H2PkHashIndex.java | 54 +-
.../query/h2/database/H2TreeFilterClosure.java | 22 +-
.../query/h2/opt/GridH2PrimaryScanIndex.java | 7 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 44 +-
.../query/h2/twostep/GridReduceQueryExecutor.java | 19 +-
.../query/h2/twostep/MapQueryResult.java | 99 ++--
.../query/h2/twostep/ReduceQueryRun.java | 27 +-
.../query/h2/twostep/msg/GridH2DmlRequest.java | 9 +
.../query/h2/twostep/msg/GridH2QueryRequest.java | 58 ++
.../processors/cache/index/H2RowCacheSelfTest.java | 15 +-
.../query/h2/GridIndexingSpiAbstractSelfTest.java | 2 +-
.../processors/query/h2/QueryDataPageScanTest.java | 610 +++++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite.java | 4 +
48 files changed, 2045 insertions(+), 366 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
index c7fa3d2..7c118da 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java
@@ -19,6 +19,8 @@ package org.apache.ignite.cache.query;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.jetbrains.annotations.Nullable;
@@ -38,6 +40,9 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
private Integer part;
+ /** */
+ private Boolean dataPageScanEnabled;
+
/**
* Create scan query returning all entries.
*/
@@ -118,6 +123,32 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> {
return part;
}
+ /**
+ * Sets data page scan enabled or disabled.
+ *
+ * Makes sense only with enabled {@link DataRegionConfiguration#setPersistenceEnabled persistence}
+ * and generally improves performance.
+ * When enabled, result may miss some concurrent updates or produce duplicates for the same key.
+ * To avoid these issues use with {@link CacheAtomicityMode#TRANSACTIONAL_SNAPSHOT}.
+ *
+ * @param dataPageScanEnabled {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ * @return {@code this} for chaining.
+ */
+ public ScanQuery<K, V> setDataPageScanEnabled(Boolean dataPageScanEnabled) {
+ this.dataPageScanEnabled = dataPageScanEnabled;
+
+ return this;
+ }
+
+ /**
+ * Checks if data page scan enabled.
+ *
+ * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ */
+ public Boolean isDataPageScanEnabled() {
+ return dataPageScanEnabled;
+ }
+
/** {@inheritDoc} */
@Override public ScanQuery<K, V> setPageSize(int pageSize) {
return (ScanQuery<K, V>)super.setPageSize(pageSize);
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 6945f25..fe283e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -20,6 +20,8 @@ package org.apache.ignite.cache.query;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -80,6 +82,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** Schema. */
private String schema;
+ /** */
+ private Boolean dataPageScanEnabled;
+
/**
* Copy constructs SQL fields query.
*
@@ -96,6 +101,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
lazy = qry.lazy;
parts = qry.parts;
schema = qry.schema;
+ dataPageScanEnabled = qry.dataPageScanEnabled;
}
/**
@@ -374,6 +380,32 @@ public class SqlFieldsQuery extends Query<List<?>> {
}
/**
+ * Sets data page scan enabled or disabled.
+ *
+ * Makes sense only with enabled {@link DataRegionConfiguration#setPersistenceEnabled persistence}
+ * and generally improves performance of full-scan SQL queries.
+ * When enabled, result may miss some concurrent updates or produce duplicates for the same key.
+ * To avoid these issues use with {@link CacheAtomicityMode#TRANSACTIONAL_SNAPSHOT}.
+ *
+ * @param dataPageScanEnabled {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ * @return {@code this} for chaining.
+ */
+ public SqlFieldsQuery setDataPageScanEnabled(Boolean dataPageScanEnabled) {
+ this.dataPageScanEnabled = dataPageScanEnabled;
+
+ return this;
+ }
+
+ /**
+ * Checks if data page scan enabled.
+ *
+ * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ */
+ public Boolean isDataPageScanEnabled() {
+ return dataPageScanEnabled;
+ }
+
+ /**
* @return Copy of this query.
*/
public SqlFieldsQuery copy() {
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index 5b0667c..7cb97c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -20,6 +20,8 @@ package org.apache.ignite.cache.query;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -60,6 +62,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** Partitions for query */
private int[] parts;
+ /** */
+ private Boolean dataPageScanEnabled;
+
/**
* Constructs query for the given type name and SQL query.
*
@@ -280,6 +285,32 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
return this;
}
+ /**
+ * Sets data page scan enabled or disabled.
+ *
+ * Makes sense only with enabled {@link DataRegionConfiguration#setPersistenceEnabled persistence}
+ * and generally improves performance of full-scan SQL queries.
+ * When enabled, result may miss some concurrent updates or produce duplicates for the same key.
+ * To avoid these issues use with {@link CacheAtomicityMode#TRANSACTIONAL_SNAPSHOT}.
+ *
+ * @param dataPageScanEnabled {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ * @return {@code this} for chaining.
+ */
+ public SqlQuery<K,V> setDataPageScanEnabled(Boolean dataPageScanEnabled) {
+ this.dataPageScanEnabled = dataPageScanEnabled;
+
+ return this;
+ }
+
+ /**
+ * Checks if data page scan enabled.
+ *
+ * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ */
+ public Boolean isDataPageScanEnabled() {
+ return dataPageScanEnabled;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlQuery.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 10cbedd1a..ccbf35b 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -2073,6 +2073,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
* @return {@code this} for chaining.
*/
public CacheConfiguration<K, V> setQueryParallelism(int qryParallelism) {
+ A.ensure(qryParallelism > 0, "Query parallelism must be positive.");
+
this.qryParallelism = qryParallelism;
return this;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index a3e1707..c2c5e6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -784,8 +784,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
MvccSnapshot mvccSnapshot = ctx.mvccEnabled() ? MvccUtils.MVCC_MAX_SNAPSHOT : null;
- its.add(offheapMgr
- .cacheEntriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary(), mvccSnapshot));
+ its.add(offheapMgr.cacheEntriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary(),
+ mvccSnapshot, null));
}
}
else if (modes.heap) {
@@ -3138,8 +3138,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size()));
do {
- for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null, null);
- it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; )
+ Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(),
+ true, true, null, null, null);
+
+ while (it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH)
keys.add((K)it.next().key());
removeAll(keys);
@@ -4093,7 +4095,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final CacheOperationContext opCtx = ctx.operationContextPerCall();
- final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(p, null, keepBinary)
+ final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(p, null, keepBinary, null)
.executeScanQuery();
return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index d307772..c0c81c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -418,6 +418,7 @@ public interface IgniteCacheOffheapManager {
* @param backup Backup entries flag.
* @param topVer Topology version.
* @param mvccSnapshot MVCC snapshot.
+ * @param dataPageScanEnabled Flag to enable data page scan.
* @return Rows iterator.
* @throws IgniteCheckedException If failed.
*/
@@ -425,29 +426,20 @@ public interface IgniteCacheOffheapManager {
boolean primary,
boolean backup,
AffinityTopologyVersion topVer,
- @Nullable MvccSnapshot mvccSnapshot)
- throws IgniteCheckedException;
-
- /**
- * @param cacheId Cache ID.
- * @param part Partition.
- * @return Partition data iterator.
- * @throws IgniteCheckedException If failed.
- */
- public default GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part)
- throws IgniteCheckedException {
- return cachePartitionIterator(cacheId, part, null);
- }
+ @Nullable MvccSnapshot mvccSnapshot,
+ Boolean dataPageScanEnabled
+ ) throws IgniteCheckedException;
/**
* @param cacheId Cache ID.
* @param part Partition.
* @param mvccSnapshot MVCC snapshot.
+ * @param dataPageScanEnabled Flag to enable data page scan.
* @return Partition data iterator.
* @throws IgniteCheckedException If failed.
*/
public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, final int part,
- @Nullable MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
+ @Nullable MvccSnapshot mvccSnapshot, Boolean dataPageScanEnabled) throws IgniteCheckedException;
/**
* @param part Partition number.
@@ -481,6 +473,7 @@ public interface IgniteCacheOffheapManager {
* @param topVer Topology version.
* @param keepBinary Keep binary flag.
* @param mvccSnapshot MVCC snapshot.
+ * @param dataPageScanEnabled Flag to enable data page scan.
* @return Entries iterator.
* @throws IgniteCheckedException If failed.
*/
@@ -490,7 +483,9 @@ public interface IgniteCacheOffheapManager {
final boolean backup,
final AffinityTopologyVersion topVer,
final boolean keepBinary,
- @Nullable final MvccSnapshot mvccSnapshot) throws IgniteCheckedException;
+ @Nullable final MvccSnapshot mvccSnapshot,
+ Boolean dataPageScanEnabled
+ ) throws IgniteCheckedException;
/**
* @param cacheId Cache ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 9e7b3ed..6653dee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccDataPageClosure;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccFirstRowTreeClosure;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccMaxSearchRow;
@@ -88,6 +89,7 @@ import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccSnapshot
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccTreeClosure;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
+import org.apache.ignite.internal.transactions.IgniteTxMvccVersionCheckedException;
import org.apache.ignite.internal.stat.IoStatisticsHolder;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -101,6 +103,7 @@ import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -110,6 +113,7 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.lang.Boolean.TRUE;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
@@ -729,7 +733,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public void clearCache(GridCacheContext cctx, boolean readers) {
GridCacheVersion obsoleteVer = null;
- try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ? iterator(cctx.cacheId(), cacheDataStores().iterator(), null) :
+ try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ?
+ iterator(cctx.cacheId(), cacheDataStores().iterator(), null, null) :
evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) {
while (it.hasNext()) {
cctx.shared().database().checkpointReadLock();
@@ -782,8 +787,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
final boolean backup,
final AffinityTopologyVersion topVer,
final boolean keepBinary,
- @Nullable final MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
- final Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup, topVer, mvccSnapshot);
+ @Nullable final MvccSnapshot mvccSnapshot,
+ Boolean dataPageScanEnabled
+ ) {
+ final Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup,
+ topVer, mvccSnapshot, dataPageScanEnabled);
return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
/** */
@@ -867,30 +875,31 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
boolean primary,
boolean backups,
final AffinityTopologyVersion topVer,
- @Nullable MvccSnapshot mvccSnapshot)
- throws IgniteCheckedException {
- return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot);
+ @Nullable MvccSnapshot mvccSnapshot,
+ Boolean dataPageScanEnabled
+ ) {
+ return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled);
}
/** {@inheritDoc} */
@Override public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, int part,
- @Nullable MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
+ @Nullable MvccSnapshot mvccSnapshot, Boolean dataPageScanEnabled) {
CacheDataStore data = partitionData(part);
if (data == null)
return new GridEmptyCloseableIterator<>();
- return iterator(cacheId, singletonIterator(data), mvccSnapshot);
+ return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled);
}
/** {@inheritDoc} */
- @Override public GridIterator<CacheDataRow> partitionIterator(int part) throws IgniteCheckedException {
+ @Override public GridIterator<CacheDataRow> partitionIterator(int part) {
CacheDataStore data = partitionData(part);
if (data == null)
return new GridEmptyCloseableIterator<>();
- return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null);
+ return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null);
}
/**
@@ -898,12 +907,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param cacheId Cache ID.
* @param dataIt Data store iterator.
* @param mvccSnapshot Mvcc snapshot.
+ * @param dataPageScanEnabled Flag to enable data page scan.
* @return Rows iterator
*/
private GridCloseableIterator<CacheDataRow> iterator(final int cacheId,
final Iterator<CacheDataStore> dataIt,
- final MvccSnapshot mvccSnapshot)
- {
+ final MvccSnapshot mvccSnapshot,
+ Boolean dataPageScanEnabled
+ ) {
return new GridCloseableIteratorAdapter<CacheDataRow>() {
/** */
private GridCursor<? extends CacheDataRow> cur;
@@ -933,11 +944,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
curPart = ds.partId();
- if (mvccSnapshot == null)
- cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
- else {
- cur = cacheId == CU.UNDEFINED_CACHE_ID ?
- ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot);
+ // Data page scan is disabled by default for scan queries.
+ CacheDataTree.setDataPageScanEnabled(dataPageScanEnabled == TRUE);
+
+ try {
+ if (mvccSnapshot == null)
+ cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
+ else {
+ cur = cacheId == CU.UNDEFINED_CACHE_ID ?
+ ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot);
+ }
+ }
+ finally {
+ CacheDataTree.setDataPageScanEnabled(false);
}
}
else
@@ -2916,7 +2935,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** */
- private final class MvccFirstVisibleRowTreeClosure implements MvccTreeClosure {
+ private final class MvccFirstVisibleRowTreeClosure implements MvccTreeClosure, MvccDataPageClosure {
/** */
private final GridCacheContext cctx;
@@ -2946,6 +2965,24 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return isVisible(cctx, snapshot, rowCrdVer, rowCntr, rowOpCntr, rowIo.getLink(pageAddr, idx));
}
+
+ /** {@inheritDoc} */
+ @Override public boolean applyMvcc(DataPageIO io, long dataPageAddr, int itemId, int pageSize)
+ throws IgniteCheckedException {
+ try {
+ return isVisible(cctx, snapshot, io, dataPageAddr, itemId, pageSize);
+ }
+ catch (IgniteTxMvccVersionCheckedException e) {
+ // TODO this catch must not be needed if we switch Vacuum to data page scan
+ // We expect the active tx state can be observed by read tx only in the cases when tx has been aborted
+ // asynchronously and node hasn't received finish message yet but coordinator has already removed it from
+ // the active txs map. Rows written by this tx are invisible to anyone and will be removed by the vacuum.
+ if (log.isDebugEnabled())
+ log.debug( "Unexpected tx state on index lookup. " + X.getFullStackTrace(e));
+
+ return false;
+ }
+ }
}
/**
@@ -3127,7 +3164,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
ctx.wal().log(new DataPageMvccMarkUpdatedRecord(cacheId, pageId, itemId,
newVer.coordinatorVersion(), newVer.counter(), newVer.operationCounter()));
- return Boolean.TRUE;
+ return TRUE;
}
}
@@ -3180,7 +3217,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
// We do not throw an exception here because new version may be updated by active Tx at this moment.
}
- return Boolean.TRUE;
+ return TRUE;
}
}
@@ -3245,7 +3282,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
ctx.wal().log(new DataPageMvccUpdateNewTxStateHintRecord(cacheId, pageId, itemId, newRow.newMvccTxState()));
}
- return Boolean.TRUE;
+ return TRUE;
}
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 24b7f16..a3ca605 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -496,7 +496,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
IgniteBiPredicate<K, V> p = scanQry.getFilter();
final CacheQuery<R> qry = ctx.queries().createScanQuery(
- p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal());
+ p, transformer, scanQry.getPartition(), isKeepBinary, scanQry.isLocal(), scanQry.isDataPageScanEnabled());
if (scanQry.getPageSize() > 0)
qry.pageSize(scanQry.getPageSize());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java
index 666fc27..7c24c12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java
@@ -24,6 +24,9 @@ import java.nio.ByteBuffer;
*/
public class IncompleteObject<T> {
/** */
+ private long nextLink;
+
+ /** */
protected byte[] data;
/** */
@@ -42,7 +45,7 @@ public class IncompleteObject<T> {
/**
* Constructor.
*/
- protected IncompleteObject() {
+ public IncompleteObject() {
// No-op.
}
@@ -86,4 +89,18 @@ public class IncompleteObject<T> {
off += len;
}
+
+ /**
+ * @return Next data page link for fragmented rows.
+ */
+ public long getNextLink() {
+ return nextLink;
+ }
+
+ /**
+ * @param nextLink Next data page link for fragmented rows.
+ */
+ public void setNextLink(long nextLink) {
+ this.nextLink = nextLink;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 73e9bba..ac2818d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -588,7 +588,10 @@ public class MvccUtils {
PageMemory pageMem = cctx.dataRegion().pageMemory();
int grpId = cctx.groupId();
+ int pageSize = pageMem.realPageSize(grpId);
+
long pageId = pageId(link);
+ int itemId = itemId(link);
long page = pageMem.acquirePage(grpId, pageId);
try {
@@ -597,23 +600,7 @@ public class MvccUtils {
try{
DataPageIO dataIo = DataPageIO.VERSIONS.forPage(pageAddr);
- int offset = dataIo.getPayloadOffset(pageAddr, itemId(link), pageMem.realPageSize(grpId),
- MVCC_INFO_SIZE);
-
- long mvccCrd = dataIo.mvccCoordinator(pageAddr, offset);
- long mvccCntr = dataIo.mvccCounter(pageAddr, offset);
- int mvccOpCntr = dataIo.mvccOperationCounter(pageAddr, offset) & ~MVCC_KEY_ABSENT_BEFORE_MASK;
-
- assert mvccVersionIsValid(mvccCrd, mvccCntr, mvccOpCntr) : mvccVersion(mvccCrd, mvccCntr, mvccOpCntr);
-
- long newMvccCrd = dataIo.newMvccCoordinator(pageAddr, offset);
- long newMvccCntr = dataIo.newMvccCounter(pageAddr, offset);
- int newMvccOpCntr = dataIo.newMvccOperationCounter(pageAddr, offset) & ~MVCC_KEY_ABSENT_BEFORE_MASK;
-
- assert newMvccCrd == MVCC_CRD_COUNTER_NA || mvccVersionIsValid(newMvccCrd, newMvccCntr, newMvccOpCntr)
- : mvccVersion(newMvccCrd, newMvccCntr, newMvccOpCntr);
-
- return clo.apply(cctx, snapshot, mvccCrd, mvccCntr, mvccOpCntr, newMvccCrd, newMvccCntr, newMvccOpCntr);
+ return invoke(cctx, dataIo, pageAddr, itemId, pageSize, clo, snapshot);
}
finally {
pageMem.readUnlock(grpId, pageId, page);
@@ -625,6 +612,52 @@ public class MvccUtils {
}
/**
+ * @param cctx Cache context.
+ * @param dataIo Data page IO.
+ * @param pageAddr Page address.
+ * @param itemId Item Id.
+ * @param pageSize Page size.
+ * @param clo Closure.
+ * @param snapshot Mvcc snapshot.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private static <R> R invoke(GridCacheContext cctx, DataPageIO dataIo, long pageAddr, int itemId, int pageSize,
+ MvccClosure<R> clo, MvccSnapshot snapshot) throws IgniteCheckedException {
+ int offset = dataIo.getPayloadOffset(pageAddr, itemId, pageSize, MVCC_INFO_SIZE);
+
+ long mvccCrd = dataIo.mvccCoordinator(pageAddr, offset);
+ long mvccCntr = dataIo.mvccCounter(pageAddr, offset);
+ int mvccOpCntr = dataIo.mvccOperationCounter(pageAddr, offset) & ~MVCC_KEY_ABSENT_BEFORE_MASK;
+
+ assert mvccVersionIsValid(mvccCrd, mvccCntr, mvccOpCntr) : mvccVersion(mvccCrd, mvccCntr, mvccOpCntr);
+
+ long newMvccCrd = dataIo.newMvccCoordinator(pageAddr, offset);
+ long newMvccCntr = dataIo.newMvccCounter(pageAddr, offset);
+ int newMvccOpCntr = dataIo.newMvccOperationCounter(pageAddr, offset) & ~MVCC_KEY_ABSENT_BEFORE_MASK;
+
+ assert newMvccCrd == MVCC_CRD_COUNTER_NA || mvccVersionIsValid(newMvccCrd, newMvccCntr, newMvccOpCntr)
+ : mvccVersion(newMvccCrd, newMvccCntr, newMvccOpCntr);
+
+ return clo.apply(cctx, snapshot, mvccCrd, mvccCntr, mvccOpCntr, newMvccCrd, newMvccCntr, newMvccOpCntr);
+ }
+
+ /**
+ * @param cctx Cache context.
+ * @param snapshot Mvcc snapshot.
+ * @param dataIo Data page IO.
+ * @param pageAddr Page address.
+ * @param itemId Item Id.
+ * @param pageSize Page size.
+ * @return {@code true} If the row is visible.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static boolean isVisible(GridCacheContext cctx, MvccSnapshot snapshot, DataPageIO dataIo,
+ long pageAddr, int itemId, int pageSize) throws IgniteCheckedException {
+ return invoke(cctx, dataIo, pageAddr, itemId, pageSize, isVisible, snapshot);
+ }
+
+ /**
* Throw an {@link UnsupportedOperationException} if this cache is transactional and MVCC is enabled with
* appropriate message about corresponding operation type.
* @param cctx Cache context.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 85be9d2..1889a99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -35,8 +35,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVers
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.stat.IoStatisticsHolder;
+import org.apache.ignite.internal.stat.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -48,6 +48,7 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.KEY_ONLY;
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.LINK_WITH_HEADER;
/**
@@ -124,29 +125,87 @@ public class CacheDataRowAdapter implements CacheDataRow {
@Nullable CacheGroupContext grp,
GridCacheSharedContext<?, ?> sharedCtx,
PageMemory pageMem,
- RowData rowData)
- throws IgniteCheckedException {
- assert link != 0 : "link";
- assert key == null : "key";
+ RowData rowData
+ ) throws IgniteCheckedException {
+ // Group is null if try evict page, with persistence evictions should be disabled.
+ assert grp != null || pageMem instanceof PageMemoryNoStoreImpl;
CacheObjectContext coctx = grp != null ? grp.cacheObjectContext() : null;
+ boolean readCacheId = grp == null || grp.storeCacheIdInDataPage();
+ int grpId = grp != null ? grp.groupId() : 0;
+ IoStatisticsHolder statHolder = grp != null ? grp.statisticsHolderData() : IoStatisticsHolderNoOp.INSTANCE;
+
+ doInitFromLink(link, sharedCtx, coctx, pageMem, grpId, statHolder, readCacheId, rowData, null);
+ }
+ /**
+ * @param io Data page IO.
+ * @param pageAddr Data page address.
+ * @param itemId Row item Id.
+ * @param grp Cache group.
+ * @param sharedCtx Cache shared context.
+ * @param pageMem Page memory.
+ * @param rowData Required row data.
+ * @throws IgniteCheckedException If failed.
+ */
+ public final void initFromDataPage(
+ DataPageIO io,
+ long pageAddr,
+ int itemId,
+ @Nullable CacheGroupContext grp,
+ GridCacheSharedContext<?, ?> sharedCtx,
+ PageMemory pageMem,
+ RowData rowData
+ ) throws IgniteCheckedException {
+ // Group is null if try evict page, with persistence evictions should be disabled.
+ assert grp != null || pageMem instanceof PageMemoryNoStoreImpl;
+
+ CacheObjectContext coctx = grp != null ? grp.cacheObjectContext() : null;
boolean readCacheId = grp == null || grp.storeCacheIdInDataPage();
+ int grpId = grp != null ? grp.groupId() : 0;
+ IoStatisticsHolder statHolder = grp != null ? grp.statisticsHolderData() : IoStatisticsHolderNoOp.INSTANCE;
- long nextLink = link;
- IncompleteObject<?> incomplete = null;
- boolean first = true;
+ IncompleteObject<?> incomplete = readIncomplete(null, sharedCtx, coctx, pageMem,
+ grpId, pageAddr, itemId, io, rowData, readCacheId);
- do {
- final long pageId = pageId(nextLink);
+ if (incomplete != null) {
+ // Initialize the remaining part of the large row from other pages.
+ long nextLink = incomplete.getNextLink();
- // Group is null if try evict page, with persistence evictions should be disabled.
- assert grp != null || pageMem instanceof PageMemoryNoStoreImpl;
+ if (nextLink != 0L)
+ doInitFromLink(nextLink, sharedCtx, coctx, pageMem, grpId, statHolder, readCacheId, rowData, incomplete);
+ }
+ }
- int grpId = grp != null ? grp.groupId() : 0;
+ /**
+ * @param link Link.
+ * @param sharedCtx Cache shared context.
+ * @param coctx Cache object context.
+ * @param pageMem Page memory.
+ * @param grpId Cache group Id.
+ * @param readCacheId {@code true} If need to read cache ID.
+ * @param rowData Required row data.
+ * @param incomplete Incomplete object.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doInitFromLink(
+ long link,
+ GridCacheSharedContext<?, ?> sharedCtx,
+ CacheObjectContext coctx,
+ PageMemory pageMem,
+ int grpId,
+ IoStatisticsHolder statHolder,
+ boolean readCacheId,
+ RowData rowData,
+ IncompleteObject<?> incomplete
+ ) throws IgniteCheckedException {
+ assert link != 0 : "link";
+ assert key == null : "key";
- final IoStatisticsHolder statHolder = (grp != null) ?
- grp.statisticsHolderData() : IoStatisticsHolderNoOp.INSTANCE;
+ long nextLink = link;
+
+ do {
+ final long pageId = pageId(nextLink);
final long page = pageMem.acquirePage(grpId, pageId, statHolder);
@@ -158,45 +217,15 @@ public class CacheDataRowAdapter implements CacheDataRow {
try {
DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
- DataPagePayload data = io.readPayload(pageAddr,
- itemId(nextLink),
- pageMem.realPageSize(grpId));
-
- nextLink = data.nextLink();
-
- int hdrLen = 0;
-
- if (first) {
- if (nextLink == 0) {
- // Fast path for a single page row.
- readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId);
-
- return;
- }
-
- first = false;
-
- // Assume that row header is always located entirely on the very first page.
- hdrLen = readHeader(pageAddr, data.offset());
-
- if (rowData == LINK_WITH_HEADER)
- return;
- }
-
- ByteBuffer buf = pageMem.pageBuffer(pageAddr);
-
- int off = data.offset() + hdrLen;
- int payloadSize = data.payloadSize() - hdrLen;
-
- buf.position(off);
- buf.limit(off + payloadSize);
-
- boolean keyOnly = rowData == RowData.KEY_ONLY;
+ int itemId = itemId(nextLink);
- incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete);
+ incomplete = readIncomplete(incomplete, sharedCtx, coctx, pageMem,
+ grpId, pageAddr, itemId, io, rowData, readCacheId);
- if (keyOnly && key != null)
+ if (incomplete == null || (rowData == KEY_ONLY && key != null))
return;
+
+ nextLink = incomplete.getNextLink();
}
finally {
pageMem.readUnlock(grpId, pageId, page);
@@ -212,6 +241,70 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/**
+ * @param incomplete Incomplete object.
+ * @param sharedCtx Cache shared context.
+ * @param coctx Cache object context.
+ * @param pageMem Page memory.
+ * @param grpId Cache group Id.
+ * @param pageAddr Page address.
+ * @param io Page IO.
+ * @param rowData Required row data.
+ * @param readCacheId {@code true} If need to read cache ID.
+ * @return Incomplete object.
+ * @throws IgniteCheckedException If failed.
+ */
+ private IncompleteObject<?> readIncomplete(
+ IncompleteObject<?> incomplete,
+ GridCacheSharedContext<?, ?> sharedCtx,
+ CacheObjectContext coctx,
+ PageMemory pageMem,
+ int grpId,
+ long pageAddr,
+ int itemId,
+ DataPageIO io,
+ RowData rowData,
+ boolean readCacheId
+ ) throws IgniteCheckedException {
+ DataPagePayload data = io.readPayload(pageAddr, itemId, pageMem.realPageSize(grpId));
+
+ long nextLink = data.nextLink();
+
+ int hdrLen = 0;
+
+ if (incomplete == null) {
+ if (nextLink == 0) {
+ // Fast path for a single page row.
+ readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId);
+
+ return null;
+ }
+
+ // Assume that row header is always located entirely on the very first page.
+ hdrLen = readHeader(pageAddr, data.offset());
+
+ if (rowData == LINK_WITH_HEADER)
+ return null;
+ }
+
+ ByteBuffer buf = pageMem.pageBuffer(pageAddr);
+
+ int off = data.offset() + hdrLen;
+ int payloadSize = data.payloadSize() - hdrLen;
+
+ buf.position(off);
+ buf.limit(off + payloadSize);
+
+ boolean keyOnly = rowData == RowData.KEY_ONLY;
+
+ incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete);
+
+ if (incomplete != null)
+ incomplete.setNextLink(nextLink);
+
+ return incomplete;
+ }
+
+ /**
* Reads row header (i.e. MVCC info) which should be located on the very first page od data.
*
* @param addr Address.
@@ -244,8 +337,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
if (readCacheId && cacheId == 0) {
incomplete = readIncompleteCacheId(buf, incomplete);
- if (cacheId == 0)
+ if (cacheId == 0) {
+ assert incomplete != null;
return incomplete;
+ }
incomplete = null;
}
@@ -262,8 +357,13 @@ public class CacheDataRowAdapter implements CacheDataRow {
if (key == null) {
incomplete = readIncompleteKey(coctx, buf, (IncompleteCacheObject)incomplete);
- if (key == null || keyOnly)
- return incomplete;
+ if (key == null) {
+ assert incomplete != null;
+ return incomplete; // Need to finish reading the key.
+ }
+
+ if (keyOnly)
+ return null; // Key is ready - we are done!
incomplete = null;
}
@@ -271,8 +371,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
if (expireTime == -1) {
incomplete = readIncompleteExpireTime(buf, incomplete);
- if (expireTime == -1)
+ if (expireTime == -1) {
+ assert incomplete != null;
return incomplete;
+ }
incomplete = null;
}
@@ -281,16 +383,21 @@ public class CacheDataRowAdapter implements CacheDataRow {
if (val == null) {
incomplete = readIncompleteValue(coctx, buf, (IncompleteCacheObject)incomplete);
- if (val == null)
+ if (val == null) {
+ assert incomplete != null;
return incomplete;
+ }
incomplete = null;
}
// Read version.
- if (ver == null)
+ if (ver == null) {
incomplete = readIncompleteVersion(buf, incomplete);
+ assert ver != null || incomplete != null;
+ }
+
return incomplete;
}
@@ -371,10 +478,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
) {
if (incomplete == null) {
int remaining = buf.remaining();
-
- if (remaining == 0)
- return null;
-
int size = 4;
if (remaining >= size) {
@@ -386,6 +489,9 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
incomplete = new IncompleteObject<>(new byte[size]);
+
+ if (remaining == 0)
+ return incomplete;
}
incomplete.readData(buf);
@@ -464,10 +570,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
) {
if (incomplete == null) {
int remaining = buf.remaining();
-
- if (remaining == 0)
- return null;
-
int size = 8;
if (remaining >= size) {
@@ -479,6 +581,9 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
incomplete = new IncompleteObject<>(new byte[size]);
+
+ if (remaining == 0)
+ return incomplete;
}
incomplete.readData(buf);
@@ -506,11 +611,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
ByteBuffer buf,
IncompleteObject<?> incomplete
) throws IgniteCheckedException {
- if (incomplete == null) {
+ if (incomplete == null || incomplete.data() == null) {
int remaining = buf.remaining();
if (remaining == 0)
- return null;
+ return new IncompleteObject<>(); // Just to pass the next link.
int size = CacheVersionIO.readSize(buf, false);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 51410ef..1486463 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1990,6 +1990,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * @param grpId Cache group id.
+ * @param partId Partition ID.
+ * @return Page store.
+ * @throws IgniteCheckedException If failed.
+ */
+ public PageStore getPageStore(int grpId, int partId) throws IgniteCheckedException {
+ return storeMgr.getStore(grpId, partId);
+ }
+
+ /**
* @param gctx Group context.
* @param f Consumer.
* @return Accumulated result for all page stores.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index a3842a7..6932d0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -29,10 +29,19 @@ import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.jetbrains.annotations.Nullable;
/**
- *
+ * Page memory with some persistence related additions.
*/
public interface PageMemoryEx extends PageMemory {
/**
+ * @param absPtr Absolute pointer to read lock.
+ * @param pageId Page ID.
+ * @param force Force flag.
+ * @param touch Update page timestamp.
+ * @return Pointer to the page read buffer.
+ */
+ long readLock(long absPtr, long pageId, boolean force, boolean touch);
+
+ /**
*
* @param grpId Group ID.
* @param pageId Page ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 06bec8c..88f7ec2 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -435,7 +435,7 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public long readLock(int grpId, long pageId, long page) {
- return readLockPage(page, new FullPageId(pageId, grpId), false);
+ return readLock(page, pageId, false);
}
/** {@inheritDoc} */
@@ -1392,23 +1392,17 @@ public class PageMemoryImpl implements PageMemoryEx {
/**
* @param absPtr Absolute pointer to read lock.
- * @param fullId Full page ID.
+ * @param pageId Page ID.
* @param force Force flag.
* @return Pointer to the page read buffer.
*/
- private long readLockPage(long absPtr, FullPageId fullId, boolean force) {
- return readLockPage(absPtr, fullId, force, true);
+ private long readLock(long absPtr, long pageId, boolean force) {
+ return readLock(absPtr, pageId, force, true);
}
- /**
- * @param absPtr Absolute pointer to read lock.
- * @param fullId Full page ID.
- * @param force Force flag.
- * @param touch Update page timestamp.
- * @return Pointer to the page read buffer.
- */
- private long readLockPage(long absPtr, FullPageId fullId, boolean force, boolean touch) {
- int tag = force ? -1 : PageIdUtils.tag(fullId.pageId());
+ /** {@inheritDoc} */
+ @Override public long readLock(long absPtr, long pageId, boolean force, boolean touch) {
+ int tag = force ? -1 : PageIdUtils.tag(pageId);
boolean locked = rwLock.readLock(absPtr + PAGE_LOCK_OFFSET, tag);
@@ -1425,7 +1419,7 @@ public class PageMemoryImpl implements PageMemoryEx {
/** {@inheritDoc} */
@Override public long readLockForce(int grpId, long pageId, long page) {
- return readLockPage(page, new FullPageId(pageId, grpId), true);
+ return readLock(page, pageId, true);
}
/**
@@ -2137,7 +2131,7 @@ public class PageMemoryImpl implements PageMemoryEx {
if (ctx.kernalContext().query() == null || !ctx.kernalContext().query().moduleEnabled())
return;
- long pageAddr = readLockPage(absPtr, fullPageId, true, false);
+ long pageAddr = PageMemoryImpl.this.readLock(absPtr, fullPageId.pageId(), true, false);
try {
if (PageIO.getType(pageAddr) != PageIO.T_DATA)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index e9b7248..54d9816 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -978,13 +978,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
/**
* Check if the tree is getting destroyed.
*/
- private void checkDestroyed() {
+ protected final void checkDestroyed() {
if (destroyed.get())
throw new IllegalStateException("Tree is being concurrently destroyed: " + getName());
}
/** {@inheritDoc} */
- @Override public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
+ @Override public final GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
return find(lower, upper, null);
}
@@ -1001,7 +1001,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
* @return Cursor.
* @throws IgniteCheckedException If failed.
*/
- public final GridCursor<T> find(L lower, L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
+ public GridCursor<T> find(L lower, L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
checkDestroyed();
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
index 6176eeb..78752bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java
@@ -254,6 +254,14 @@ public abstract class AbstractDataPageIO<T extends Storable> extends PageIO impl
/**
* @param pageAddr Page address.
+ * @return Rows number in the given data page.
+ */
+ public int getRowsCount(long pageAddr) {
+ return getDirectCount(pageAddr);
+ }
+
+ /**
+ * @param pageAddr Page address.
* @param c Closure.
* @param <T> Closure return type.
* @return Collection of closure results for all items in page.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 0f9ee28..1e10d60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -281,7 +281,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
req.keepBinary(),
req.subjectId(),
req.taskHash(),
- req.mvccSnapshot()
+ req.mvccSnapshot(),
+ req.isDataPageScanEnabled()
);
return new GridCacheQueryInfo(
@@ -534,7 +535,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().validate();
String clsName = qry.query().queryClassName();
-
+ Boolean dataPageScanEnabled = qry.query().isDataPageScanEnabled();
MvccSnapshot mvccSnapshot = qry.query().mvccSnapshot();
final GridCacheQueryRequest req = new GridCacheQueryRequest(
@@ -559,7 +560,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
queryTopologyVersion(),
mvccSnapshot,
// Force deployment anyway if scan query is used.
- cctx.deploymentEnabled() || (qry.query().scanFilter() != null && cctx.gridDeploy().enabled()));
+ cctx.deploymentEnabled() || (qry.query().scanFilter() != null && cctx.gridDeploy().enabled()),
+ dataPageScanEnabled);
addQueryFuture(req.id(), fut);
@@ -690,7 +692,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.taskHash(),
queryTopologyVersion(),
// Force deployment anyway if scan query is used.
- cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()));
+ cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()),
+ qry.isDataPageScanEnabled());
sendRequest(fut, req, nodes);
}
@@ -757,7 +760,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
qry.query().taskHash(),
queryTopologyVersion(),
null,
- cctx.deploymentEnabled());
+ cctx.deploymentEnabled(),
+ qry.query().isDataPageScanEnabled());
addQueryFuture(req.id(), fut);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 0dbdef1..9fc02d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -141,6 +141,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** */
private MvccSnapshot mvccSnapshot;
+ /** */
+ private Boolean dataPageScanEnabled;
+
/**
* @param cctx Context.
* @param type Query type.
@@ -148,14 +151,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param part Partition.
* @param keepBinary Keep binary flag.
* @param forceLocal Flag to force local query.
+ * @param dataPageScanEnabled Flag to enable data page scan.
*/
- public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
+ public GridCacheQueryAdapter(
+ GridCacheContext<?, ?> cctx,
GridCacheQueryType type,
@Nullable IgniteBiPredicate<Object, Object> filter,
@Nullable IgniteClosure<Map.Entry, Object> transform,
@Nullable Integer part,
boolean keepBinary,
- boolean forceLocal) {
+ boolean forceLocal,
+ Boolean dataPageScanEnabled
+ ) {
assert cctx != null;
assert type != null;
assert part == null || part >= 0;
@@ -167,6 +174,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.part = part;
this.keepBinary = keepBinary;
this.forceLocal = forceLocal;
+ this.dataPageScanEnabled = dataPageScanEnabled;
log = cctx.logger(getClass());
@@ -186,15 +194,19 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param part Partition.
* @param incMeta Include metadata flag.
* @param keepBinary Keep binary flag.
+ * @param dataPageScanEnabled Flag to enable data page scan.
*/
- public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
+ public GridCacheQueryAdapter(
+ GridCacheContext<?, ?> cctx,
GridCacheQueryType type,
@Nullable String clsName,
@Nullable String clause,
@Nullable IgniteBiPredicate<Object, Object> filter,
@Nullable Integer part,
boolean incMeta,
- boolean keepBinary) {
+ boolean keepBinary,
+ Boolean dataPageScanEnabled
+ ) {
assert cctx != null;
assert type != null;
assert part == null || part >= 0;
@@ -207,6 +219,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.part = part;
this.incMeta = incMeta;
this.keepBinary = keepBinary;
+ this.dataPageScanEnabled = dataPageScanEnabled;
log = cctx.logger(getClass());
@@ -231,8 +244,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
* @param subjId Security subject ID.
* @param taskHash Task hash.
* @param mvccSnapshot Mvcc version.
+ * @param dataPageScanEnabled Flag to enable data page scan.
*/
- public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx,
+ public GridCacheQueryAdapter(
+ GridCacheContext<?, ?> cctx,
GridCacheQueryType type,
IgniteLogger log,
int pageSize,
@@ -248,7 +263,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
boolean keepBinary,
UUID subjId,
int taskHash,
- MvccSnapshot mvccSnapshot) {
+ MvccSnapshot mvccSnapshot,
+ Boolean dataPageScanEnabled
+ ) {
this.cctx = cctx;
this.type = type;
this.log = log;
@@ -266,6 +283,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
this.subjId = subjId;
this.taskHash = taskHash;
this.mvccSnapshot = mvccSnapshot;
+ this.dataPageScanEnabled = dataPageScanEnabled;
+ }
+
+ /**
+ * @return Flag to enable data page scan.
+ */
+ public Boolean isDataPageScanEnabled() {
+ return dataPageScanEnabled;
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 7f4699f..ae5f7df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -748,8 +748,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param qry Query.
* @return Cache set items iterator.
*/
- private GridCloseableIterator<IgniteBiTuple<K, V>> sharedCacheSetIterator(
- GridCacheQueryAdapter<?> qry) throws IgniteCheckedException {
+ private GridCloseableIterator<IgniteBiTuple<K, V>> sharedCacheSetIterator(GridCacheQueryAdapter<?> qry)
+ throws IgniteCheckedException {
final GridSetQueryPredicate filter = (GridSetQueryPredicate)qry.scanFilter();
IgniteUuid id = filter.setId();
@@ -768,7 +768,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
},
qry.partition(),
false,
- true);
+ true,
+ qry.isDataPageScanEnabled());
return scanQueryLocal(qry0, false);
}
@@ -826,12 +827,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
locPart = locPart0;
- it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part, qry.mvccSnapshot());
+ it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part, qry.mvccSnapshot(),
+ qry.isDataPageScanEnabled());
}
else {
locPart = null;
- it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer, qry.mvccSnapshot());
+ it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer,
+ qry.mvccSnapshot(), qry.isDataPageScanEnabled());
}
return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, transformer, locNode, cctx, log);
@@ -2688,7 +2691,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
null,
false,
- keepBinary);
+ keepBinary,
+ null);
}
/**
@@ -2697,11 +2701,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param filter Scan filter.
* @param part Partition.
* @param keepBinary Keep binary flag.
+ * @param dataPageScanEnabled Flag to enable data page scan.
* @return Created query.
*/
public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
- @Nullable Integer part, boolean keepBinary) {
- return createScanQuery(filter, null, part, keepBinary, false);
+ @Nullable Integer part, boolean keepBinary, Boolean dataPageScanEnabled) {
+ return createScanQuery(filter, null, part, keepBinary, false, dataPageScanEnabled);
}
/**
@@ -2712,19 +2717,26 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @param part Partition.
* @param keepBinary Keep binary flag.
* @param forceLocal Flag to force local scan.
+ * @param dataPageScanEnabled Flag to enable data page scan.
* @return Created query.
*/
- public <T, R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter,
+ @SuppressWarnings("unchecked")
+ public <T, R> CacheQuery<R> createScanQuery(
+ @Nullable IgniteBiPredicate<K, V> filter,
@Nullable IgniteClosure<T, R> trans,
- @Nullable Integer part, boolean keepBinary, boolean forceLocal) {
-
+ @Nullable Integer part,
+ boolean keepBinary,
+ boolean forceLocal,
+ Boolean dataPageScanEnabled
+ ) {
return new GridCacheQueryAdapter(cctx,
SCAN,
filter,
trans,
part,
keepBinary,
- forceLocal);
+ forceLocal,
+ dataPageScanEnabled);
}
/**
@@ -2748,28 +2760,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
null,
null,
false,
- keepBinary);
- }
-
- /**
- * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery}
- * documentation.
- *
- * @param qry Query.
- * @param keepBinary Keep binary flag.
- * @return Created query.
- */
- public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean keepBinary) {
- A.notNull(qry, "qry");
-
- return new GridCacheQueryAdapter<>(cctx,
- SQL_FIELDS,
- null,
- qry,
- null,
- null,
- false,
- keepBinary);
+ keepBinary,
+ null);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 21c6363..7b513d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -53,6 +53,18 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
private static final long serialVersionUID = 0L;
/** */
+ private static final int FLAG_DATA_PAGE_SCAN_DFLT = 0b00;
+
+ /** */
+ private static final int FLAG_DATA_PAGE_SCAN_ENABLED = 0b01;
+
+ /** */
+ private static final int FLAG_DATA_PAGE_SCAN_DISABLED = 0b10;
+
+ /** */
+ private static final int FLAG_DATA_PAGE_SCAN_MASK = 0b11;
+
+ /** */
private long id;
/** */
@@ -132,6 +144,9 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
/** */
private MvccSnapshot mvccSnapshot;
+ /** */
+ private byte flags;
+
/**
* Required by {@link Externalizable}
*/
@@ -148,11 +163,13 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
* @param topVer Topology version.
* @param addDepInfo Deployment info flag.
*/
- public GridCacheQueryRequest(int cacheId,
+ public GridCacheQueryRequest(
+ int cacheId,
long id,
boolean fields,
AffinityTopologyVersion topVer,
- boolean addDepInfo) {
+ boolean addDepInfo
+ ) {
this.cacheId = cacheId;
this.id = id;
this.fields = fields;
@@ -177,6 +194,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
* @param taskHash Task name hash code.
* @param topVer Topology version.
* @param addDepInfo Deployment info flag.
+ * @param dataPageScanEnabled Flag to enable data page scan.
*/
public GridCacheQueryRequest(
int cacheId,
@@ -190,7 +208,8 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
UUID subjId,
int taskHash,
AffinityTopologyVersion topVer,
- boolean addDepInfo
+ boolean addDepInfo,
+ Boolean dataPageScanEnabled
) {
this.cacheId = cacheId;
this.id = id;
@@ -204,6 +223,8 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
this.taskHash = taskHash;
this.topVer = topVer;
this.addDepInfo = addDepInfo;
+
+ flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
}
/**
@@ -250,7 +271,8 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
int taskHash,
AffinityTopologyVersion topVer,
MvccSnapshot mvccSnapshot,
- boolean addDepInfo
+ boolean addDepInfo,
+ Boolean dataPageScanEnabled
) {
assert type != null || fields;
assert clause != null || (type == SCAN || type == SET || type == SPI);
@@ -277,6 +299,23 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
this.topVer = topVer;
this.mvccSnapshot = mvccSnapshot;
this.addDepInfo = addDepInfo;
+
+ flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
+ }
+
+ /**
+ * @param flags Flags.
+ * @param enabled If data page scan enabled.
+ * @return Updated flags.
+ */
+ private static byte setDataPageScanEnabled(int flags, Boolean enabled) {
+ int x = enabled == null ? FLAG_DATA_PAGE_SCAN_DFLT :
+ enabled ? FLAG_DATA_PAGE_SCAN_ENABLED : FLAG_DATA_PAGE_SCAN_DISABLED;
+
+ flags &= ~FLAG_DATA_PAGE_SCAN_MASK; // Clear old bits.
+ flags |= x; // Set new bits.
+
+ return (byte)flags;
}
/**
@@ -492,6 +531,21 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
}
/**
+ * @return Flag to enable data page scan.
+ */
+ public Boolean isDataPageScanEnabled() {
+ switch (flags & FLAG_DATA_PAGE_SCAN_MASK) {
+ case FLAG_DATA_PAGE_SCAN_ENABLED:
+ return true;
+
+ case FLAG_DATA_PAGE_SCAN_DISABLED:
+ return false;
+ }
+
+ return null;
+ }
+
+ /**
* @return partition.
*/
@Override public int partition() {
@@ -556,84 +610,90 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
writer.incrementState();
case 11:
- if (!writer.writeLong("id", id))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 12:
- if (!writer.writeBoolean("incBackups", incBackups))
+ if (!writer.writeLong("id", id))
return false;
writer.incrementState();
case 13:
- if (!writer.writeBoolean("incMeta", incMeta))
+ if (!writer.writeBoolean("incBackups", incBackups))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("keepBinary", keepBinary))
+ if (!writer.writeBoolean("incMeta", incMeta))
return false;
writer.incrementState();
case 15:
- if (!writer.writeByteArray("keyValFilterBytes", keyValFilterBytes))
+ if (!writer.writeBoolean("keepBinary", keepBinary))
return false;
writer.incrementState();
case 16:
- if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
+ if (!writer.writeByteArray("keyValFilterBytes", keyValFilterBytes))
return false;
writer.incrementState();
case 17:
- if (!writer.writeInt("pageSize", pageSize))
+ if (!writer.writeMessage("mvccSnapshot", mvccSnapshot))
return false;
writer.incrementState();
case 18:
- if (!writer.writeInt("part", part))
+ if (!writer.writeInt("pageSize", pageSize))
return false;
writer.incrementState();
case 19:
- if (!writer.writeByteArray("rdcBytes", rdcBytes))
+ if (!writer.writeInt("part", part))
return false;
writer.incrementState();
case 20:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeByteArray("rdcBytes", rdcBytes))
return false;
writer.incrementState();
case 21:
- if (!writer.writeInt("taskHash", taskHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 22:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
+ if (!writer.writeInt("taskHash", taskHash))
return false;
writer.incrementState();
case 23:
- if (!writer.writeByteArray("transBytes", transBytes))
+ if (!writer.writeAffinityTopologyVersion("topVer", topVer))
return false;
writer.incrementState();
case 24:
+ if (!writer.writeByteArray("transBytes", transBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 25:
if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
return false;
@@ -712,7 +772,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 11:
- id = reader.readLong("id");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -720,7 +780,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 12:
- incBackups = reader.readBoolean("incBackups");
+ id = reader.readLong("id");
if (!reader.isLastRead())
return false;
@@ -728,7 +788,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 13:
- incMeta = reader.readBoolean("incMeta");
+ incBackups = reader.readBoolean("incBackups");
if (!reader.isLastRead())
return false;
@@ -736,7 +796,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 14:
- keepBinary = reader.readBoolean("keepBinary");
+ incMeta = reader.readBoolean("incMeta");
if (!reader.isLastRead())
return false;
@@ -744,7 +804,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 15:
- keyValFilterBytes = reader.readByteArray("keyValFilterBytes");
+ keepBinary = reader.readBoolean("keepBinary");
if (!reader.isLastRead())
return false;
@@ -752,7 +812,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 16:
- mvccSnapshot = reader.readMessage("mvccSnapshot");
+ keyValFilterBytes = reader.readByteArray("keyValFilterBytes");
if (!reader.isLastRead())
return false;
@@ -760,7 +820,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 17:
- pageSize = reader.readInt("pageSize");
+ mvccSnapshot = reader.readMessage("mvccSnapshot");
if (!reader.isLastRead())
return false;
@@ -768,7 +828,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 18:
- part = reader.readInt("part");
+ pageSize = reader.readInt("pageSize");
if (!reader.isLastRead())
return false;
@@ -776,7 +836,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 19:
- rdcBytes = reader.readByteArray("rdcBytes");
+ part = reader.readInt("part");
if (!reader.isLastRead())
return false;
@@ -784,7 +844,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 20:
- subjId = reader.readUuid("subjId");
+ rdcBytes = reader.readByteArray("rdcBytes");
if (!reader.isLastRead())
return false;
@@ -792,7 +852,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 21:
- taskHash = reader.readInt("taskHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -800,7 +860,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 22:
- topVer = reader.readAffinityTopologyVersion("topVer");
+ taskHash = reader.readInt("taskHash");
if (!reader.isLastRead())
return false;
@@ -808,7 +868,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 23:
- transBytes = reader.readByteArray("transBytes");
+ topVer = reader.readAffinityTopologyVersion("topVer");
if (!reader.isLastRead())
return false;
@@ -816,6 +876,14 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
reader.incrementState();
case 24:
+ transBytes = reader.readByteArray("transBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 25:
byte typeOrd;
typeOrd = reader.readByte("type");
@@ -839,7 +907,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 26;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index d688a52..5b93a7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -799,10 +799,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if (notifyExisting) {
assert locLsnr != null : "Local listener can't be null if notification for existing entries are enabled";
- final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(cctx.cacheId(),
+ final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(
+ cctx.cacheId(),
true,
true,
AffinityTopologyVersion.NONE,
+ null,
null);
locLsnr.onUpdated(new Iterable<CacheEntryEvent>() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index 7514798..7aa7068 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -49,6 +49,13 @@ public class CacheDataRowStore extends RowStore {
}
/**
+ * @return Partition Id.
+ */
+ public int getPartitionId() {
+ return partId;
+ }
+
+ /**
* @param cacheId Cache ID.
* @param hash Hash code.
* @param link Link.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index eba0e7f..4d01494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -19,36 +19,57 @@ package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataInnerIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataLeafIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataInnerIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccDataPageClosure;
import org.apache.ignite.internal.stat.IoStatisticsHolder;
import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
+import static org.apache.ignite.internal.util.GridArrays.clearTail;
/**
*
*/
public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
/** */
+ private static CacheDataRow[] EMPTY_ROWS = {};
+
+ /** */
+ private static Boolean lastFindWithDataPageScan;
+
+ /** */
+ private static final ThreadLocal<Boolean> dataPageScanEnabled =
+ ThreadLocal.withInitial(() -> false);
+
+ /** */
private final CacheDataRowStore rowStore;
/** */
@@ -93,6 +114,178 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
}
/**
+ * Enable or disable data page scan.
+ * @param enabled {code true} If enabled.
+ */
+ public static void setDataPageScanEnabled(boolean enabled) {
+ dataPageScanEnabled.set(enabled);
+ }
+
+ /**
+ * @return {@code true} If data page scan is enabled.
+ */
+ public static boolean isDataPageScanEnabled() {
+ return dataPageScanEnabled.get();
+ }
+
+ /**
+ * @return {@code true} If the last observed call to the method {@code find(...)} used data page scan.
+ */
+ public static Boolean isLastFindWithDataPageScan() {
+ Boolean res = lastFindWithDataPageScan;
+ lastFindWithDataPageScan = null;
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCursor<CacheDataRow> find(
+ CacheSearchRow lower,
+ CacheSearchRow upper,
+ TreeRowClosure<CacheSearchRow,CacheDataRow> c,
+ Object x
+ ) throws IgniteCheckedException {
+ // If there is a group of caches, lower and upper bounds will not be null here.
+ if (lower == null && upper == null && grp.persistenceEnabled() && dataPageScanEnabled.get() &&
+ (c == null || c instanceof MvccDataPageClosure))
+ return scanDataPages(asRowData(x), (MvccDataPageClosure)c);
+
+ lastFindWithDataPageScan = FALSE;
+ return super.find(lower, upper, c, x);
+ }
+
+ /**
+ * @param rowData Required row data.
+ * @param c Optional MVCC closure.
+ * @return Cache row cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ private GridCursor<CacheDataRow> scanDataPages(CacheDataRowAdapter.RowData rowData, MvccDataPageClosure c)
+ throws IgniteCheckedException {
+ lastFindWithDataPageScan = TRUE;
+
+ checkDestroyed();
+
+ assert rowData != null;
+ assert grp.persistenceEnabled();
+
+ int partId = rowStore.getPartitionId();
+ GridCacheSharedContext shared = grp.shared();
+ GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)shared.database();
+ PageStore pageStore = db.getPageStore(grpId, partId);
+ boolean mvccEnabled = grp.mvccEnabled();
+ int pageSize = pageSize();
+
+ long startPageId = ((PageMemoryEx)pageMem).partitionMetaPageId(grp.groupId(), partId);
+
+ final class DataPageScanCursor implements GridCursor<CacheDataRow> {
+ /** */
+ int pagesCnt = pageStore.pages();
+
+ /** */
+ int curPage = -1;
+
+ /** */
+ CacheDataRow[] rows = EMPTY_ROWS;
+
+ /** */
+ int curRow = -1;
+
+ /** {@inheritDoc} */
+ @Override public boolean next() throws IgniteCheckedException {
+ if (rows == null)
+ return false;
+
+ if (++curRow < rows.length && rows[curRow] != null)
+ return true;
+
+ return readNextDataPage();
+ }
+
+ /**
+ * @return {@code true} If new rows were fetched.
+ * @throws IgniteCheckedException If failed.
+ */
+ private boolean readNextDataPage() throws IgniteCheckedException {
+ for (;;) {
+ if (++curPage >= pagesCnt) {
+ // Reread number of pages when we reach it (it may grow).
+ int newPagesCnt = pageStore.pages();
+
+ if (newPagesCnt <= pagesCnt) {
+ rows = null;
+ return false;
+ }
+
+ pagesCnt = newPagesCnt;
+ }
+
+ long pageId = startPageId + curPage;
+ long page = pageMem.acquirePage(grpId, pageId);
+
+ try {
+ long pageAddr = ((PageMemoryEx)pageMem).readLock(page, pageId, true, false);
+
+ try {
+ if (PageIO.getType(pageAddr) != T_DATA)
+ continue; // Not a data page.
+
+ DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(pageAddr));
+
+ int rowsCnt = io.getRowsCount(pageAddr);
+
+ if (rowsCnt == 0)
+ continue; // Empty page.
+
+ if (rowsCnt > rows.length)
+ rows = new CacheDataRow[rowsCnt];
+ else
+ clearTail(rows, rowsCnt);
+
+ int r = 0;
+
+ for (int i = 0; i < rowsCnt; i++) {
+ if (c == null || c.applyMvcc(io, pageAddr, i, pageSize)) {
+ DataRow row = mvccEnabled ? new MvccDataRow() : new DataRow();
+ row.initFromDataPage(io, pageAddr, i, grp, shared, pageMem, rowData);
+ rows[r++] = row;
+ }
+ }
+
+ if (r == 0)
+ continue; // No rows fetched in this page.
+
+ curRow = 0;
+ return true;
+ }
+ finally {
+ pageMem.readUnlock(grpId, pageId, page);
+ }
+ }
+ finally{
+ pageMem.releasePage(grpId, pageId, page);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheDataRow get() {
+ return rows[curRow];
+ }
+ }
+
+ return new DataPageScanCursor();
+ }
+
+ /**
+ * @param flags Flags.
+ * @return Row data.
+ */
+ private static CacheDataRowAdapter.RowData asRowData(Object flags) {
+ return flags != null ? (CacheDataRowAdapter.RowData)flags :
+ CacheDataRowAdapter.RowData.FULL;
+ }
+
+ /**
* @param grp Cache group.
* @return Tree inner IO.
*/
@@ -177,8 +370,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
}
/** {@inheritDoc} */
- @Override public CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags)
- throws IgniteCheckedException {
+ @Override public CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags) {
RowLinkIO rowIo = (RowLinkIO)io;
long link = rowIo.getLink(pageAddr, idx);
@@ -186,9 +378,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
int cacheId = grp.sharedGroup() ? rowIo.getCacheId(pageAddr, idx) : CU.UNDEFINED_CACHE_ID;
- CacheDataRowAdapter.RowData x = flags != null ?
- (CacheDataRowAdapter.RowData)flags :
- CacheDataRowAdapter.RowData.FULL;
+ CacheDataRowAdapter.RowData x = asRowData(flags);
if (grp.mvccEnabled()) {
long mvccCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 806f030..ca6061b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -92,7 +92,7 @@ public class DataRow extends CacheDataRowAdapter {
/**
*
*/
- DataRow() {
+ public DataRow() {
super(0);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java
index bdd3166..cb4bc87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java
@@ -100,7 +100,8 @@ public class MvccDataRow extends DataRow {
RowData rowData,
long crdVer,
long mvccCntr,
- int mvccOpCntr) {
+ int mvccOpCntr
+ ) {
super(grp, hash, link, part, rowData);
assert MvccUtils.mvccVersionIsValid(crdVer, mvccCntr, mvccOpCntr);
@@ -118,6 +119,13 @@ public class MvccDataRow extends DataRow {
}
/**
+ *
+ */
+ public MvccDataRow() {
+ super(0);
+ }
+
+ /**
* @param key Key.
* @param val Value.
* @param ver Version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/search/MvccDataPageClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/search/MvccDataPageClosure.java
new file mode 100644
index 0000000..7b5816f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/search/MvccDataPageClosure.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree.mvcc.search;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+
+/**
+ * Data page MVCC filter.
+ */
+public interface MvccDataPageClosure {
+ /**
+ * @param io Data page IO.
+ * @param dataPageAddr Data page address.
+ * @param itemId Item Id.
+ * @param pageSize Page size.
+ * @return {@code true} If the row is visible.
+ * @throws IgniteCheckedException If failed.
+ */
+ boolean applyMvcc(DataPageIO io, long dataPageAddr, int itemId, int pageSize) throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index 97d34f0..19bb26d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -158,7 +158,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
}
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), collocated ? hdrPart : null, false, false);
+ new GridSetQueryPredicate<>(id, collocated), collocated ? hdrPart : null,
+ false, false, null);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
@@ -427,7 +428,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
@SuppressWarnings("unchecked")
private WeakReferenceCloseableIterator<T> sharedCacheIterator() throws IgniteCheckedException {
CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null,
- new GridSetQueryPredicate<>(id, collocated), collocated ? hdrPart : null, false, false);
+ new GridSetQueryPredicate<>(id, collocated), collocated ? hdrPart : null,
+ false, false, null);
Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
index 13ae9bf..6a40359 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
import org.jetbrains.annotations.Nullable;
/**
@@ -148,6 +149,12 @@ public interface GridQueryTypeDescriptor {
public int typeId();
/**
+ * @param val Value cache object.
+ * @return {@code true} If the type of the given value cache object matches this descriptor.
+ */
+ public boolean matchType(CacheObject val);
+
+ /**
* Gets key field name.
* @return Key field name.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
index 865c5df..d39ec37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -241,6 +242,18 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
return typeId;
}
+ /** {@inheritDoc} */
+ @Override public boolean matchType(CacheObject val) {
+ if (val instanceof BinaryObject)
+ return ((BinaryObject)val).type().typeId() == typeId;
+
+ // Value type name can be manually set in QueryEntity to any random value,
+ // also for some reason our conversion from setIndexedTypes sets a full class name
+ // instead of a simple name there, thus we can have a typeId mismatch.
+ // Also, if the type is not in binary format, we always must have it's class available.
+ return val.value(coCtx, false).getClass() == valCls;
+ }
+
/**
* @param typeId Type ID.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
index 10782c6..5b008e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -42,6 +42,9 @@ public class GridQueryNextPageRequest implements Message {
/** */
private int pageSize;
+ /** */
+ private byte flags;
+
/**
* Default constructor.
*/
@@ -54,12 +57,21 @@ public class GridQueryNextPageRequest implements Message {
* @param qry Query.
* @param segmentId Index segment ID.
* @param pageSize Page size.
+ * @param flags Flags.
*/
- public GridQueryNextPageRequest(long qryReqId, int qry, int segmentId, int pageSize) {
+ public GridQueryNextPageRequest(long qryReqId, int qry, int segmentId, int pageSize, byte flags) {
this.qryReqId = qryReqId;
this.qry = qry;
this.segmentId = segmentId;
this.pageSize = pageSize;
+ this.flags = flags;
+ }
+
+ /**
+ * @return Flags.
+ */
+ public byte getFlags() {
+ return flags;
}
/**
@@ -88,6 +100,8 @@ public class GridQueryNextPageRequest implements Message {
return pageSize;
}
+
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridQueryNextPageRequest.class, this);
@@ -111,24 +125,30 @@ public class GridQueryNextPageRequest implements Message {
switch (writer.state()) {
case 0:
- if (!writer.writeInt("pageSize", pageSize))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 1:
- if (!writer.writeInt("qry", qry))
+ if (!writer.writeInt("pageSize", pageSize))
return false;
writer.incrementState();
case 2:
- if (!writer.writeLong("qryReqId", qryReqId))
+ if (!writer.writeInt("qry", qry))
return false;
writer.incrementState();
case 3:
+ if (!writer.writeLong("qryReqId", qryReqId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
if (!writer.writeInt("segmentId", segmentId))
return false;
@@ -148,7 +168,7 @@ public class GridQueryNextPageRequest implements Message {
switch (reader.state()) {
case 0:
- pageSize = reader.readInt("pageSize");
+ flags = reader.readByte("flags");
if (!reader.isLastRead())
return false;
@@ -156,7 +176,7 @@ public class GridQueryNextPageRequest implements Message {
reader.incrementState();
case 1:
- qry = reader.readInt("qry");
+ pageSize = reader.readInt("pageSize");
if (!reader.isLastRead())
return false;
@@ -164,7 +184,7 @@ public class GridQueryNextPageRequest implements Message {
reader.incrementState();
case 2:
- qryReqId = reader.readLong("qryReqId");
+ qry = reader.readInt("qry");
if (!reader.isLastRead())
return false;
@@ -172,6 +192,14 @@ public class GridQueryNextPageRequest implements Message {
reader.incrementState();
case 3:
+ qryReqId = reader.readLong("qryReqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
segmentId = reader.readInt("segmentId");
if (!reader.isLastRead())
@@ -191,6 +219,6 @@ public class GridQueryNextPageRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 4;
+ return 5;
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index f5057cf..c3f5d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1461,7 +1461,7 @@ public class GridServiceProcessor extends ServiceProcessorAdapter implements Ign
GridCacheQueryManager qryMgr = cache.context().queries();
- CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
+ CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false, null);
DiscoveryDataClusterState clusterState = ctx.state().clusterState();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheDataPageScanQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheDataPageScanQueryTest.java
new file mode 100644
index 0000000..db41225
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/CacheDataPageScanQueryTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE;
+
+/**
+ */
+@RunWith(JUnit4.class)
+public class CacheDataPageScanQueryTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE = "test";
+
+ /** */
+ private static final int PARTS = 1;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(DFLT_DATA_REGION_INITIAL_SIZE)
+ ));
+
+ cfg.setCacheConfiguration(
+ new CacheConfiguration(CACHE)
+ .setAtomicityMode(ATOMIC)
+ .setAffinity(
+ new RendezvousAffinityFunction()
+ .setPartitions(PARTS)
+ )
+ );
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ @Test
+ public void testDataPageScanWithRestart() throws Exception {
+ IgniteEx ignite = startGrid(0);
+ ignite.cluster().active(true);
+
+ IgniteInternalCache<Long, String> cache = ignite.cachex(CACHE);
+ CacheGroupMetricsMXBean gmx = cache.context().group().mxBean();
+ DataRegionMetricsImpl rmx = cache.context().dataRegion().memoryMetrics();
+
+ long maxKey = 100_000;
+
+ Map<Long,String> map = new ConcurrentHashMap<>();
+
+ int threads = 16;
+ AtomicInteger threadShift = new AtomicInteger();
+
+ multithreaded((Callable<Void>)() -> {
+ int shift = threadShift.getAndIncrement();
+
+ for (int i = shift; i < maxKey; i += threads) {
+ Long k = (long)i;
+ String v = String.valueOf(i);
+
+ cache.put(k, v);
+ map.put(k, v);
+ }
+ return null;
+ }, threads);
+
+// forceCheckpoint(ignite);
+
+ assertEquals(map.size(), cache.size());
+
+ info("Page mem : " + rmx.getPhysicalMemorySize());
+ info("Alloc size: " + gmx.getTotalAllocatedSize());
+ info("Store size: " + gmx.getStorageSize());
+
+ HashMap<Long,String> map2 = new HashMap<>(map);
+
+ IgniteCache<Long,String> c = ignite.cache(CACHE);
+ for (Cache.Entry<Long,String> e : c.query(new ScanQuery<Long,String>().setDataPageScanEnabled(true)).getAll())
+ assertEquals(e.getValue(), map.remove(e.getKey()));
+
+ assertTrue(map.isEmpty());
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+
+ stopAllGrids(true);
+
+ ignite = startGrid(0);
+ ignite.cluster().active(true);
+
+ c = ignite.cache(CACHE);
+ for (Cache.Entry<Long,String> e : c.query(new ScanQuery<Long,String>().setDataPageScanEnabled(true)).getAll())
+ assertEquals(e.getValue(), map2.remove(e.getKey()));
+
+ assertTrue(map2.isEmpty());
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+ }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 920c0f0..5873bd7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -456,7 +456,7 @@ public class DmlStatementsProcessor {
if (!F.isEmpty(plan.selectQuery())) {
GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
- null, false, false, 0, null);
+ null, false, false, 0, null, null);
it = res.iterator();
}
@@ -568,7 +568,8 @@ public class DmlStatementsProcessor {
.setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
.setLocal(fieldsQry.isLocal())
.setPageSize(fieldsQry.getPageSize())
- .setTimeout((int)timeout, TimeUnit.MILLISECONDS);
+ .setTimeout((int)timeout, TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
FieldsQueryCursor<List<?>> cur = idx.querySqlFields(schemaName, newFieldsQry, null,
true, true, mvccTracker(cctx, tx), cancel, false).get(0);
@@ -597,6 +598,9 @@ public class DmlStatementsProcessor {
if (distributedPlan.isReplicatedOnly())
flags |= GridH2QueryRequest.FLAG_REPLICATED;
+ flags = GridH2QueryRequest.setDataPageScanEnabled(flags,
+ fieldsQry.isDataPageScanEnabled());
+
int[] parts = fieldsQry.getPartitions();
IgniteInternalFuture<Long> fut = tx.updateAsync(
@@ -656,7 +660,8 @@ public class DmlStatementsProcessor {
.setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
.setLocal(fieldsQry.isLocal())
.setPageSize(fieldsQry.getPageSize())
- .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
+ .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(fieldsQry.isDataPageScanEnabled());
cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true,
null, cancel, false).get(0);
@@ -666,7 +671,7 @@ public class DmlStatementsProcessor {
else {
final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(),
F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), false, fieldsQry.getTimeout(),
- cancel);
+ cancel, null);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
@@ -1176,7 +1181,8 @@ public class DmlStatementsProcessor {
.setEnforceJoinOrder(qry.isEnforceJoinOrder())
.setLocal(qry.isLocal())
.setPageSize(qry.getPageSize())
- .setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
+ .setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS)
+ .setDataPageScanEnabled(qry.isDataPageScanEnabled());
cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schema, newFieldsQry, null, true, true,
new StaticMvccQueryTracker(cctx, mvccSnapshot), cancel, false).get(0);
@@ -1184,7 +1190,7 @@ public class DmlStatementsProcessor {
else {
final GridQueryFieldsResult res = idx.queryLocalSqlFields(schema, plan.selectQuery(),
F.asList(qry.getArgs()), filter, qry.isEnforceJoinOrder(), false, qry.getTimeout(), cancel,
- new StaticMvccQueryTracker(cctx, mvccSnapshot));
+ new StaticMvccQueryTracker(cctx, mvccSnapshot), null);
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
index 5a0cc44..0358c98 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2TableDescriptor.java
@@ -437,7 +437,8 @@ public class H2TableDescriptor implements GridH2SystemIndexFactory {
if (cacheInfo.affinityNode()) {
assert pkHashIdx == null : pkHashIdx;
- pkHashIdx = new H2PkHashIndex(cacheInfo.cacheContext(), tbl, PK_HASH_IDX_NAME, cols);
+ pkHashIdx = new H2PkHashIndex(cacheInfo.cacheContext(), tbl, PK_HASH_IDX_NAME, cols,
+ tbl.rowDescriptor().context().config().getQueryParallelism());
return pkHashIdx;
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 5e00aea..a926c23 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.cache.query.RegisteredQueryCursor;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
@@ -172,6 +173,7 @@ import org.h2.util.JdbcUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import static java.lang.Boolean.FALSE;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkActive;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
@@ -183,6 +185,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
/**
* Indexing implementation based on H2 database engine. In this implementation main query language is SQL,
@@ -487,13 +490,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param startTx Start transaction flag.
* @param timeout Query timeout in milliseconds.
* @param cancel Query cancel.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
- public GridQueryFieldsResult queryLocalSqlFields(String schemaName, String qry, @Nullable Collection<Object> params,
- IndexingQueryFilter filter, boolean enforceJoinOrder, boolean startTx, int timeout,
- GridQueryCancel cancel) throws IgniteCheckedException {
- return queryLocalSqlFields(schemaName, qry, params, filter, enforceJoinOrder, startTx, timeout, cancel, null);
+ public GridQueryFieldsResult queryLocalSqlFields(
+ String schemaName,
+ String qry,
+ @Nullable Collection<Object> params,
+ IndexingQueryFilter filter,
+ boolean enforceJoinOrder,
+ boolean startTx,
+ int timeout,
+ GridQueryCancel cancel,
+ Boolean dataPageScanEnabled
+ ) throws IgniteCheckedException {
+ return queryLocalSqlFields(schemaName, qry, params, filter, enforceJoinOrder, startTx, timeout, cancel, null, dataPageScanEnabled);
}
/**
@@ -508,14 +520,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param qryTimeout Query timeout in milliseconds.
* @param cancel Query cancel.
* @param mvccTracker Query tracker.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
- GridQueryFieldsResult queryLocalSqlFields(final String schemaName, String qry,
- @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
- boolean startTx, int qryTimeout, final GridQueryCancel cancel,
- MvccQueryTracker mvccTracker) throws IgniteCheckedException {
-
+ GridQueryFieldsResult queryLocalSqlFields(
+ final String schemaName,
+ String qry,
+ @Nullable final Collection<Object> params,
+ final IndexingQueryFilter filter,
+ boolean enforceJoinOrder,
+ boolean startTx,
+ int qryTimeout,
+ final GridQueryCancel cancel,
+ MvccQueryTracker mvccTracker,
+ Boolean dataPageScanEnabled
+ ) throws IgniteCheckedException {
GridNearTxLocal tx = null;
boolean mvccEnabled = mvccEnabled(kernalContext());
@@ -542,6 +562,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setEnforceJoinOrder(enforceJoinOrder);
fldsQry.setTimeout(qryTimeout, TimeUnit.MILLISECONDS);
+ fldsQry.setDataPageScanEnabled(dataPageScanEnabled);
return dmlProc.updateSqlFieldsLocal(schemaName, conn, p, fldsQry, filter, cancel);
}
@@ -643,7 +664,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn = connMgr.detachThreadConnection();
try {
- ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, qry0, params, timeout0, cancel);
+ ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, qry0, params, timeout0, cancel, dataPageScanEnabled);
if (sfuFut0 != null) {
assert tx0.mvccSnapshot() != null;
@@ -911,13 +932,28 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param params Parameters.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
- public ResultSet executeSqlQueryWithTimer(Connection conn, String sql, @Nullable Collection<Object> params,
- int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
+ public ResultSet executeSqlQueryWithTimer(
+ Connection conn,
+ String sql,
+ @Nullable Collection<Object> params,
+ int timeoutMillis,
+ @Nullable GridQueryCancel cancel,
+ Boolean dataPageScanEnabled
+ ) throws IgniteCheckedException {
return executeSqlQueryWithTimer(preparedStatementWithParams(conn, sql, params, false),
- conn, sql, params, timeoutMillis, cancel);
+ conn, sql, params, timeoutMillis, cancel, dataPageScanEnabled);
+ }
+
+ /**
+ * @param dataPageScanEnabled If data page scan is enabled.
+ */
+ public void enableDataPageScan(Boolean dataPageScanEnabled) {
+ // Data page scan is enabled by default for SQL.
+ CacheDataTree.setDataPageScanEnabled(dataPageScanEnabled != FALSE);
}
/**
@@ -929,14 +965,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param params Parameters.
* @param timeoutMillis Query timeout.
* @param cancel Query cancel.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
- public ResultSet executeSqlQueryWithTimer(PreparedStatement stmt, Connection conn, String sql,
- @Nullable Collection<Object> params, int timeoutMillis, @Nullable GridQueryCancel cancel)
- throws IgniteCheckedException {
+ public ResultSet executeSqlQueryWithTimer(
+ PreparedStatement stmt,
+ Connection conn,
+ String sql,
+ @Nullable Collection<Object> params,
+ int timeoutMillis,
+ @Nullable GridQueryCancel cancel,
+ Boolean dataPageScanEnabled
+ ) throws IgniteCheckedException {
long start = U.currentTimeMillis();
+ enableDataPageScan(dataPageScanEnabled);
+
try {
ResultSet rs = executeSqlQuery(conn, stmt, timeoutMillis, cancel);
@@ -965,6 +1010,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
throw new IgniteCheckedException(e);
}
+ finally {
+ CacheDataTree.setDataPageScanEnabled(false);
+ }
}
/**
@@ -994,7 +1042,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
int timeout = qry.getTimeout();
final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, params, filter,
- enforceJoinOrder, startTx, timeout, cancel);
+ enforceJoinOrder, startTx, timeout, cancel, qry.isDataPageScanEnabled());
Iterable<List<?>> iter = () -> {
try {
@@ -1101,7 +1149,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
stmtEx.putMeta(MVCC_STATE, Boolean.TRUE);
}
else
- stmtEx.putMeta(MVCC_STATE, Boolean.FALSE);
+ stmtEx.putMeta(MVCC_STATE, FALSE);
}
return mvccEnabled;
@@ -1119,6 +1167,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param parts Partitions.
* @param lazy Lazy query execution flag.
* @param mvccTracker Query tracker.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return Iterable result.
*/
private Iterable<List<?>> runQueryTwoStep(
@@ -1132,7 +1181,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final Object[] params,
final int[] parts,
final boolean lazy,
- MvccQueryTracker mvccTracker) {
+ MvccQueryTracker mvccTracker,
+ Boolean dataPageScanEnabled
+ ) {
assert !qry.mvccEnabled() || !F.isEmpty(qry.cacheIds());
try {
@@ -1151,7 +1202,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
@Override public Iterator<List<?>> iterator() {
try {
return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, opTimeout,
- cancel, params, parts, lazy, tracker);
+ cancel, params, parts, lazy, tracker, dataPageScanEnabled);
}
catch (Throwable e) {
if (tracker != null)
@@ -1220,6 +1271,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
res.setReplicatedOnly(qry.isReplicatedOnly());
res.setSchema(schemaName);
res.setSql(sql);
+ res.setDataPageScanEnabled(qry.isDataPageScanEnabled());
if (qry.getTimeout() > 0)
res.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
@@ -1591,7 +1643,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
* @return Query result.
*/
- @SuppressWarnings("unchecked")
private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared,
SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
boolean startTx, MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
@@ -1944,6 +1995,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
fldsQry.setPageSize(pageSize);
fldsQry.setLocal(true);
+ fldsQry.setDataPageScanEnabled(isDataPageScanEnabled(flags));
boolean loc = true;
@@ -2045,7 +2097,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
qry.getArgs(),
parts,
qry.isLazy(),
- mvccTracker
+ mvccTracker,
+ qry.isDataPageScanEnabled()
);
QueryCursorImpl<List<?>> cursor = registerAsNewQry
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index 9a42362..1add933 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
@@ -58,18 +59,26 @@ public class H2PkHashIndex extends GridH2IndexBase {
/** */
private final GridCacheContext cctx;
+ /** */
+ private final int segments;
+
/**
* @param cctx Cache context.
* @param tbl Table.
* @param name Index name.
* @param colsList Index columns.
+ * @param segments Segments.
*/
public H2PkHashIndex(
GridCacheContext<?, ?> cctx,
GridH2Table tbl,
String name,
- List<IndexColumn> colsList
+ List<IndexColumn> colsList,
+ int segments
) {
+ assert segments > 0: segments;
+
+ this.segments = segments;
IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
@@ -83,7 +92,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
/** {@inheritDoc} */
@Override public int segmentsCount() {
- return 1;
+ return segments;
}
/** {@inheritDoc} */
@@ -93,10 +102,13 @@ public class H2PkHashIndex extends GridH2IndexBase {
GridH2QueryContext qctx = GridH2QueryContext.get();
+ int seg = 0;
+
if (qctx != null) {
IndexingQueryFilter f = qctx.filter();
filter = f != null ? f.forCache(getTable().cacheName()) : null;
mvccSnapshot = qctx.mvccSnapshot();
+ seg = qctx.segment();
}
assert !cctx.mvccEnabled() || mvccSnapshot != null;
@@ -107,11 +119,17 @@ public class H2PkHashIndex extends GridH2IndexBase {
try {
Collection<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>();
- for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores())
- if (filter == null || filter.applyPartition(store.partId()))
+ for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores()) {
+ int part = store.partId();
+
+ if (segmentForPartition(part) != seg)
+ continue;
+
+ if (filter == null || filter.applyPartition(part))
cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj, null, mvccSnapshot));
+ }
- return new H2Cursor(cursors.iterator());
+ return new H2PkHashIndexCursor(cursors.iterator());
}
catch (IgniteCheckedException e) {
throw DbException.convert(e);
@@ -191,7 +209,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
/**
* Cursor.
*/
- private class H2Cursor implements Cursor {
+ private class H2PkHashIndexCursor implements Cursor {
/** */
private final GridH2RowDescriptor desc;
@@ -204,7 +222,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
/**
* @param iter Cursors iterator.
*/
- private H2Cursor(Iterator<GridCursor<? extends CacheDataRow>> iter) {
+ private H2PkHashIndexCursor(Iterator<GridCursor<? extends CacheDataRow>> iter) {
assert iter != null;
this.iter = iter;
@@ -230,17 +248,23 @@ public class H2PkHashIndex extends GridH2IndexBase {
/** {@inheritDoc} */
@Override public boolean next() {
try {
- if (curr != null && curr.next())
- return true;
+ GridQueryTypeDescriptor type = desc.type();
- while (iter.hasNext()) {
- curr = iter.next();
+ for (;;) {
+ if (curr != null) {
+ while (curr.next()) {
+ // Need to filter rows by value type because in a single cache
+ // we can have multiple indexed types.
+ if (type.matchType(curr.get().value()))
+ return true;
+ }
+ }
- if (curr.next())
- return true;
- }
+ if (!iter.hasNext())
+ return false;
- return false;
+ curr = iter.next();
+ }
}
catch (IgniteCheckedException e) {
throw DbException.convert(e);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java
index 99d0894..ca662ac 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeFilterClosure.java
@@ -24,6 +24,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccDataPageClosure;
import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
@@ -39,7 +41,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccVer
/**
*
*/
-public class H2TreeFilterClosure implements H2Tree.TreeRowClosure<GridH2SearchRow, GridH2Row> {
+public class H2TreeFilterClosure implements H2Tree.TreeRowClosure<GridH2SearchRow, GridH2Row>, MvccDataPageClosure {
/** */
private final MvccSnapshot mvccSnapshot;
@@ -105,6 +107,24 @@ public class H2TreeFilterClosure implements H2Tree.TreeRowClosure<GridH2SearchRo
return isVisible(cctx, mvccSnapshot, rowCrdVer, rowCntr, rowOpCntr, io.getLink(pageAddr, idx));
}
catch (IgniteTxMvccVersionCheckedException e) {
+ // TODO this catch must not be needed if we switch Vacuum to data page scan
+ // We expect the active tx state can be observed by read tx only in the cases when tx has been aborted
+ // asynchronously and node hasn't received finish message yet but coordinator has already removed it from
+ // the active txs map. Rows written by this tx are invisible to anyone and will be removed by the vacuum.
+ if (log.isDebugEnabled())
+ log.debug( "Unexpected tx state on index lookup. " + X.getFullStackTrace(e));
+
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean applyMvcc(DataPageIO io, long dataPageAddr, int itemId, int pageSize) throws IgniteCheckedException {
+ try {
+ return isVisible(cctx, mvccSnapshot, io, dataPageAddr, itemId, pageSize);
+ }
+ catch (IgniteTxMvccVersionCheckedException e) {
+ // TODO this catch must not be needed if we switch Vacuum to data page scan
// We expect the active tx state can be observed by read tx only in the cases when tx has been aborted
// asynchronously and node hasn't received finish message yet but coordinator has already removed it from
// the active txs map. Rows written by this tx are invisible to anyone and will be removed by the vacuum.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PrimaryScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PrimaryScanIndex.java
index d10a79b..753b73b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PrimaryScanIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PrimaryScanIndex.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.h2.opt;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.h2.engine.Session;
import org.h2.result.SortOrder;
import org.h2.table.Column;
@@ -56,8 +57,10 @@ public class GridH2PrimaryScanIndex extends GridH2ScanIndex<GridH2IndexBase> {
@Override protected GridH2IndexBase delegate() {
boolean rebuildFromHashInProgress = tbl.rebuildFromHashInProgress();
- if (hashIdx != null)
- return rebuildFromHashInProgress ? hashIdx : super.delegate();
+ if (hashIdx != null) {
+ return rebuildFromHashInProgress || CacheDataTree.isDataPageScanEnabled() ?
+ hashIdx : super.delegate();
+ }
else {
assert !rebuildFromHashInProgress;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 4c3b2ce..9625898 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -115,6 +115,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.isDataPageScanEnabled;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
/**
@@ -570,6 +571,7 @@ public class GridMapQueryExecutor {
final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
final boolean lazy = (FORCE_LAZY && req.queries().size() == 1) || req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+ final Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
final List<Integer> cacheIds = req.caches();
@@ -668,7 +670,8 @@ public class GridMapQueryExecutor {
tx,
txReq,
lockFut,
- runCntr);
+ runCntr,
+ dataPageScanEnabled);
}
else {
ctx.closure().callLocal(
@@ -694,7 +697,8 @@ public class GridMapQueryExecutor {
tx,
txReq,
lockFut,
- runCntr);
+ runCntr,
+ dataPageScanEnabled);
return null;
}
@@ -722,7 +726,9 @@ public class GridMapQueryExecutor {
req.mvccSnapshot(),
tx,
txReq,
- lockFut, runCntr);
+ lockFut,
+ runCntr,
+ dataPageScanEnabled);
}
/**
@@ -743,6 +749,7 @@ public class GridMapQueryExecutor {
* @param txDetails TX details, if it's a {@code FOR UPDATE} request, or {@code null}.
* @param lockFut Lock future.
* @param runCntr Counter which counts remaining queries in case segmented index is used.
+ * @param dataPageScanEnabled If data page scan is enabled.
*/
private void onQueryRequest0(
final ClusterNode node,
@@ -765,7 +772,9 @@ public class GridMapQueryExecutor {
@Nullable final GridDhtTxLocalAdapter tx,
@Nullable final GridH2SelectForUpdateTxDetails txDetails,
@Nullable final CompoundLockFuture lockFut,
- @Nullable final AtomicInteger runCntr) {
+ @Nullable final AtomicInteger runCntr,
+ Boolean dataPageScanEnabled
+ ) {
MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
// In presence of TX, we also must always have matching details.
@@ -801,7 +810,8 @@ public class GridMapQueryExecutor {
tx,
txDetails,
lockFut,
- runCntr);
+ runCntr,
+ dataPageScanEnabled);
}
});
@@ -928,7 +938,7 @@ public class GridMapQueryExecutor {
int opTimeout = IgniteH2Indexing.operationTimeout(timeout, tx);
- rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx));
+ rs = h2.executeSqlQueryWithTimer(stmt, conn, sql, params0, opTimeout, qr.queryCancel(qryIdx), dataPageScanEnabled);
if (inTx) {
ResultSetEnlistFuture enlistFut = ResultSetEnlistFuture.future(
@@ -991,9 +1001,9 @@ public class GridMapQueryExecutor {
// Send the first page.
if (lockFut == null)
- sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
+ sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping, dataPageScanEnabled);
else {
- GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping);
+ GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize, removeMapping, dataPageScanEnabled);
if (msg != null) {
lockFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@@ -1130,6 +1140,7 @@ public class GridMapQueryExecutor {
fldsQry.setTimeout(req.timeout(), TimeUnit.MILLISECONDS);
fldsQry.setPageSize(req.pageSize());
fldsQry.setLocal(true);
+ fldsQry.setDataPageScanEnabled(req.isDataPageScanEnabled());
boolean local = true;
@@ -1263,17 +1274,19 @@ public class GridMapQueryExecutor {
else if (qr.cancelled())
sendError(node, req.queryRequestId(), new QueryCancelledException());
else {
+ Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags());
+
MapQueryLazyWorker lazyWorker = qr.lazyWorker();
if (lazyWorker != null) {
lazyWorker.submit(new Runnable() {
@Override public void run() {
- sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false, dataPageScanEnabled);
}
});
}
else
- sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false);
+ sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize(), false, dataPageScanEnabled);
}
}
@@ -1285,11 +1298,12 @@ public class GridMapQueryExecutor {
* @param segmentId Index segment ID.
* @param pageSize Page size.
* @param removeMapping Remove mapping flag.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return Next page.
* @throws IgniteCheckedException If failed.
*/
private GridQueryNextPageResponse prepareNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
- int pageSize, boolean removeMapping) throws IgniteCheckedException {
+ int pageSize, boolean removeMapping, Boolean dataPageScanEnabled) throws IgniteCheckedException {
MapQueryResult res = qr.result(qry);
assert res != null;
@@ -1301,7 +1315,7 @@ public class GridMapQueryExecutor {
List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
- boolean last = res.fetchNextPage(rows, pageSize);
+ boolean last = res.fetchNextPage(rows, pageSize, dataPageScanEnabled);
if (last) {
res.close();
@@ -1341,11 +1355,13 @@ public class GridMapQueryExecutor {
* @param segmentId Index segment ID.
* @param pageSize Page size.
* @param removeMapping Remove mapping flag.
+ * @param dataPageScanEnabled If data page scan is enabled.
*/
private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId,
- int pageSize, boolean removeMapping) {
+ int pageSize, boolean removeMapping, Boolean dataPageScanEnabled) {
try {
- GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping);
+ GridQueryNextPageResponse msg = prepareNextPage(nodeRess, node, qr, qry, segmentId, pageSize, removeMapping,
+ dataPageScanEnabled);
if (msg != null) {
if (node.isLocal())
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 15788f2..41d1f61 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -110,6 +110,7 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuer
import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest.setDataPageScanEnabled;
/**
* Reduce query executor.
@@ -322,7 +323,8 @@ public class GridReduceQueryExecutor {
}
try {
- GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize);
+ GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize,
+ (byte)setDataPageScanEnabled(0, r.isDataPageScanEnabled()));
if (node.isLocal())
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
@@ -383,6 +385,7 @@ public class GridReduceQueryExecutor {
* @param parts Partitions.
* @param lazy Lazy execution flag.
* @param mvccTracker Query tracker.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return Rows iterator.
*/
public Iterator<List<?>> query(
@@ -395,7 +398,9 @@ public class GridReduceQueryExecutor {
Object[] params,
int[] parts,
boolean lazy,
- MvccQueryTracker mvccTracker) {
+ MvccQueryTracker mvccTracker,
+ Boolean dataPageScanEnabled
+ ) {
if (qry.isLocal() && parts != null)
parts = null;
@@ -480,7 +485,7 @@ public class GridReduceQueryExecutor {
long qryReqId = qryIdGen.incrementAndGet();
final ReduceQueryRun r = new ReduceQueryRun(h2.connections().connectionForThread().connection(schemaName),
- qry.mapQueries().size(), qry.pageSize(), sfuFut);
+ qry.mapQueries().size(), qry.pageSize(), sfuFut, dataPageScanEnabled);
Collection<ClusterNode> nodes;
@@ -647,6 +652,8 @@ public class GridReduceQueryExecutor {
if (lazy && mapQrys.size() == 1)
flags |= GridH2QueryRequest.FLAG_LAZY;
+ flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
+
GridH2QueryRequest req = new GridH2QueryRequest()
.requestId(qryReqId)
.topologyVersion(topVer)
@@ -761,7 +768,8 @@ public class GridReduceQueryExecutor {
rdc.query(),
F.asList(rdc.parameters(params)),
timeoutMillis,
- cancel);
+ cancel,
+ dataPageScanEnabled);
resIter = new H2FieldsIterator(res, mvccTracker, false, null);
@@ -1122,7 +1130,7 @@ public class GridReduceQueryExecutor {
for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) {
ResultSet rs =
- h2.executeSqlQueryWithTimer(c, "SELECT PLAN FROM " + mergeTableIdentifier(i), null, 0, null);
+ h2.executeSqlQueryWithTimer(c, "SELECT PLAN FROM " + mergeTableIdentifier(i), null, 0, null, null);
lists.add(F.asList(getPlan(rs)));
}
@@ -1141,6 +1149,7 @@ public class GridReduceQueryExecutor {
"EXPLAIN " + rdc.query(),
F.asList(rdc.parameters(params)),
0,
+ null,
null);
lists.add(F.asList(getPlan(rs)));
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index fb928c4..c823023 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.internal.util.typedef.F;
@@ -171,9 +172,10 @@ class MapQueryResult {
/**
* @param rows Collection to fetch into.
* @param pageSize Page size.
+ * @param dataPageScanEnabled If data page scan is enabled.
* @return {@code true} If there are no more rows available.
*/
- synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+ synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize, Boolean dataPageScanEnabled) {
assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
if (closed)
@@ -183,63 +185,70 @@ class MapQueryResult {
page++;
- for (int i = 0 ; i < pageSize; i++) {
- if (!res.next())
- return true;
+ h2.enableDataPageScan(dataPageScanEnabled);
- Value[] row = res.currentRow();
+ try {
+ for (int i = 0; i < pageSize; i++) {
+ if (!res.next())
+ return true;
+
+ Value[] row = res.currentRow();
- if (cpNeeded) {
- boolean copied = false;
+ if (cpNeeded) {
+ boolean copied = false;
- for (int j = 0; j < row.length; j++) {
- Value val = row[j];
+ for (int j = 0; j < row.length; j++) {
+ Value val = row[j];
- if (val instanceof GridH2ValueCacheObject) {
- GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val;
+ if (val instanceof GridH2ValueCacheObject) {
+ GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val;
- row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) {
- @Override public Object getObject() {
- return getObject(true);
- }
- };
+ row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) {
+ @Override public Object getObject() {
+ return getObject(true);
+ }
+ };
- copied = true;
+ copied = true;
+ }
}
+
+ if (i == 0 && !copied)
+ cpNeeded = false; // No copy on read caches, skip next checks.
}
- if (i == 0 && !copied)
- cpNeeded = false; // No copy on read caches, skip next checks.
- }
+ assert row != null;
+
+ if (readEvt) {
+ GridKernalContext ctx = h2.kernalContext();
+
+ ctx.event().record(new CacheQueryReadEvent<>(
+ ctx.discovery().localNode(),
+ "SQL fields query result set row read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SQL.name(),
+ cctx.name(),
+ null,
+ qry.query(),
+ null,
+ null,
+ params,
+ qrySrcNodeId,
+ null,
+ null,
+ null,
+ null,
+ row(row)));
+ }
- assert row != null;
-
- if (readEvt) {
- GridKernalContext ctx = h2.kernalContext();
-
- ctx.event().record(new CacheQueryReadEvent<>(
- ctx.discovery().localNode(),
- "SQL fields query result set row read.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.SQL.name(),
- cctx.name(),
- null,
- qry.query(),
- null,
- null,
- params,
- qrySrcNodeId,
- null,
- null,
- null,
- null,
- row(row)));
+ rows.add(res.currentRow());
}
- rows.add(res.currentRow());
+ return !res.hasNext();
+ }
+ finally {
+ CacheDataTree.setDataPageScanEnabled(false);
}
-
- return !res.hasNext();
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index b488bc3..d2f7e9a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -48,6 +48,9 @@ class ReduceQueryRun {
private final int pageSize;
/** */
+ private final Boolean dataPageScanEnabled;
+
+ /** */
private final AtomicReference<State> state = new AtomicReference<>();
/** Future controlling {@code SELECT FOR UPDATE} query execution. */
@@ -59,17 +62,27 @@ class ReduceQueryRun {
* @param idxsCnt Number of indexes.
* @param pageSize Page size.
* @param selectForUpdateFut Future controlling {@code SELECT FOR UPDATE} query execution.
- */
- ReduceQueryRun(Connection conn, int idxsCnt, int pageSize,
- GridNearTxSelectForUpdateFuture selectForUpdateFut) {
-
+ * @param dataPageScanEnabled If data page scan is enabled.
+ */
+ ReduceQueryRun(
+ Connection conn,
+ int idxsCnt,
+ int pageSize,
+ GridNearTxSelectForUpdateFuture selectForUpdateFut,
+ Boolean dataPageScanEnabled
+ ) {
this.conn = (JdbcConnection)conn;
-
this.idxs = new ArrayList<>(idxsCnt);
-
this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
-
this.selectForUpdateFut = selectForUpdateFut;
+ this.dataPageScanEnabled = dataPageScanEnabled;
+ }
+
+ /**
+ * @return {@code true} If data page scan is enabled.
+ */
+ public Boolean isDataPageScanEnabled() {
+ return dataPageScanEnabled;
}
/**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
index e40bc2d..6602d46 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
@@ -254,6 +254,15 @@ public class GridH2DmlRequest implements Message, GridCacheQueryMarshallable {
}
/**
+ * Checks if data page scan enabled.
+ *
+ * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ */
+ public Boolean isDataPageScanEnabled() {
+ return GridH2QueryRequest.isDataPageScanEnabled(flags);
+ }
+
+ /**
* @return Timeout.
*/
public int timeout() {
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index cca366a..93e27f0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -88,6 +88,22 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
public static final int FLAG_LAZY = 1 << 5;
/** */
+ private static final int FLAG_DATA_PAGE_SCAN_SHIFT = 6;
+
+ /** */
+ private static final int FLAG_DATA_PAGE_SCAN_MASK = 0b11 << FLAG_DATA_PAGE_SCAN_SHIFT;
+
+ /** */
+ @SuppressWarnings("PointlessBitwiseExpression")
+ private static final int FLAG_DATA_PAGE_SCAN_DFLT = 0b00 << FLAG_DATA_PAGE_SCAN_SHIFT;
+
+ /** */
+ private static final int FLAG_DATA_PAGE_SCAN_ENABLED = 0b01 << FLAG_DATA_PAGE_SCAN_SHIFT;
+
+ /** */
+ private static final int FLAG_DATA_PAGE_SCAN_DISABLED = 0b10 << FLAG_DATA_PAGE_SCAN_SHIFT;
+
+ /** */
private long reqId;
/** */
@@ -416,6 +432,48 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
this.txReq = txDetails;
}
+ /**
+ * @param flags Flags.
+ * @param dataPageScanEnabled {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ * @return Updated flags.
+ */
+ public static int setDataPageScanEnabled(int flags, Boolean dataPageScanEnabled) {
+ int x = dataPageScanEnabled == null ? FLAG_DATA_PAGE_SCAN_DFLT :
+ dataPageScanEnabled ? FLAG_DATA_PAGE_SCAN_ENABLED : FLAG_DATA_PAGE_SCAN_DISABLED;
+
+ flags &= ~FLAG_DATA_PAGE_SCAN_MASK; // Clear old bits.
+ flags |= x; // Set new bits.
+
+ return flags;
+ }
+
+ /**
+ * Checks if data page scan enabled.
+ *
+ * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ */
+ public Boolean isDataPageScanEnabled() {
+ return isDataPageScanEnabled(flags);
+ }
+
+ /**
+ * Checks if data page scan enabled.
+ *
+ * @param flags Flags.
+ * @return {@code true} If data page scan enabled, {@code false} if not, and {@code null} if not set.
+ */
+ public static Boolean isDataPageScanEnabled(int flags) {
+ switch (flags & FLAG_DATA_PAGE_SCAN_MASK) {
+ case FLAG_DATA_PAGE_SCAN_ENABLED:
+ return true;
+
+ case FLAG_DATA_PAGE_SCAN_DISABLED:
+ return false;
+ }
+
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public void marshall(Marshaller m) {
if (paramsBytes != null)
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2RowCacheSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2RowCacheSelfTest.java
index 26ff6d0..af235b9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2RowCacheSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2RowCacheSelfTest.java
@@ -176,12 +176,14 @@ public class H2RowCacheSelfTest extends AbstractIndexingCommonTest {
assertEquals(0, rowCache.size());
// Warmup cache.
- cache.query(new SqlFieldsQuery("SELECT * FROM Value")).getAll();
+ cache.query(new SqlFieldsQuery("SELECT * FROM Value")
+ .setDataPageScanEnabled(false)).getAll();
assertEquals(maxSize / 2, rowCache.size());
// Query again - are there any leaks?
- cache.query(new SqlFieldsQuery("SELECT * FROM Value")).getAll();
+ cache.query(new SqlFieldsQuery("SELECT * FROM Value")
+ .setDataPageScanEnabled(false)).getAll();
assertEquals(maxSize / 2, rowCache.size());
@@ -191,7 +193,8 @@ public class H2RowCacheSelfTest extends AbstractIndexingCommonTest {
assertEquals(maxSize / 2, rowCache.size());
- cache.query(new SqlFieldsQuery("SELECT * FROM Value")).getAll();
+ cache.query(new SqlFieldsQuery("SELECT * FROM Value")
+ .setDataPageScanEnabled(false)).getAll();
assertEquals(maxSize, rowCache.size());
@@ -201,16 +204,16 @@ public class H2RowCacheSelfTest extends AbstractIndexingCommonTest {
assertEquals(maxSize, rowCache.size());
- cache.query(new SqlFieldsQuery("SELECT * FROM Value")).getAll();
+ cache.query(new SqlFieldsQuery("SELECT * FROM Value").setDataPageScanEnabled(false)).getAll();
assertEquals(maxSize, rowCache.size());
// Delete all.
- cache.query(new SqlFieldsQuery("DELETE FROM Value")).getAll();
+ cache.query(new SqlFieldsQuery("DELETE FROM Value").setDataPageScanEnabled(false)).getAll();
assertEquals(0, rowCache.size());
- cache.query(new SqlFieldsQuery("SELECT * FROM Value")).getAll();
+ cache.query(new SqlFieldsQuery("SELECT * FROM Value").setDataPageScanEnabled(false)).getAll();
assertEquals(0, rowCache.size());
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 2c7eb62..56d600a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -159,7 +159,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends AbstractIndexingCo
range *= 3;
GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.<Object>asList(1,
- range), null, false, false, 0, null);
+ range), null, false, false, 0, null, null);
assert res.iterator().hasNext();
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
new file mode 100644
index 0000000..14fec0a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static java.lang.Boolean.FALSE;
+
+/**
+ */
+@RunWith(JUnit4.class)
+public class QueryDataPageScanTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(true)));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMultipleIndexedTypes() throws Exception {
+ final String cacheName = "test_multi_type";
+
+ IgniteEx server = startGrid(0);
+ server.cluster().active(true);
+
+ CacheConfiguration<Object,Object> ccfg = new CacheConfiguration<>(cacheName);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 1));
+ ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+ ccfg.setIndexedTypes(
+ Integer.class, Integer.class,
+ Long.class, String.class,
+ Long.class, TestData.class
+ );
+ ccfg.setQueryEntities(
+ Arrays.asList(
+ new QueryEntity()
+ .setValueType(UUID.class.getName())
+ .setKeyType(Integer.class.getName())
+ .setTableName("Uuids"),
+ new QueryEntity()
+ .setValueType(Person.class.getName())
+ .setKeyType(Integer.class.getName())
+ .setTableName("My_Persons")
+ .setFields(Person.getFields())
+ )
+ );
+
+ IgniteCache<Object,Object> cache = server.createCache(ccfg);
+
+ cache.put(1L, "bla-bla");
+ cache.put(2L, new TestData(777L));
+ cache.put(3, 3);
+ cache.put(7, UUID.randomUUID());
+ cache.put(9, new Person("Vasya", 99));
+
+ CacheDataTree.isLastFindWithDataPageScan();
+
+ List<List<?>> res = cache.query(new SqlFieldsQuery("select z, _key, _val from TestData use index()")
+ .setDataPageScanEnabled(true)).getAll();
+ assertEquals(1, res.size());
+ assertEquals(777L, res.get(0).get(0));
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+
+ res = cache.query(new SqlFieldsQuery("select _val, _key from String use index()")
+ .setDataPageScanEnabled(true)).getAll();
+ assertEquals(1, res.size());
+ assertEquals("bla-bla", res.get(0).get(0));
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+
+ res = cache.query(new SqlFieldsQuery("select _key, _val from Integer use index()")
+ .setDataPageScanEnabled(true)).getAll();
+ assertEquals(1, res.size());
+ assertEquals(3, res.get(0).get(0));
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+
+ res = cache.query(new SqlFieldsQuery("select _key, _val from uuids use index()")
+ .setDataPageScanEnabled(true)).getAll();
+ assertEquals(1, res.size());
+ assertEquals(7, res.get(0).get(0));
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+
+ res = cache.query(new SqlFieldsQuery("select age, name from my_persons use index()")
+ .setDataPageScanEnabled(true)).getAll();
+ assertEquals(1, res.size());
+ assertEquals(99, res.get(0).get(0));
+ assertEquals("Vasya", res.get(0).get(1));
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConcurrentUpdatesWithMvcc() throws Exception {
+ doTestConcurrentUpdates(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testConcurrentUpdatesNoMvcc() throws Exception {
+ try {
+ doTestConcurrentUpdates(false);
+
+ throw new IllegalStateException("Expected to detect data inconsistency.");
+ }
+ catch (AssertionError e) {
+ assertTrue(e.getMessage().startsWith("wrong sum!"));
+ }
+ }
+
+ private void doTestConcurrentUpdates(boolean enableMvcc) throws Exception {
+ final String cacheName = "test_updates";
+
+ IgniteEx server = startGrid(0);
+ server.cluster().active(true);
+
+ CacheConfiguration<Long,Long> ccfg = new CacheConfiguration<>(cacheName);
+ ccfg.setIndexedTypes(Long.class, Long.class);
+ ccfg.setAtomicityMode(enableMvcc ?
+ CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT :
+ CacheAtomicityMode.TRANSACTIONAL);
+
+ IgniteCache<Long,Long> cache = server.createCache(ccfg);
+
+ long accounts = 100;
+ long initialBalance = 100;
+
+ for (long i = 0; i < accounts; i++)
+ cache.put(i, initialBalance);
+
+ assertEquals(accounts * initialBalance,((Number)
+ cache.query(new SqlFieldsQuery("select sum(_val) from Long use index()")
+ .setDataPageScanEnabled(true)).getAll().get(0).get(0)).longValue());
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+
+ AtomicBoolean cancel = new AtomicBoolean();
+
+ IgniteInternalFuture<?> updFut = multithreadedAsync(() -> {
+ Random rnd = ThreadLocalRandom.current();
+
+ while (!cancel.get() && !Thread.interrupted()) {
+ long accountId1 = rnd.nextInt((int)accounts);
+ long accountId2 = rnd.nextInt((int)accounts);
+
+ if (accountId1 == accountId2)
+ continue;
+
+ // Sort to avoid MVCC deadlock.
+ if (accountId1 > accountId2) {
+ long tmp = accountId1;
+ accountId1 = accountId2;
+ accountId2 = tmp;
+ }
+
+ try (
+ Transaction tx = server.transactions().txStart()
+ ) {
+ long balance1 = cache.get(accountId1);
+ long balance2 = cache.get(accountId2);
+
+ if (balance1 <= balance2) {
+ if (balance2 == 0)
+ continue; // balance1 is 0 as well here
+
+ long transfer = rnd.nextInt((int)balance2);
+
+ if (transfer == 0)
+ transfer = balance2;
+
+ balance1 += transfer;
+ balance2 -= transfer;
+ }
+ else {
+ long transfer = rnd.nextInt((int)balance1);
+
+ if (transfer == 0)
+ transfer = balance1;
+
+ balance1 -= transfer;
+ balance2 += transfer;
+ }
+
+ cache.put(accountId1, balance1);
+ cache.put(accountId2, balance2);
+
+ tx.commit();
+ }
+ catch (CacheException e) {
+ if (!e.getMessage().contains(
+ "Cannot serialize transaction due to write conflict (transaction is marked for rollback)"))
+ throw new IllegalStateException(e);
+// else
+// U.warn(log, "Failed to commit TX, will ignore!");
+ }
+ }
+ }, 16, "updater");
+
+ IgniteInternalFuture<?> qryFut = multithreadedAsync(() -> {
+ while (!cancel.get() && !Thread.interrupted()) {
+ assertEquals("wrong sum!", accounts * initialBalance, ((Number)
+ cache.query(new SqlFieldsQuery("select sum(_val) from Long use index()")
+ .setDataPageScanEnabled(true)).getAll().get(0).get(0)).longValue());
+// info("query ok!");
+ }
+ }, 2, "query");
+
+ qryFut.listen((f) -> cancel.set(true));
+ updFut.listen((f) -> cancel.set(true));
+
+ long start = U.currentTimeMillis();
+
+ while (!cancel.get() && U.currentTimeMillis() - start < 15_000)
+ doSleep(100);
+
+ cancel.set(true);
+
+ qryFut.get(3000);
+ updFut.get(1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDataPageScan() throws Exception {
+ final String cacheName = "test";
+
+ GridQueryProcessor.idxCls = DirectPageScanIndexing.class;
+ IgniteEx server = startGrid(0);
+ server.cluster().active(true);
+
+ Ignition.setClientMode(true);
+ IgniteEx client = startGrid(1);
+
+ CacheConfiguration<Long,TestData> ccfg = new CacheConfiguration<>(cacheName);
+ ccfg.setIndexedTypes(Long.class, TestData.class);
+ ccfg.setSqlFunctionClasses(QueryDataPageScanTest.class);
+
+ IgniteCache<Long,TestData> clientCache = client.createCache(ccfg);
+
+ final int keysCnt = 1000;
+
+ for (long i = 0; i < keysCnt; i++)
+ clientCache.put(i, new TestData(i));
+
+ IgniteCache<Long,TestData> serverCache = server.cache(cacheName);
+
+ doTestScanQuery(clientCache, keysCnt);
+ doTestScanQuery(serverCache, keysCnt);
+
+ doTestSqlQuery(clientCache);
+ doTestSqlQuery(serverCache);
+
+ doTestDml(clientCache);
+ doTestDml(serverCache);
+
+ doTestLazySql(clientCache, keysCnt);
+ doTestLazySql(serverCache, keysCnt);
+ }
+
+ private void doTestLazySql(IgniteCache<Long,TestData> cache, int keysCnt) {
+ checkLazySql(cache, false, keysCnt);
+ checkLazySql(cache, true, keysCnt);
+ checkLazySql(cache, null, keysCnt);
+ }
+
+ private void checkLazySql(IgniteCache<Long,TestData> cache, Boolean dataPageScanEnabled, int keysCnt) {
+ CacheDataTree.isLastFindWithDataPageScan();
+
+ DirectPageScanIndexing.expectedDataPageScanEnabled = dataPageScanEnabled;
+
+ final int expNestedLoops = 5;
+
+ try (
+ FieldsQueryCursor<List<?>> cursor = cache.query(
+ new SqlFieldsQuery(
+ "select 1 " +
+ "from TestData a use index(), TestData b use index() " +
+ "where a.z between ? and ? " +
+ "and check_scan_flag(?,true)")
+ .setLazy(true)
+ .setDataPageScanEnabled(DirectPageScanIndexing.expectedDataPageScanEnabled)
+ .setArgs(1, expNestedLoops, DirectPageScanIndexing.expectedDataPageScanEnabled)
+ .setPageSize(keysCnt / 2) // Must be less than keysCnt.
+ )
+ ) {
+ int nestedLoops = 0;
+ int rowCnt = 0;
+
+ for (List<?> row : cursor) {
+ if (dataPageScanEnabled == FALSE)
+ assertNull(CacheDataTree.isLastFindWithDataPageScan()); // HashIndex was never used.
+ else {
+ Boolean x = CacheDataTree.isLastFindWithDataPageScan();
+
+ if (x != null) {
+ assertTrue(x);
+ nestedLoops++;
+ }
+ }
+
+ rowCnt++;
+ }
+
+ assertEquals(keysCnt * expNestedLoops, rowCnt);
+ assertEquals(dataPageScanEnabled == FALSE ? 0 : expNestedLoops, nestedLoops);
+ }
+ }
+
+ private void doTestDml(IgniteCache<Long,TestData> cache) {
+ // SQL query (data page scan must be enabled by default).
+ DirectPageScanIndexing.callsCnt.set(0);
+ int callsCnt = 0;
+
+ checkDml(cache, null);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+
+ checkDml(cache, true);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+
+ checkDml(cache, false);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+
+ checkDml(cache, null);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+ }
+
+ private void checkDml(IgniteCache<Long,TestData> cache, Boolean dataPageScanEnabled) {
+ DirectPageScanIndexing.expectedDataPageScanEnabled = dataPageScanEnabled;
+
+ assertEquals(0L, cache.query(new SqlFieldsQuery(
+ "update TestData set z = z + 1 where check_scan_flag(?,false)")
+ .setDataPageScanEnabled(DirectPageScanIndexing.expectedDataPageScanEnabled)
+ .setArgs(DirectPageScanIndexing.expectedDataPageScanEnabled)
+ ).getAll().get(0).get(0));
+
+ checkSqlLastFindDataPageScan(dataPageScanEnabled);
+ }
+
+ private void checkSqlLastFindDataPageScan(Boolean dataPageScanEnabled) {
+ if (dataPageScanEnabled == FALSE)
+ assertNull(CacheDataTree.isLastFindWithDataPageScan()); // HashIdx was not used.
+ else
+ assertTrue(CacheDataTree.isLastFindWithDataPageScan());
+ }
+
+ private void doTestSqlQuery(IgniteCache<Long,TestData> cache) {
+ // SQL query (data page scan must be enabled by default).
+ DirectPageScanIndexing.callsCnt.set(0);
+ int callsCnt = 0;
+
+ checkSqlQuery(cache, null);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+
+ checkSqlQuery(cache, true);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+
+ checkSqlQuery(cache, false);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+
+ checkSqlQuery(cache, null);
+ assertEquals(++callsCnt, DirectPageScanIndexing.callsCnt.get());
+ }
+
+ private void checkSqlQuery(IgniteCache<Long,TestData> cache, Boolean dataPageScanEnabled) {
+ DirectPageScanIndexing.expectedDataPageScanEnabled = dataPageScanEnabled;
+
+ assertTrue(cache.query(new SqlQuery<>(TestData.class,
+ "from TestData use index() where check_scan_flag(?,false)") // Force full scan with USE INDEX()
+ .setArgs(DirectPageScanIndexing.expectedDataPageScanEnabled)
+ .setDataPageScanEnabled(DirectPageScanIndexing.expectedDataPageScanEnabled))
+ .getAll().isEmpty());
+
+ checkSqlLastFindDataPageScan(dataPageScanEnabled);
+ }
+
+ private void doTestScanQuery(IgniteCache<Long,TestData> cache, int keysCnt) {
+ // Scan query (data page scan must be disabled by default).
+ TestPredicate.callsCnt.set(0);
+ int callsCnt = 0;
+
+ assertTrue(cache.query(new ScanQuery<>(new TestPredicate())).getAll().isEmpty());
+ assertFalse(CacheDataTree.isLastFindWithDataPageScan());
+ assertEquals(callsCnt += keysCnt, TestPredicate.callsCnt.get());
+
+ checkScanQuery(cache, true, true);
+ assertEquals(callsCnt += keysCnt, TestPredicate.callsCnt.get());
+
+ checkScanQuery(cache, false, false);
+ assertEquals(callsCnt += keysCnt, TestPredicate.callsCnt.get());
+
+ checkScanQuery(cache, true, true);
+ assertEquals(callsCnt += keysCnt, TestPredicate.callsCnt.get());
+
+ checkScanQuery(cache, null, false);
+ assertEquals(callsCnt += keysCnt, TestPredicate.callsCnt.get());
+ }
+
+ private void checkScanQuery(IgniteCache<Long,TestData> cache, Boolean dataPageScanEnabled, Boolean expLastDataPageScan) {
+ assertTrue(cache.query(new ScanQuery<>(new TestPredicate())
+ .setDataPageScanEnabled(dataPageScanEnabled)).getAll().isEmpty());
+ assertEquals(expLastDataPageScan, CacheDataTree.isLastFindWithDataPageScan());
+ }
+
+ /**
+ * @param exp Expected flag value.
+ * @param res Result to return.
+ * @return The given result..
+ */
+ @QuerySqlFunction(alias = "check_scan_flag")
+ public static boolean checkScanFlagFromSql(Boolean exp, boolean res) {
+ assertEquals(exp != FALSE, CacheDataTree.isDataPageScanEnabled());
+
+ return res;
+ }
+
+ /**
+ */
+ static class DirectPageScanIndexing extends IgniteH2Indexing {
+ /** */
+ static volatile Boolean expectedDataPageScanEnabled;
+
+ /** */
+ static final AtomicInteger callsCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public ResultSet executeSqlQueryWithTimer(
+ PreparedStatement stmt,
+ Connection conn,
+ String sql,
+ @Nullable Collection<Object> params,
+ int timeoutMillis,
+ @Nullable GridQueryCancel cancel,
+ Boolean dataPageScanEnabled
+ ) throws IgniteCheckedException {
+ callsCnt.incrementAndGet();
+ assertEquals(expectedDataPageScanEnabled, dataPageScanEnabled);
+
+ return super.executeSqlQueryWithTimer(stmt, conn, sql, params, timeoutMillis,
+ cancel, dataPageScanEnabled);
+ }
+ }
+
+ /**
+ */
+ static class TestPredicate implements IgniteBiPredicate<Long,TestData> {
+ /** */
+ static final AtomicInteger callsCnt = new AtomicInteger();
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(Long k, TestData v) {
+ callsCnt.incrementAndGet();
+ return false;
+ }
+ }
+
+ /**
+ */
+ static class TestData implements Serializable {
+ /** */
+ static final long serialVersionUID = 42L;
+
+ /** */
+ @QuerySqlField
+ long z;
+
+ /**
+ */
+ TestData(long z) {
+ this.z = z;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TestData testData = (TestData)o;
+
+ return z == testData.z;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(z ^ (z >>> 32));
+ }
+ }
+
+ /**
+ * Externalizable class to make it non-binary.
+ */
+ static class Person implements Externalizable {
+ String name;
+ int age;
+
+ public Person() {
+ // No-op
+ }
+
+ Person(String name, int age) {
+ this.name = Objects.requireNonNull(name);
+ this.age = age;
+ }
+
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(name);
+ out.writeInt(age);
+ }
+
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ name = in.readUTF();
+ age = in.readInt();
+ }
+
+ static LinkedHashMap<String,String> getFields() {
+ LinkedHashMap<String,String> m = new LinkedHashMap<>();
+
+ m.put("age", "INT");
+ m.put("name", "VARCHAR");
+
+ return m;
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 5ffd7fa..9a40309 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -162,6 +162,7 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQueryCancelOrTimeoutSelfTest;
import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.CacheDataPageScanQueryTest;
import org.apache.ignite.internal.processors.cache.query.CacheScanQueryFailoverTest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryTransformerSelfTest;
import org.apache.ignite.internal.processors.cache.query.GridCircularQueueTest;
@@ -206,6 +207,7 @@ import org.apache.ignite.internal.processors.query.h2.H2StatementCacheSelfTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
import org.apache.ignite.internal.processors.query.h2.PreparedStatementExSelfTest;
+import org.apache.ignite.internal.processors.query.h2.QueryDataPageScanTest;
import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPoolSelfTest;
import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
import org.apache.ignite.internal.processors.query.h2.sql.ExplainSelfTest;
@@ -350,6 +352,8 @@ import org.junit.runners.Suite;
IgniteCrossCachesJoinsQueryTest.class,
IgniteCacheMultipleIndexedTypesTest.class,
+ CacheDataPageScanQueryTest.class,
+ QueryDataPageScanTest.class,
// DML.
IgniteCacheMergeSqlQuerySelfTest.class,