You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2023/02/16 19:47:44 UTC
[ignite] branch master updated: IGNITE-18721 Disable WAL for index partition during full rebuild (#10533)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 904eeb769b6 IGNITE-18721 Disable WAL for index partition during full rebuild (#10533)
904eeb769b6 is described below
commit 904eeb769b6aa0934dbe7796a2d0833898be56ed
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Feb 16 22:47:35 2023 +0300
IGNITE-18721 Disable WAL for index partition during full rebuild (#10533)
---
.../sorted/maintenance/RebuildIndexAction.java | 2 +-
.../managers/indexing/IndexesRebuildTask.java | 43 ++-
.../pagemem/store/IgnitePageStoreManager.java | 4 +-
.../pagemem/wal/IgniteWriteAheadLogManager.java | 3 +-
.../processors/cache/CacheGroupContext.java | 15 +
.../internal/processors/cache/WalStateManager.java | 8 +-
.../persistence/file/FilePageStoreManager.java | 46 ++-
.../cache/persistence/pagemem/PageMemoryImpl.java | 4 +-
.../cache/persistence/tree/util/PageHandler.java | 2 +-
.../persistence/wal/FileWriteAheadLogManager.java | 4 +-
.../processors/query/GridQueryProcessor.java | 21 +-
.../query/aware/IndexBuildStatusHolder.java | 18 +-
.../query/aware/IndexBuildStatusStorage.java | 38 +-
.../query/aware/IndexRebuildCacheInfo.java | 23 +-
.../management/SortedIndexDescriptorFactory.java | 5 +-
.../cache/persistence/pagemem/NoOpWALManager.java | 2 +-
.../WalDisabledDuringIndexRecreateTest.java | 381 +++++++++++++++++++++
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 2 +
18 files changed, 584 insertions(+), 37 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/maintenance/RebuildIndexAction.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/maintenance/RebuildIndexAction.java
index 5bd696805d3..255e1c7dab2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/maintenance/RebuildIndexAction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/maintenance/RebuildIndexAction.java
@@ -172,7 +172,7 @@ public class RebuildIndexAction implements MaintenanceAction<Boolean> {
/** {@inheritDoc} */
@Override public void visit(SchemaIndexCacheVisitorClosure clo) {
// Rebuild index after it is created.
- storage.onStartRebuildIndexes(cctx);
+ storage.onStartRebuildIndexes(cctx, false);
try {
super.visit(clo);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
index 898de50d033..85046fbe4b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
@@ -68,7 +68,9 @@ public class IndexesRebuildTask {
String cacheName = cctx.name();
- if (pageStore == null || !pageStore.hasIndexStore(cctx.groupId())) {
+ boolean recreate = pageStore == null || !pageStore.hasIndexStore(cctx.groupId());
+
+ if (recreate) {
boolean mvccEnabled = cctx.mvccEnabled();
// If there are no index store, rebuild all indexes.
@@ -86,13 +88,24 @@ public class IndexesRebuildTask {
// Closure prepared, do rebuild.
cctx.kernalContext().query().markAsRebuildNeeded(cctx, true);
+ IgniteLogger log = cctx.kernalContext().grid().log();
+
+ if (recreate) {
+ cctx.kernalContext().query().markIndexRecreate(cctx);
+
+ cctx.group().indexWalEnabled(false);
+
+ if (log.isInfoEnabled()) {
+ log.info("WAL disabled for index partition " +
+ "[name=" + cctx.group().name() + ", id=" + cctx.groupId() + ']');
+ }
+ }
+
GridFutureAdapter<Void> rebuildCacheIdxFut = new GridFutureAdapter<>();
// To avoid possible data race.
GridFutureAdapter<Void> outRebuildCacheIdxFut = new GridFutureAdapter<>();
- IgniteLogger log = cctx.kernalContext().grid().log();
-
// An internal future for the ability to cancel index rebuilding.
SchemaIndexCacheFuture intRebFut = new SchemaIndexCacheFuture(cancelTok);
@@ -101,7 +114,7 @@ public class IndexesRebuildTask {
// Check that the previous rebuild is completed.
assert prevIntRebFut == null;
- cctx.kernalContext().query().onStartRebuildIndexes(cctx);
+ cctx.kernalContext().query().onStartRebuildIndexes(cctx, recreate);
rebuildCacheIdxFut.listen(fut -> {
Throwable err = fut.error();
@@ -121,6 +134,28 @@ public class IndexesRebuildTask {
cctx.kernalContext().query().onFinishRebuildIndexes(cctx);
idxRebuildFuts.remove(cctx.cacheId(), intRebFut);
+
+ if (recreate) {
+ boolean recreateContinues = false;
+
+ for (GridCacheContext<?, ?> cctx0 : cctx.group().caches()) {
+ if (idxRebuildFuts.containsKey(cctx0.cacheId())) {
+ recreateContinues = true;
+
+ break;
+ }
+ }
+
+ if (!recreateContinues) {
+ cctx.group().indexWalEnabled(true);
+
+ if (log.isInfoEnabled()) {
+ log.info("WAL enabled for index partition " +
+ "[name=" + cctx.group().name() + ", id=" + cctx.group().groupId() + ']');
+ }
+ }
+ }
+
intRebFut.onDone(err);
outRebuildCacheIdxFut.onDone(err);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 957ff0b58ac..a87a46a6b45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -48,11 +48,11 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
*
* @param cacheId Cache id.
* @param partitions Partitions count.
- * @param workingDir Working directory.
+ * @param cacheName Cache name.
* @param pageMetrics Page metrics.
* @throws IgniteCheckedException If failed.
*/
- public void initialize(int cacheId, int partitions, String workingDir, PageMetrics pageMetrics)
+ public void initialize(int cacheId, int partitions, String cacheName, PageMetrics pageMetrics)
throws IgniteCheckedException;
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 98d99d613cc..a2e1d481129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -209,8 +209,9 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
* Checks WAL disabled for cache group.
*
* @param grpId Group id.
+ * @param pageId Page id.
*/
- public boolean disabled(int grpId);
+ public boolean disabled(int grpId, long pageId);
/**
* Getting local WAL segment size.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 28dc32fe20f..38b9bd1ef15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -37,6 +37,7 @@ import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsHolderCache;
import org.apache.ignite.internal.metric.IoStatisticsHolderIndex;
@@ -176,6 +177,9 @@ public class CacheGroupContext {
/** */
private volatile boolean globalWalEnabled;
+ /** @see IndexesRebuildTask */
+ private volatile boolean idxWalEnabled;
+
/** Flag indicates that cache group is under recovering and not attached to topology. */
private final AtomicBoolean recoveryMode;
@@ -248,6 +252,7 @@ public class CacheGroupContext {
this.globalWalEnabled = walEnabled;
this.persistenceEnabled = persistenceEnabled;
this.localWalEnabled = true;
+ this.idxWalEnabled = true;
this.recoveryMode = new AtomicBoolean(recoveryMode);
this.compressHandler = compressHandler;
@@ -1235,6 +1240,16 @@ public class CacheGroupContext {
return globalWalEnabled;
}
+ /** @return {@code True} if WAL for index operations is enabled. */
+ public boolean indexWalEnabled() {
+ return idxWalEnabled;
+ }
+
+ /** @param idxWalEnabled Index WAL enabled flag. */
+ public void indexWalEnabled(boolean idxWalEnabled) {
+ this.idxWalEnabled = idxWalEnabled;
+ }
+
/**
* @param enabled Global WAL enabled flag.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 75c093a34a7..1b7a104f074 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -65,6 +66,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED;
import static org.apache.ignite.internal.GridTopic.TOPIC_WAL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
@@ -1013,12 +1015,14 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
* Checks WAL disabled for cache group.
*
* @param grpId Group id.
+ * @param pageId Page id.
* @return {@code True} if WAL disable for group. {@code False} If not.
*/
- public boolean isDisabled(int grpId) {
+ public boolean isDisabled(int grpId, long pageId) {
CacheGroupContext ctx = cctx.cache().cacheGroup(grpId);
- return ctx != null && !ctx.walEnabled();
+ return ctx != null && (!ctx.walEnabled()
+ || (!ctx.indexWalEnabled() && PageIdUtils.partId(pageId) == INDEX_PARTITION));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index b4fbc232033..726321d4e74 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.store.PageStoreCollection;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMetri
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManagerImpl;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridStripedReadWriteLock;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -437,17 +439,23 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/** {@inheritDoc} */
- @Override public void initialize(int cacheId, int partitions, String workingDir, PageMetrics pageMetrics)
+ @Override public void initialize(int cacheId, int partitions, String cacheName, PageMetrics pageMetrics)
throws IgniteCheckedException {
assert storeWorkDir != null;
if (!idxCacheStores.containsKey(cacheId)) {
+ GridCacheContext<?, ?> cctx = this.cctx.cacheContext(cacheId);
+
CacheStoreHolder holder = initDir(
- new File(storeWorkDir, workingDir),
+ new File(storeWorkDir, cacheName),
cacheId,
+ cacheName,
partitions,
pageMetrics,
- cctx.cacheContext(cacheId) != null && cctx.cacheContext(cacheId).config().isEncryptionEnabled()
+ cctx != null && cctx.config().isEncryptionEnabled(),
+ cctx != null
+ ? cctx.group().caches().stream().map(GridCacheContext::name).collect(Collectors.toSet())
+ : null
);
CacheStoreHolder old = idxCacheStores.put(cacheId, holder);
@@ -484,9 +492,11 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
CacheStoreHolder holder = initDir(
new File(storeWorkDir, MetaStorage.METASTORAGE_DIR_NAME),
grpId,
+ MetaStorage.METASTORAGE_CACHE_NAME,
MetaStorage.METASTORAGE_PARTITIONS.size(),
pageMetrics,
- false);
+ false,
+ null);
CacheStoreHolder old = idxCacheStores.put(grpId, holder);
@@ -586,9 +596,11 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
return initDir(
cacheWorkDir,
grpDesc.groupId(),
+ ccfg.getName(),
grpDesc.config().getAffinity().partitions(),
pageMetrics,
- ccfg.isEncryptionEnabled()
+ ccfg.isEncryptionEnabled(),
+ grpDesc.caches().keySet()
);
}
@@ -657,6 +669,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/**
* @param cacheWorkDir Work directory.
* @param grpId Group ID.
+ * @param cacheName Cache name.
* @param partitions Number of partitions.
* @param pageMetrics Page metrics.
* @param encrypted {@code True} if this cache encrypted.
@@ -665,9 +678,11 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
*/
private CacheStoreHolder initDir(File cacheWorkDir,
int grpId,
+ String cacheName,
int partitions,
PageMetrics pageMetrics,
- boolean encrypted) throws IgniteCheckedException {
+ boolean encrypted,
+ Collection<String> grpCaches) throws IgniteCheckedException {
try {
boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir, log);
@@ -680,6 +695,25 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME);
+ GridQueryProcessor qryProc = cctx.kernalContext().query();
+
+ if (qryProc.moduleEnabled()) {
+ boolean idxRecreating = grpCaches == null
+ ? !qryProc.recreateCompleted(cacheName)
+ : grpCaches.stream().anyMatch(name -> !qryProc.recreateCompleted(name));
+
+ if (idxFile.exists() && idxRecreating) {
+ log.warning("Recreate of index.bin don't finish before node stop, index.bin can be inconsistent. " +
+ "Removing it to recreate one more time [grpId=" + grpId + ", cacheName=" + cacheName + ']');
+
+ if (!idxFile.delete()) {
+ throw new IgniteCheckedException(
+ "Failed to remove index.bin [grpId=" + grpId + ", cacheName=" + cacheName + ']'
+ );
+ }
+ }
+ }
+
if (dirExisted && !idxFile.exists())
grpsWithoutIdx.add(grpId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index d10372b1691..ef2ac4336eb 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -621,7 +621,7 @@ public class PageMemoryImpl implements PageMemoryEx {
trackingIO.initNewPage(pageAddr, pageId, realPageSize(grpId), metrics);
- if (!ctx.wal().disabled(fullId.groupId())) {
+ if (!ctx.wal().disabled(fullId.groupId(), fullId.pageId())) {
if (!ctx.wal().isAlwaysWriteFullPages())
ctx.wal().log(
new InitNewPageRecord(
@@ -1893,7 +1893,7 @@ public class PageMemoryImpl implements PageMemoryEx {
*
*/
void beforeReleaseWrite(FullPageId pageId, long ptr, boolean pageWalRec) throws IgniteCheckedException {
- boolean walIsNotDisabled = walMgr != null && !walMgr.disabled(pageId.groupId());
+ boolean walIsNotDisabled = walMgr != null && !walMgr.disabled(pageId.groupId(), pageId.pageId());
boolean pageRecOrAlwaysWriteFullPage = walMgr != null && (pageWalRec || walMgr.isAlwaysWriteFullPages());
if (pageRecOrAlwaysWriteFullPage && walIsNotDisabled)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
index fe651ddd870..5fb36db276a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
@@ -490,7 +490,7 @@ public abstract class PageHandler<X, R> {
@Nullable Boolean walPlc) {
// If the page is clean, then it is either newly allocated or just after checkpoint.
// In both cases we have to write full page contents to WAL.
- return wal != null && !wal.isAlwaysWriteFullPages() && walPlc != TRUE && !wal.disabled(cacheId) &&
+ return wal != null && !wal.isAlwaysWriteFullPages() && walPlc != TRUE && !wal.disabled(cacheId, pageId) &&
(walPlc == FALSE || pageMem.isDirty(cacheId, pageId, page));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 1c07f5e279c..3b98be3688e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1256,8 +1256,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
/** {@inheritDoc} */
- @Override public boolean disabled(int grpId) {
- return cctx.walState().isDisabled(grpId);
+ @Override public boolean disabled(int grpId, long pageId) {
+ return cctx.walState().isDisabled(grpId, pageId);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 0f57a3a3e56..7556fdc17d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -4311,11 +4311,20 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* the rebuilding of the indexes has been {@link #rebuildIndexesCompleted}.
*
* @param cacheCtx Cache context.
+ * @param recreate {@code True} if index.bin recreating.
* @see #onFinishRebuildIndexes
* @see #rebuildIndexesCompleted
*/
- public void onStartRebuildIndexes(GridCacheContext cacheCtx) {
- idxBuildStatusStorage.onStartRebuildIndexes(cacheCtx);
+ public void onStartRebuildIndexes(GridCacheContext cacheCtx, boolean recreate) {
+ idxBuildStatusStorage.onStartRebuildIndexes(cacheCtx, recreate);
+ }
+
+ /**
+ * Marks that index.bin recreation is in progress.
+ * @param cacheCtx Cache context.
+ */
+ public void markIndexRecreate(GridCacheContext cacheCtx) {
+ idxBuildStatusStorage.markIndexRecreate(cacheCtx);
}
/**
@@ -4341,6 +4350,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
return idxBuildStatusStorage.rebuildCompleted(cacheCtx.name());
}
+ /**
+ * @param cacheName Cache name.
+ * @return {@code True} if index.bin recreating completed.
+ */
+ public boolean recreateCompleted(String cacheName) {
+ return idxBuildStatusStorage.recreateCompleted(cacheName);
+ }
+
/**
* Force a mark that the index rebuild for the cache has completed.
* <p/>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusHolder.java
index 732aa3c10db..221e6b5c3d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusHolder.java
@@ -54,6 +54,9 @@ public class IndexBuildStatusHolder {
/** Rebuilding indexes. Guarded by {@code this}. */
private boolean rebuild;
+ /** {@code True} if index.bin is being recreated, {@code false} otherwise. */
+ private boolean recreate;
+
/** Count of new indexes being built. Guarded by {@code this}. */
private int newIdx;
@@ -62,22 +65,26 @@ public class IndexBuildStatusHolder {
*
* @param persistent Persistent cache.
* @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ * @param recreate {@code True} if index.bin is being recreated, {@code false} otherwise.
*/
- public IndexBuildStatusHolder(boolean persistent, boolean rebuild) {
+ public IndexBuildStatusHolder(boolean persistent, boolean rebuild, boolean recreate) {
this.persistent = persistent;
- onStartOperation(rebuild);
+ onStartOperation(rebuild, recreate);
}
/**
* Callback on the start of of the operation.
*
* @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ * @param recreate {@code True} if index.bin recreating, {@code false} otherwise.
* @see #onFinishOperation
*/
- public synchronized void onStartOperation(boolean rebuild) {
+ public synchronized void onStartOperation(boolean rebuild, boolean recreate) {
status = INIT;
+ this.recreate = recreate;
+
if (rebuild)
this.rebuild = true;
else {
@@ -149,6 +156,11 @@ public class IndexBuildStatusHolder {
return rebuild;
}
+ /** @return {@code True} if index.bin is being recreated, {@code false} otherwise. */
+ public synchronized boolean recreate() {
+ return recreate;
+ }
+
/**
* Getting the count of new indexes that are currently being built.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusStorage.java
index 469ece55f16..d48a79c2ab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexBuildStatusStorage.java
@@ -107,10 +107,11 @@ public class IndexBuildStatusStorage implements MetastorageLifecycleListener, Ch
* the indexes are automatically rebuilt.
*
* @param cacheCtx Cache context.
+ * @param recreate {@code True} if index.bin is being recreated, {@code false} otherwise.
* @see #onFinishRebuildIndexes
*/
- public void onStartRebuildIndexes(GridCacheContext cacheCtx) {
- onStartOperation(cacheCtx, true);
+ public void onStartRebuildIndexes(GridCacheContext cacheCtx, boolean recreate) {
+ onStartOperation(cacheCtx, true, recreate);
}
/**
@@ -124,7 +125,12 @@ public class IndexBuildStatusStorage implements MetastorageLifecycleListener, Ch
* @see #onFinishBuildNewIndex
*/
public void onStartBuildNewIndex(GridCacheContext cacheCtx) {
- onStartOperation(cacheCtx, false);
+ onStartOperation(cacheCtx, false, false);
+ }
+
+ /** Marks that index.bin recreation is in progress. */
+ public void markIndexRecreate(GridCacheContext<?, ?> cacheCtx) {
+ onStartOperation(cacheCtx, true, true);
}
/**
@@ -169,6 +175,18 @@ public class IndexBuildStatusStorage implements MetastorageLifecycleListener, Ch
return status == null || !status.rebuild();
}
+ /**
+ * Check if index.bin recreating for the cache has been completed.
+ *
+ * @param cacheName Cache name.
+ * @return {@code True} if index.bin recreate completed.
+ */
+ public boolean recreateCompleted(String cacheName) {
+ IndexBuildStatusHolder status = statuses.get(cacheName);
+
+ return status == null || !status.recreate();
+ }
+
/** {@inheritDoc} */
@Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
@@ -188,7 +206,10 @@ public class IndexBuildStatusStorage implements MetastorageLifecycleListener, Ch
(k, v) -> {
IndexRebuildCacheInfo cacheInfo = (IndexRebuildCacheInfo)v;
- statuses.put(cacheInfo.cacheName(), new IndexBuildStatusHolder(true, true));
+ statuses.put(
+ cacheInfo.cacheName(),
+ new IndexBuildStatusHolder(true, true, cacheInfo.recreate())
+ );
},
true
);
@@ -296,9 +317,10 @@ public class IndexBuildStatusStorage implements MetastorageLifecycleListener, Ch
*
* @param cacheCtx Cache context.
* @param rebuild {@code True} if rebuilding indexes, otherwise building a new index.
+ * @param recreate {@code True} if index.bin recreating, {@code false} otherwise.
* @see #onFinishOperation
*/
- private void onStartOperation(GridCacheContext cacheCtx, boolean rebuild) {
+ private void onStartOperation(GridCacheContext cacheCtx, boolean rebuild, boolean recreate) {
if (!stopNodeLock.enterBusy())
throw new IgniteException("Node is stopping.");
@@ -308,19 +330,19 @@ public class IndexBuildStatusStorage implements MetastorageLifecycleListener, Ch
statuses.compute(cacheName, (k, prev) -> {
if (prev != null) {
- prev.onStartOperation(rebuild);
+ prev.onStartOperation(rebuild, recreate);
return prev;
}
else
- return new IndexBuildStatusHolder(persistent, rebuild);
+ return new IndexBuildStatusHolder(persistent, rebuild, recreate);
});
if (persistent) {
metaStorageOperation(metaStorage -> {
assert metaStorage != null;
- metaStorage.write(metaStorageKey(cacheName), new IndexRebuildCacheInfo(cacheName));
+ metaStorage.write(metaStorageKey(cacheName), new IndexRebuildCacheInfo(cacheName, recreate));
});
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildCacheInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildCacheInfo.java
index 4bab971876c..a072c804d2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildCacheInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/aware/IndexRebuildCacheInfo.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -36,6 +37,12 @@ public class IndexRebuildCacheInfo extends IgniteDataTransferObject {
/** Cache name. */
private String cacheName;
+ /**
+ * {@code True} if index.bin recreating, {@code false} otherwise.
+ * @see IndexesRebuildTask
+ */
+ private boolean recreate;
+
/**
* Default constructor for {@link Externalizable}.
*/
@@ -46,14 +53,17 @@ public class IndexRebuildCacheInfo extends IgniteDataTransferObject {
* Constructor.
*
* @param cacheName Cache name.
+ * @param recreate {@code True} if index.bin recreating, {@code false} otherwise.
*/
- public IndexRebuildCacheInfo(String cacheName) {
+ public IndexRebuildCacheInfo(String cacheName, boolean recreate) {
this.cacheName = cacheName;
+ this.recreate = recreate;
}
/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
U.writeLongString(out, cacheName);
+ out.writeBoolean(recreate);
}
/** {@inheritDoc} */
@@ -62,6 +72,7 @@ public class IndexRebuildCacheInfo extends IgniteDataTransferObject {
ObjectInput in
) throws IOException, ClassNotFoundException {
cacheName = U.readLongString(in);
+ recreate = protoVer == V2 && in.readBoolean();
}
/**
@@ -73,6 +84,16 @@ public class IndexRebuildCacheInfo extends IgniteDataTransferObject {
return cacheName;
}
+ /** @return {@code True} if index.bin recreating, {@code false} otherwise. */
+ public boolean recreate() {
+ return recreate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte getProtocolVersion() {
+ return V2;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IndexRebuildCacheInfo.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SortedIndexDescriptorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SortedIndexDescriptorFactory.java
index 0bbeda52a0b..a5e9c83a5bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SortedIndexDescriptorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/management/SortedIndexDescriptorFactory.java
@@ -44,6 +44,9 @@ public class SortedIndexDescriptorFactory extends AbstractIndexDescriptorFactory
/** */
private static final InlineIndexFactory SORTED_IDX_FACTORY = InlineIndexFactory.INSTANCE;
+ /** */
+ public static final String H2_TREE = "H2Tree";
+
/** */
private final IgniteLogger log;
@@ -97,7 +100,7 @@ public class SortedIndexDescriptorFactory extends AbstractIndexDescriptorFactory
int typeId = cctx.binaryMarshaller() ? typeDesc.typeId() : typeDesc.valueClass().hashCode();
- String treeName = BPlusTree.treeName(typeId + "_" + idxName, "H2Tree");
+ String treeName = BPlusTree.treeName(typeId + "_" + idxName, H2_TREE);
if (!ctx.indexProcessor().useUnwrappedPk(cctx, treeName))
idxCols = wrappedCols;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index eccef97f56c..1dac0437bf7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -118,7 +118,7 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager {
}
/** {@inheritDoc} */
- @Override public boolean disabled(int grpId) {
+ @Override public boolean disabled(int grpId, long pageId) {
return false;
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/WalDisabledDuringIndexRecreateTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/WalDisabledDuringIndexRecreateTest.java
new file mode 100644
index 00000000000..21d6e42d527
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/WalDisabledDuringIndexRecreateTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.database;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryBasicIdMapper;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcIndexRebuildTest.TestVal;
+import org.apache.ignite.internal.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
+import org.apache.ignite.internal.processors.cache.persistence.IndexStorageImpl;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
+import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH;
+import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheGroupId;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.query.schema.management.SortedIndexDescriptorFactory.H2_TREE;
+
+/** */
+@RunWith(Parameterized.class)
+public class WalDisabledDuringIndexRecreateTest extends GridCommonAbstractTest {
+ /** Entries count per cache. */
+ public static final int ENTRIES_CNT = 1_000;
+
+ /** */
+ public static final String GRP_NAME = "my-group";
+
+ /** */
+ public static final int GRP_CACHES_CNT = 3;
+
+ /** */
+ public static final long UNKNOWN_PAGE_ID = -1;
+
+ /** */
+ private ListeningTestLogger testLog;
+
+ /** */
+ @Parameterized.Parameter()
+ public boolean cacheGrps;
+
+    /** @return Values of {@link #cacheGrps} to run every test with. */
+    @Parameterized.Parameters(name = "cacheGroups={0}")
+    public static Iterable<Object> data() {
+        return Arrays.<Object>asList(Boolean.TRUE, Boolean.FALSE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        // Persistence is enabled for the default region and the WAL archive size is unlimited.
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+            .setMaxWalArchiveSize(UNLIMITED_WAL_ARCHIVE);
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName).setGridLogger(testLog);
+
+        return cfg.setConsistentId(igniteInstanceName).setClusterStateOnStart(INACTIVE)
+            .setFailureHandler(new StopNodeFailureHandler()).setDataStorageConfiguration(dsCfg);
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir(); // Clean the persistence directory before the test starts its first node.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids(); // Nodes are stopped first, then the persistence directory is cleaned.
+
+ cleanPersistenceDir();
+ }
+
+    /** Checks that recreating a deleted index file logs no index partition insert records of the group to WAL. */
+    @Test
+    public void testDisabled() throws Exception {
+        WALPointer startPtr = createDataAndDeleteIndexBin();
+
+        testLog = new ListeningTestLogger(log);
+
+        awaitRebuild();
+
+        long grpId = CU.cacheGroupId(cacheName(), cacheGroupName());
+
+        // Only records located after the pointer captured once the data was loaded are counted.
+        assertEquals(0, countWalGrpRecords(ptr -> ptr.compareTo(startPtr) > 0, grpId));
+    }
+
+ /** Checks that index file recreation interrupted by an error is started over on the next node start. */
+ @Test
+ public void testRestartInCaseOfFailure() throws Exception {
+ WALPointer walStartPtr = createDataAndDeleteIndexBin();
+
+ String treeName = BPlusTree.treeName(
+ new BinaryBasicIdMapper().typeId(TestVal.class.getName()) + "_"
+ + TestVal.class.getSimpleName().toUpperCase() + "_F0_IDX",
+ H2_TREE
+ );
+
+ AtomicInteger errCntr = new AtomicInteger(10);
+
+ // Set up a handler wrapper for the tree named treeName that throws on the 10th put operation.
+ BPlusTree.testHndWrapper = (tree, hnd) -> {
+ if (!tree.name().equals(treeName))
+ return hnd;
+
+ PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>)hnd;
+
+ return new PageHandler<BPlusTree.Get, BPlusTree.Result>() {
+ @Override public BPlusTree.Result run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO io,
+ Boolean walPlc,
+ BPlusTree.Get arg,
+ int lvl,
+ IoStatisticsHolder statHolder
+ ) throws IgniteCheckedException {
+ if (arg instanceof BPlusTree.Put && errCntr.decrementAndGet() == 0)
+ throw new IgniteCheckedException("Test error on 10 put"); // Node failure during index rebuild.
+
+ return delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, lvl, statHolder);
+ }
+
+ @Override public boolean releaseAfterWrite(
+ int cacheId, long pageId, long page, long pageAddr, BPlusTree.Get g, int lvl
+ ) {
+ return g.canRelease(pageId, lvl);
+ }
+ };
+ };
+
+ try {
+ testLog = new ListeningTestLogger(log);
+
+ awaitRebuild();
+
+ assertTrue("Rebuild must not succeed", false);
+ }
+ catch (Exception ignore) {
+ // No-op: the rebuild is expected to fail because of the injected error (AssertionError is not caught here).
+ }
+ finally {
+ BPlusTree.testHndWrapper = null;
+
+ stopAllGrids();
+ }
+
+ testLog = new ListeningTestLogger(log);
+
+ LogListener lsnr = LogListener.matches(
+ "Recreate of index.bin don't finish before node stop, index.bin can be inconsistent. " +
+ "Removing it to recreate one more time " +
+ "[grpId=" + cacheGroupId(cacheName(), cacheGroupName())
+ ).build();
+
+ testLog.registerListener(lsnr);
+
+ awaitRebuild();
+
+ assertTrue(lsnr.check());
+
+ assertEquals(
+ 0,
+ countWalGrpRecords(wp -> wp.compareTo(walStartPtr) > 0, cacheGroupId(cacheName(), cacheGroupName()))
+ );
+ }
+
+    /**
+     * Starts and activates node 0, waits for index rebuild and checks WAL disable/enable messages and the index file.
+     */
+    private void awaitRebuild() throws Exception {
+        // Both expected messages end with the same group description.
+        String grpInfo = "[name=" + cacheGroupName() + ", id=" + cacheGroupId(cacheName(), cacheGroupName()) + ']';
+
+        LogListener disabledLsnr = LogListener.matches("WAL disabled for index partition " + grpInfo).build();
+        LogListener enabledLsnr = LogListener.matches("WAL enabled for index partition " + grpInfo).build();
+
+        testLog.registerListener(disabledLsnr);
+        testLog.registerListener(enabledLsnr);
+
+        IgniteEx node = startGrid(0);
+
+        node.cluster().state(ACTIVE);
+
+        for (int idx = 0; idx < cachesCnt(); idx++) {
+            IgniteInternalFuture<?> fut = indexRebuildFuture(node, cacheId(DEFAULT_CACHE_NAME + idx));
+
+            if (fut == null)
+                continue;
+
+            fut.get(10_000);
+        }
+
+        assertTrue(node.<Integer, TestVal>cache(cacheName()).containsKey(0));
+
+        assertTrue(disabledLsnr.check());
+        assertTrue(enabledLsnr.check());
+
+        checkIdxFile();
+    }
+
+    /** @return Last WAL pointer written before the node is stopped and its index file is deleted. */
+    private WALPointer createDataAndDeleteIndexBin() throws Exception {
+        IgniteEx node = startGrid(0);
+
+        node.cluster().state(ACTIVE);
+
+        for (int cacheIdx = 0; cacheIdx < cachesCnt(); cacheIdx++)
+            produceData(node, DEFAULT_CACHE_NAME + cacheIdx);
+
+        WALPointer lastPtr = node.context().cache().context().wal().lastWritePointer();
+        File idxFile = checkIdxFile();
+
+        stopGrid(0);
+
+        // The index file is removed only after the node is stopped.
+        U.delete(idxFile);
+
+        return lastPtr;
+    }
+
+    /** Creates the cache with indexed {@link TestVal} values, fills it and checks the number of created indexes. */
+    private void produceData(IgniteEx srv, String cacheName) {
+        CacheConfiguration<Integer, TestVal> ccfg = new CacheConfiguration<Integer, TestVal>(cacheName)
+            .setGroupName(cacheGroupName())
+            .setIndexedTypes(Integer.class, TestVal.class);
+
+        IgniteCache<Integer, TestVal> cache = srv.getOrCreateCache(ccfg);
+
+        for (int key = 0; key < ENTRIES_CNT; key++)
+            cache.put(key, new TestVal());
+
+        int expIdxCnt = TestVal.class.getDeclaredFields().length + 1;
+
+        assertEquals(expIdxCnt, srv.context().indexProcessor().indexes(cacheName).size());
+    }
+
+    /** @return Index file of the tested cache (group) in the storage directory of node 0, asserted to exist. */
+    private File checkIdxFile() throws IgniteCheckedException {
+        String cacheDir = cacheGrps ? (CACHE_GRP_DIR_PREFIX + cacheGroupName()) : (CACHE_DIR_PREFIX + cacheName());
+        String nodeDir = grid(0).name().replace(".", "_");
+
+        String relPath = DFLT_STORE_DIR + File.separatorChar + nodeDir + File.separatorChar + cacheDir;
+
+        File storeDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), relPath, false);
+        File res = new File(storeDir, INDEX_FILE_NAME);
+
+        assertTrue("Index file not found", res.exists());
+
+        return res;
+    }
+
+    /**
+     * Counts {@link InsertRecord}s of the group that do not use {@link IndexStorageImpl} meta store IOs
+     * and whose page belongs to the index partition.
+     * @param filter Predicate selecting WAL pointers of the records to inspect.
+     * @param grpId Cache group id.
+     * @return Number of matching records.
+     */
+    private long countWalGrpRecords(Predicate<WALPointer> filter, long grpId) throws IgniteCheckedException {
+        String nodeDir = grid(0).name().replace(".", "_");
+
+        IteratorParametersBuilder bldr = new IteratorParametersBuilder()
+            .filesOrDirs(
+                U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_WAL_PATH + "/" + nodeDir, false),
+                U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_WAL_ARCHIVE_PATH + "/" + nodeDir, false))
+            .filter((type, ptr) -> filter.test(ptr));
+
+        long cnt = 0;
+
+        try (WALIterator it = new IgniteWalIteratorFactory(log).iterator(bldr)) {
+            while (it.hasNext()) {
+                IgniteBiTuple<WALPointer, WALRecord> entry = it.next();
+                WALRecord walRec = entry.get2();
+
+                if (!filter.test(entry.get1()) || !(walRec instanceof InsertRecord))
+                    continue;
+
+                if (((WalRecordCacheGroupAware)walRec).groupId() != grpId)
+                    continue;
+
+                BPlusIO<?> io = ((InsertRecord)walRec).io();
+
+                // Inserts performed through the meta store IOs are not counted.
+                if (io instanceof IndexStorageImpl.MetaStoreLeafIO || io instanceof IndexStorageImpl.MetaStoreInnerIO)
+                    continue;
+
+                long pageId = pageId(walRec);
+
+                if (pageId == UNKNOWN_PAGE_ID || PageIdUtils.partId(pageId) == INDEX_PARTITION)
+                    cnt++;
+            }
+        }
+
+        return cnt;
+    }
+
+    /** @return Page id of a delta or snapshot record, {@link #UNKNOWN_PAGE_ID} for any other record. */
+    public long pageId(WALRecord rec) {
+        boolean delta = rec instanceof PageDeltaRecord;
+
+        if (!delta && !(rec instanceof PageSnapshot))
+            return UNKNOWN_PAGE_ID;
+
+        return delta ? ((PageDeltaRecord)rec).pageId() : ((PageSnapshot)rec).fullPageId().pageId();
+    }
+
+    /** @return Name of the cache under test: the last cache of the group or the only standalone cache. */
+    private String cacheName() {
+        return DEFAULT_CACHE_NAME + (cacheGrps ? GRP_CACHES_CNT - 1 : 0);
+    }
+
+    /** @return {@link #GRP_NAME} when caches are grouped, {@code null} otherwise. */
+    private String cacheGroupName() {
+        return !cacheGrps ? null : GRP_NAME;
+    }
+
+    /** @return Number of caches created by the test: {@link #GRP_CACHES_CNT} when caches are grouped, one otherwise. */
+    private int cachesCnt() {
+        return !cacheGrps ? 1 : GRP_CACHES_CNT;
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index f267b04029b..a127384a1b9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.database.IgniteTwoRegionsRebuildInd
import org.apache.ignite.internal.processors.database.RebuildIndexTest;
import org.apache.ignite.internal.processors.database.RebuildIndexWithHistoricalRebalanceTest;
import org.apache.ignite.internal.processors.database.RebuildIndexWithMVCCTest;
+import org.apache.ignite.internal.processors.database.WalDisabledDuringIndexRecreateTest;
import org.apache.ignite.internal.processors.query.h2.maintenance.MaintenanceRebuildIndexUtilsSelfTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -61,6 +62,7 @@ import org.junit.runners.Suite;
IndexingMultithreadedLoadContinuousRestartTest.class,
LongDestroyDurableBackgroundTaskTest.class,
RebuildIndexTest.class,
+ WalDisabledDuringIndexRecreateTest.class,
RebuildIndexWithMVCCTest.class,
ClientReconnectWithSqlTableConfiguredTest.class,
MultipleParallelCacheDeleteDeadlockTest.class,