You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2020/12/03 07:39:01 UTC
[ignite] branch master updated: IGNITE-13190 Native Persistence
Defragmentation core functionality - Fixes #7984.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 09d5c73 IGNITE-13190 Native Persistence Defragmentation core functionality - Fixes #7984.
09d5c73 is described below
commit 09d5c73c467acf13408d22ab4198bff6c2c7d229
Author: ibessonov <be...@gmail.com>
AuthorDate: Thu Dec 3 10:34:58 2020 +0300
IGNITE-13190 Native Persistence Defragmentation core functionality - Fixes #7984.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../org/apache/ignite/IgniteSystemProperties.java | 14 +
.../internal/maintenance/MaintenanceProcessor.java | 2 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 2 +-
.../GridCacheDatabaseSharedManager.java | 199 ++++-
.../cache/persistence/GridCacheOffheapManager.java | 17 +-
.../IgniteCacheDatabaseSharedManager.java | 2 +-
.../checkpoint/LightweightCheckpointManager.java | 6 +-
.../CachePartitionDefragmentationManager.java | 827 +++++++++++++++++++++
.../defragmentation/DefragmentationFileUtils.java | 401 ++++++++++
.../DefragmentationPageReadWriteManager.java | 37 +
.../cache/persistence/defragmentation/LinkMap.java | 276 +++++++
.../persistence/defragmentation/PageStoreMap.java | 106 +++
.../persistence/defragmentation/TreeIterator.java | 109 +++
.../maintenance/DefragmentationParameters.java | 78 ++
.../DefragmentationWorkflowCallback.java | 66 ++
.../maintenance/ExecuteDefragmentationAction.java | 74 ++
.../persistence/file/FilePageStoreManager.java | 9 +
.../cache/persistence/tree/io/PageIO.java | 13 +
.../processors/query/GridQueryIndexing.java | 24 +
.../internal/util/collection/IntHashMap.java | 26 +
.../ignite/internal/util/collection/IntMap.java | 6 +
.../internal/util/collection/IntRWHashMap.java | 22 +
.../util/tostring/GridToStringBuilder.java | 46 ++
.../ignite/maintenance/MaintenanceRegistry.java | 19 +
.../IgnitePdsDefragmentationEncryptionTest.java | 43 ++
...itePdsDefragmentationRandomLruEvictionTest.java | 35 +
.../persistence/IgnitePdsDefragmentationTest.java | 541 ++++++++++++++
.../db/checkpoint/LightweightCheckpointTest.java | 4 +-
.../persistence/defragmentation/LinkMapTest.java | 83 +++
.../processors/query/DummyQueryIndexing.java | 16 +
.../ignite/testsuites/IgniteBasicTestSuite.java | 3 +
.../ignite/testsuites/IgnitePdsMvccTestSuite4.java | 8 +
.../ignite/testsuites/IgnitePdsTestSuite4.java | 8 +
.../processors/query/h2/IgniteH2Indexing.java | 19 +
.../defragmentation/IndexingDefragmentation.java | 430 +++++++++++
.../cache/IgniteCacheUpdateSqlQuerySelfTest.java | 4 +-
.../IgnitePdsIndexingDefragmentationTest.java | 316 ++++++++
.../testsuites/IgnitePdsWithIndexingTestSuite.java | 4 +-
38 files changed, 3877 insertions(+), 18 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 148e86d..aa12e54 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -81,6 +81,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.preloa
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition.DFLT_CACHE_REMOVE_ENTRIES_TTL;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory.DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow.DFLT_CHECKPOINT_PARALLEL_SORT_THRESHOLD;
@@ -1949,6 +1950,19 @@ public final class IgniteSystemProperties {
public static final String IGNITE_TEST_ENV = "IGNITE_TEST_ENV";
/**
+ * Size of the defragmentation data regions, expressed as a percentage of the configured data region size.
+ * The percentage is applied to the largest configured region; the resulting amount of memory is then
+ * subtracted proportionally from all configured regions.
+ */
+ @SystemProperty(value = "Defragmentation region size percentage of configured region size. " +
+ "This percentage will be calculated from largest configured region size and then proportionally subtracted " +
+ "from all configured regions",
+ type = Integer.class,
+ defaults = "" + DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE)
+ public static final String IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE =
+ "IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE";
+
+ /**
* Enforces singleton.
*/
private IgniteSystemProperties() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
index 347b328..063bd47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/maintenance/MaintenanceProcessor.java
@@ -190,7 +190,7 @@ public class MaintenanceProcessor extends GridProcessorAdapter implements Mainte
*/
private void proceedWithMaintenance() {
for (Map.Entry<String, MaintenanceWorkflowCallback> cbE : workflowCallbacks.entrySet()) {
- MaintenanceAction mntcAct = cbE.getValue().automaticAction();
+ MaintenanceAction<?> mntcAct = cbE.getValue().automaticAction();
if (mntcAct != null) {
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 024287f..773297f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1596,7 +1596,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return grp.mvccEnabled() ? dataTree.isEmpty() : storageSize.get() == 0;
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to perform operation.", e);
+ U.error(grp.shared().logger(IgniteCacheOffheapManagerImpl.class), "Failed to perform operation.", e);
return false;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 8f6f683..2c366eb 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -117,7 +117,11 @@ import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkp
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointStatus;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.Checkpointer;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.ReservationReason;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationPageReadWriteManager;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationWorkflowCallback;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
@@ -145,6 +149,7 @@ import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -162,6 +167,7 @@ import org.jetbrains.annotations.Nullable;
import static java.util.Objects.nonNull;
import static java.util.function.Function.identity;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_RECOVERY_SEMAPHORE_PERMITS;
@@ -178,7 +184,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.topolo
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_LOCK_HOLD_COUNT;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.fromStore;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CORRUPTED_DATA_FILES_MNTC_TASK_NAME;
+import static org.apache.ignite.internal.util.IgniteUtils.GB;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
/**
@@ -207,6 +216,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Description of the system view for a {@link MetaStorage}. */
public static final String METASTORE_VIEW_DESC = "Local metastorage data";
+ /** Name given to the data region configuration built by {@code createDefragmentationDataRegionConfig}. */
+ public static final String DEFRAGMENTATION_PART_REGION_NAME = "defragPartitionsDataRegion";
+
+ /** Name given to the data region configuration built by {@code createDefragmentationMappingRegionConfig}. */
+ public static final String DEFRAGMENTATION_MAPPING_REGION_NAME = "defragMappingDataRegion";
+
/**
* Threshold to calculate limit for pages list on-heap caches.
* <p>
@@ -223,6 +238,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** @see IgniteSystemProperties#IGNITE_PDS_WAL_REBALANCE_THRESHOLD */
public static final int DFLT_PDS_WAL_REBALANCE_THRESHOLD = 500;
+ /** @see IgniteSystemProperties#IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE */
+ public static final int DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE = 60;
+
/** */
private final int walRebalanceThreshold =
getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
@@ -234,6 +252,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
+ /** Defragmentation regions size percentage of configured ones. */
+ private final int defragmentationRegionSizePercentageOfConfiguredSize =
+ getInteger(IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE, DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE);
+
/** */
private static final String MBEAN_NAME = "DataStorageMetrics";
@@ -318,6 +340,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Lock for releasing history for preloading. */
private ReentrantLock releaseHistForPreloadingLock = new ReentrantLock();
+ /** Partition defragmentation manager; assigned in {@code prepareCacheDefragmentation}. */
+ private CachePartitionDefragmentationManager defrgMgr;
+
/** Data regions which should be checkpointed. */
protected final Set<DataRegion> checkpointedDataRegions = new GridConcurrentHashSet<>();
@@ -447,6 +472,32 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
return cfg;
}
+ /**
+  * Builds the configuration of the data region named {@link #DEFRAGMENTATION_PART_REGION_NAME}.
+  *
+  * @param regionSize Value used as both the initial and the maximum size of the region.
+  * @return Data region configuration with persistence enabled and lazy memory allocation disabled.
+  */
+ private DataRegionConfiguration createDefragmentationDataRegionConfig(long regionSize) {
+     DataRegionConfiguration partRegionCfg = new DataRegionConfiguration();
+
+     partRegionCfg.setName(DEFRAGMENTATION_PART_REGION_NAME);
+     partRegionCfg.setPersistenceEnabled(true);
+     partRegionCfg.setLazyMemoryAllocation(false);
+
+     // The region has a fixed size: initial and maximum sizes are the same.
+     partRegionCfg.setMaxSize(regionSize);
+     partRegionCfg.setInitialSize(regionSize);
+
+     return partRegionCfg;
+ }
+
+ /**
+  * Builds the configuration of the data region named {@link #DEFRAGMENTATION_MAPPING_REGION_NAME}.
+  *
+  * @param regionSize Value used as both the initial and the maximum size of the region.
+  * @return Data region configuration with persistence enabled and lazy memory allocation disabled.
+  */
+ private DataRegionConfiguration createDefragmentationMappingRegionConfig(long regionSize) {
+     DataRegionConfiguration mappingRegionCfg = new DataRegionConfiguration();
+
+     mappingRegionCfg.setName(DEFRAGMENTATION_MAPPING_REGION_NAME);
+     mappingRegionCfg.setPersistenceEnabled(true);
+     mappingRegionCfg.setLazyMemoryAllocation(false);
+
+     // The region has a fixed size: initial and maximum sizes are the same.
+     mappingRegionCfg.setMaxSize(regionSize);
+     mappingRegionCfg.setInitialSize(regionSize);
+
+     return mappingRegionCfg;
+ }
+
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
@@ -497,6 +548,99 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
+ /** {@inheritDoc} */
+ @Override protected void initDataRegions(DataStorageConfiguration memCfg) throws IgniteCheckedException {
+     // Region sizes are adjusted for defragmentation needs only while a defragmentation task is scheduled
+     // and the dataRegionsInitialized flag is still false.
+     boolean adjustForDefragmentation = isDefragmentationScheduled() && !dataRegionsInitialized;
+
+     super.initDataRegions(adjustForDefragmentation ? configureDataRegionForDefragmentation(memCfg) : memCfg);
+ }
+
+ /**
+  * Reconfigures data regions for defragmentation:
+  * <ol>
+  * <li>The sizes of the configured cache data regions are decreased in order to free memory for
+  * defragmentation needs.</li>
+  * <li>Two defragmentation regions (partitions and link mapping) are added; their total size is based on
+  * the memory freed in the previous step.</li>
+  * </ol>
+  * The region configurations held by {@code memCfg} are modified in place.
+  *
+  * @param memCfg Data storage configuration with data region configurations.
+  * @return The passed data storage configuration, whose data regions now have changed sizes.
+  * @throws IgniteCheckedException If fail.
+  */
+ private DataStorageConfiguration configureDataRegionForDefragmentation(
+     DataStorageConfiguration memCfg
+ ) throws IgniteCheckedException {
+     List<DataRegionConfiguration> regionConfs = new ArrayList<>();
+
+     // NOTE: memCfg is changed in place; working on a copy of it would be better.
+     regionConfs.add(memCfg.getDefaultDataRegionConfiguration());
+
+     if (memCfg.getDataRegionConfigurations() != null)
+         regionConfs.addAll(Arrays.asList(memCfg.getDataRegionConfigurations()));
+
+     // The configured percentage of the largest region's max size (a maximum over regions, not a sum).
+     long totalDefrRegionSize = 0;
+     long totalRegionsSize = 0;
+
+     for (DataRegionConfiguration regionCfg : regionConfs) {
+         totalDefrRegionSize = Math.max(
+             totalDefrRegionSize,
+             (long)(regionCfg.getMaxSize() * 0.01 * defragmentationRegionSizePercentageOfConfiguredSize)
+         );
+
+         totalRegionsSize += regionCfg.getMaxSize();
+     }
+
+     // Fraction of its max size that every configured region keeps.
+     double shrinkPercentage = 1d * (totalRegionsSize - totalDefrRegionSize) / totalRegionsSize;
+
+     for (DataRegionConfiguration region : regionConfs) {
+         long newSize = (long)(region.getMaxSize() * shrinkPercentage);
+         long newInitSize = Math.min(region.getInitialSize(), newSize);
+
+         if (log.isInfoEnabled()) {
+             log.info("Region size was reassigned by defragmentation reason: " +
+                 "region = '" + region.getName() + "', " +
+                 "oldInitialSize = '" + region.getInitialSize() + "', " +
+                 "newInitialSize = '" + newInitSize + "', " +
+                 "oldMaxSize = '" + region.getMaxSize() + "', " +
+                 "newMaxSize = '" + newSize + "'"
+             );
+         }
+
+         region.setMaxSize(newSize);
+         region.setInitialSize(newInitSize);
+         region.setCheckpointPageBufferSize(0);
+     }
+
+     // The mapping region takes 10% of the defragmentation memory, but no more than 1 GB.
+     long mappingRegionSize = Math.min(GB, (long)(totalDefrRegionSize * 0.1));
+
+     // addDataRegion() of this class puts the new region into checkpointedDataRegions;
+     // both defragmentation regions are taken out of that set again right away.
+     checkpointedDataRegions.remove(
+         addDataRegion(
+             memCfg,
+             createDefragmentationDataRegionConfig(totalDefrRegionSize - mappingRegionSize),
+             true,
+             new DefragmentationPageReadWriteManager(cctx.kernalContext(), "defrgPartitionsStore")
+         )
+     );
+
+     checkpointedDataRegions.remove(
+         addDataRegion(
+             memCfg,
+             createDefragmentationMappingRegionConfig(mappingRegionSize),
+             true,
+             new DefragmentationPageReadWriteManager(cctx.kernalContext(), "defrgLinkMappingStore")
+         )
+     );
+
+     return memCfg;
+ }
+
+ /**
+  * Checks whether the maintenance registry has an active maintenance task registered under
+  * {@code DEFRAGMENTATION_MNTC_TASK_NAME}.
+  *
+  * @return {@code true} if such an active maintenance task exists.
+  */
+ private boolean isDefragmentationScheduled() {
+     return nonNull(
+         cctx.kernalContext().maintenanceRegistry().activeMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME)
+     );
+ }
+
/** */
public Collection<DataRegion> checkpointedDataRegions() {
return checkpointedDataRegions;
@@ -603,10 +747,51 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
fileLockHolder.close();
}
+ /** Starts a lightweight checkpoint manager over the defragmentation regions and creates {@code defrgMgr} for the given cache group ids. */
+ private void prepareCacheDefragmentation(List<Integer> cacheGroupIds) throws IgniteCheckedException {
+ GridKernalContext kernalCtx = cctx.kernalContext();
+ DataStorageConfiguration dsCfg = kernalCtx.config().getDataStorageConfiguration();
+ // This method is expected to be called only when persistence is enabled.
+ assert CU.isPersistenceEnabled(dsCfg);
+ // The two defragmentation regions, looked up by the names their configurations were created with.
+ List<DataRegion> regions = Arrays.asList(
+ dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME),
+ dataRegion(DEFRAGMENTATION_PART_REGION_NAME)
+ );
+ // A separate checkpoint manager whose region supplier returns the defragmentation regions only.
+ LightweightCheckpointManager lightCheckpointMgr = new LightweightCheckpointManager(
+ kernalCtx::log,
+ cctx.igniteInstanceName(),
+ "db-checkpoint-thread-defrag",
+ kernalCtx.workersRegistry(),
+ persistenceCfg,
+ () -> regions,
+ this::getPageMemoryForCacheGroup,
+ resolveThrottlingPolicy(),
+ snapshotMgr,
+ persistentStoreMetricsImpl(),
+ kernalCtx.longJvmPauseDetector(),
+ kernalCtx.failure(),
+ kernalCtx.cache()
+ );
+
+ lightCheckpointMgr.start();
+ // The defragmentation manager receives both the regular checkpoint manager and the lightweight one.
+ defrgMgr = new CachePartitionDefragmentationManager(
+ cacheGroupIds,
+ cctx,
+ this,
+ (FilePageStoreManager)cctx.pageStore(),
+ checkpointManager,
+ lightCheckpointMgr,
+ persistenceCfg.getPageSize()
+ );
+ }
+
/** {@inheritDoc} */
@Override public DataRegion addDataRegion(DataStorageConfiguration dataStorageCfg, DataRegionConfiguration dataRegionCfg,
- boolean trackable) throws IgniteCheckedException {
- DataRegion region = super.addDataRegion(dataStorageCfg, dataRegionCfg, trackable);
+ boolean trackable, PageReadWriteManager pmPageMgr) throws IgniteCheckedException {
+ DataRegion region = super.addDataRegion(dataStorageCfg, dataRegionCfg, trackable, pmPageMgr);
checkpointedDataRegions.add(region);
@@ -636,6 +821,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
registerSystemView();
notifyMetastorageReadyForRead();
+
+ cctx.kernalContext().maintenanceRegistry()
+ .registerWorkflowCallbackIfTaskExists(
+ DEFRAGMENTATION_MNTC_TASK_NAME,
+ task -> {
+ prepareCacheDefragmentation(fromStore(task).cacheGroupIds());
+
+ return new DefragmentationWorkflowCallback(cctx.kernalContext()::log, defrgMgr);
+ }
+ );
}
finally {
metaStorage = null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ad062a8..50c3039 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -247,7 +247,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p);
- return new GridCacheDataStore(grp, p, exists, busyLock, log);
+ return createGridCacheDataStore(grp, p, exists, log);
}
/** {@inheritDoc} */
@@ -1357,8 +1357,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** */
- public GridSpinBusyLock busyLock() {
- return busyLock;
+ public GridCacheDataStore createGridCacheDataStore(
+ CacheGroupContext grpCtx,
+ int partId,
+ boolean exists,
+ IgniteLogger log
+ ) {
+ return new GridCacheDataStore(
+ grpCtx,
+ partId,
+ exists,
+ busyLock,
+ log
+ );
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index bfadeb2..346b842 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -147,7 +147,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
protected final Map<String, DataRegionMetrics> memMetricsMap = new ConcurrentHashMap<>();
/** */
- private volatile boolean dataRegionsInitialized;
+ protected volatile boolean dataRegionsInitialized;
/** */
private volatile boolean dataRegionsStarted;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
index 9e7b3dc..73bec40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
-import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
@@ -106,8 +105,7 @@ public class LightweightCheckpointManager {
DataStorageMetricsImpl persStoreMetrics,
LongJVMPauseDetector longJvmPauseDetector,
FailureProcessor failureProcessor,
- GridCacheProcessor cacheProcessor,
- FilePageStoreManager pageStoreManager
+ GridCacheProcessor cacheProcessor
) throws IgniteCheckedException {
CheckpointReadWriteLock lock = new CheckpointReadWriteLock(logger);
@@ -139,7 +137,7 @@ public class LightweightCheckpointManager {
logger,
snapshotMgr,
(pageMemEx, fullPage, buf, tag) ->
- pageStoreManager.write(fullPage.groupId(), fullPage.pageId(), buf, tag, true),
+ pageMemEx.pageManager().write(fullPage.groupId(), fullPage.pageId(), buf, tag, true),
persStoreMetrics,
throttlingPolicy,
threadBuf,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
new file mode 100644
index 0000000..006fa8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -0,0 +1,827 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.GridCacheDataStore;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
+import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV3;
+import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.batchRenameDefragmentedCacheGroupPartitions;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedIndexTmpFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartMappingFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartTmpFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempIndexFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempPartitionFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedCacheGroup;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedPartition;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.writeDefragmentationCompletionMarker;
+
+/**
+ * Defragmentation manager is the core class that contains the main defragmentation procedure.
+ */
+public class CachePartitionDefragmentationManager {
+ /** Name of the maintenance task that is unregistered once defragmentation of all cache groups is completed. */
+ public static final String DEFRAGMENTATION_MNTC_TASK_NAME = "defragmentationMaintenanceTask";
+
+ /** IDs of cache groups to defragment. If the set is empty, cache groups are not filtered by ID. */
+ private final Set<Integer> cacheGroupsForDefragmentation;
+
+ /** Cache shared context. */
+ private final GridCacheSharedContext<?, ?> sharedCtx;
+
+ /** Maintenance registry. */
+ private final MaintenanceRegistry mntcReg;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Database shared manager. */
+ private final GridCacheDatabaseSharedManager dbMgr;
+
+ /** File page store manager. */
+ private final FilePageStoreManager filePageStoreMgr;
+
+ /**
+ * Checkpoint for specific defragmentation regions which would store the data to new partitions
+ * during the defragmentation.
+ */
+ private final LightweightCheckpointManager defragmentationCheckpoint;
+
+ /** Default checkpoint for current node. */
+ private final CheckpointManager nodeCheckpoint;
+
+ /** Page size. */
+ private final int pageSize;
+
+ /** Data region resolved by {@code DEFRAGMENTATION_PART_REGION_NAME}; new partition and index page stores are registered in it. */
+ private final DataRegion partDataRegion;
+
+ /** Data region resolved by {@code DEFRAGMENTATION_MAPPING_REGION_NAME}; link mapping page stores are registered in it. */
+ private final DataRegion mappingDataRegion;
+
+    /**
+     * Creates the defragmentation manager and resolves the data regions dedicated to defragmentation.
+     *
+     * @param cacheGrpIds IDs of cache groups to defragment. If the list is empty, cache groups are not filtered by ID.
+     * @param sharedCtx Cache shared context.
+     * @param dbMgr Database manager.
+     * @param filePageStoreMgr File page store manager.
+     * @param nodeCheckpoint Default checkpoint for this node.
+     * @param defragmentationCheckpoint Specific checkpoint for defragmentation.
+     * @param pageSize Page size.
+     * @throws IgniteCheckedException Declared by the signature; presumably raised when a defragmentation data region
+     *      cannot be resolved (to be confirmed against {@code GridCacheDatabaseSharedManager#dataRegion}).
+     */
+    public CachePartitionDefragmentationManager(
+        List<Integer> cacheGrpIds,
+        GridCacheSharedContext<?, ?> sharedCtx,
+        GridCacheDatabaseSharedManager dbMgr,
+        FilePageStoreManager filePageStoreMgr,
+        CheckpointManager nodeCheckpoint,
+        LightweightCheckpointManager defragmentationCheckpoint,
+        int pageSize
+    ) throws IgniteCheckedException {
+        this.cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds);
+
+        // Plain references handed over by the caller.
+        this.sharedCtx = sharedCtx;
+        this.dbMgr = dbMgr;
+        this.filePageStoreMgr = filePageStoreMgr;
+        this.nodeCheckpoint = nodeCheckpoint;
+        this.defragmentationCheckpoint = defragmentationCheckpoint;
+        this.pageSize = pageSize;
+
+        // Components derived from the shared context.
+        this.mntcReg = sharedCtx.kernalContext().maintenanceRegistry();
+        this.log = sharedCtx.logger(getClass());
+
+        // Data regions dedicated to defragmentation.
+        this.partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
+        this.mappingDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);
+    }
+
+    /**
+     * Executes the defragmentation procedure.
+     * <p>
+     * First WAL logging is resumed, a node checkpoint is forced and awaited, and WAL is deactivated. Then every user
+     * cache group that passes the group ID filter and is not reported as already defragmented is processed:
+     * <ul>
+     *     <li>a page store for the temporary index file and a new cache group context bound to the defragmentation
+     *     data region are created;</li>
+     *     <li>data of every partition is copied to a temporary partition file, a defragmentation checkpoint is forced
+     *     and, once it finishes successfully, the temporary partition file is renamed;</li>
+     *     <li>if the group has an index store, the index partition is defragmented and another checkpoint is
+     *     forced;</li>
+     *     <li>the temporary index file is renamed, the completion marker is written and defragmented partitions are
+     *     renamed in a batch.</li>
+     * </ul>
+     * After all groups are processed the maintenance task is unregistered. The defragmentation checkpoint is stopped
+     * in any case.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void executeDefragmentation() throws IgniteCheckedException {
+        log.info("Defragmentation started.");
+
+        try {
+            // Checkpointer must be enabled so all pages on disk are in their latest valid state.
+            dbMgr.resumeWalLogging();
+
+            dbMgr.onStateRestored(null);
+
+            nodeCheckpoint.forceCheckpoint("beforeDefragmentation", null).futureFor(FINISHED).get();
+
+            sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+
+            // Now the actual process starts.
+            TreeIterator treeIter = new TreeIterator(pageSize);
+
+            IgniteInternalFuture<?> idxDfrgFut = null;
+            DataPageEvictionMode prevPageEvictionMode = null;
+
+            for (CacheGroupContext oldGrpCtx : sharedCtx.cache().cacheGroups()) {
+                if (!oldGrpCtx.userCache())
+                    continue;
+
+                int grpId = oldGrpCtx.groupId();
+
+                // Empty set of IDs means "no filtering".
+                if (!cacheGroupsForDefragmentation.isEmpty() && !cacheGroupsForDefragmentation.contains(grpId))
+                    continue;
+
+                File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName());
+
+                if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
+                    continue;
+
+                GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap();
+
+                // Only partitions that have a page store file are defragmented.
+                List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false)
+                    .filter(store -> {
+                        try {
+                            return filePageStoreMgr.exists(grpId, store.partId());
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+
+                if (workDir != null && !oldCacheDataStores.isEmpty()) {
+                    // We can't start defragmentation of new group on the region that has wrong eviction mode.
+                    // So waiting of the previous cache group defragmentation is inevitable.
+                    DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode();
+
+                    if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) {
+                        prevPageEvictionMode = curPageEvictionMode;
+
+                        partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+
+                        if (idxDfrgFut != null)
+                            idxDfrgFut.get();
+                    }
+
+                    IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>();
+
+                    for (CacheDataStore store : offheap.cacheDataStores()) {
+                        // Tree can be null for not yet initialized partitions.
+                        // This would mean that these partitions are empty.
+                        assert store.tree() == null || store.tree().groupId() == grpId;
+
+                        if (store.tree() != null)
+                            cacheDataStores.put(store.partId(), store);
+                    }
+
+                    dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+
+                    // Another workaround: caches of the group are explicitly unregistered from the TTL manager.
+                    oldGrpCtx.caches().stream()
+                        .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+                        .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+
+                    // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care and
+                    // WAL records will be allocated anyway just to be ignored later if we don't disable WAL for
+                    // cache group explicitly.
+                    oldGrpCtx.localWalEnabled(false, false);
+
+                    boolean encrypted = oldGrpCtx.config().isEncryptionEnabled();
+
+                    FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+
+                    createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> {
+                    }); //TODO Allocated tracker.
+
+                    GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>();
+
+                    PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
+
+                    CacheGroupContext newGrpCtx = new CacheGroupContext(
+                        sharedCtx,
+                        grpId,
+                        oldGrpCtx.receivedFrom(),
+                        CacheType.USER,
+                        oldGrpCtx.config(),
+                        oldGrpCtx.affinityNode(),
+                        partDataRegion,
+                        oldGrpCtx.cacheObjectContext(),
+                        null,
+                        null,
+                        oldGrpCtx.localStartVersion(),
+                        true,
+                        false,
+                        true
+                    );
+
+                    defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+                    try {
+                        // This will initialize partition meta in index partition - meta tree and reuse list.
+                        newGrpCtx.start();
+                    }
+                    finally {
+                        defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+                    }
+
+                    IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+
+                    for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
+                        int partId = oldCacheDataStore.partId();
+
+                        PartitionContext partCtx = new PartitionContext(
+                            workDir,
+                            grpId,
+                            partId,
+                            partDataRegion,
+                            mappingDataRegion,
+                            oldGrpCtx,
+                            newGrpCtx,
+                            cacheDataStores.get(partId),
+                            pageStoreFactory
+                        );
+
+                        // Checked first, before any page store of this partition is created.
+                        boolean partAlreadyDefragmented = skipAlreadyDefragmentedPartition(workDir, grpId, partId, log);
+
+                        // Mapping page store is needed in both cases: index defragmentation uses link maps of
+                        // all partitions of the group.
+                        partCtx.createPageStore(
+                            () -> defragmentedPartMappingFile(workDir, partId).toPath(),
+                            partCtx.mappingPagesAllocated,
+                            partCtx.mappingPageMemory
+                        );
+
+                        // Link map is initialized from scratch only if the partition has to be defragmented.
+                        linkMapByPart.put(partId, partCtx.createLinkMapTree(!partAlreadyDefragmented));
+
+                        if (partAlreadyDefragmented)
+                            continue;
+
+                        partCtx.createPageStore(
+                            () -> defragmentedPartTmpFile(workDir, partId).toPath(),
+                            partCtx.partPagesAllocated,
+                            partCtx.partPageMemory
+                        );
+
+                        partCtx.createNewCacheDataStore(offheap);
+
+                        copyPartitionData(partCtx, treeIter);
+
+                        IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
+                            // Temporary partition file must not be promoted if the checkpoint has failed.
+                            if (fut.error() != null)
+                                return;
+
+                            PageStore oldPageStore = null;
+
+                            try {
+                                oldPageStore = filePageStoreMgr.getStore(grpId, partId);
+                            }
+                            catch (IgniteCheckedException ignore) {
+                                // Old page store is used for debug statistics only, so the failure is
+                                // deliberately ignored and "-1" is logged as the number of old pages.
+                            }
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(S.toString(
+                                    "Partition defragmented",
+                                    "grpId", grpId, false,
+                                    "partId", partId, false,
+                                    "oldPages", oldPageStore == null ? -1 : oldPageStore.pages(), false,
+                                    "newPages", partCtx.partPagesAllocated.get() + 1, false,
+                                    "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
+                                    "pageSize", pageSize, false,
+                                    "partFile", defragmentedPartFile(workDir, partId).getName(), false,
+                                    "workDir", workDir, false
+                                ));
+                            }
+
+                            oldPageMem.invalidate(grpId, partId);
+
+                            partCtx.partPageMemory.invalidate(grpId, partId);
+
+                            DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+
+                            pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
+
+                            renameTempPartitionFile(workDir, partId);
+                        };
+
+                        GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+                            .forceCheckpoint("partition defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
+
+                        cpFut.listen(cpLsnr);
+
+                        cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+                    }
+
+                    // A bit too general for now, but I like it more than saving only the last checkpoint future.
+                    cmpFut.markInitialized().get();
+
+                    idxDfrgFut = new GridFinishedFuture<>();
+
+                    if (filePageStoreMgr.hasIndexStore(grpId)) {
+                        defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart);
+
+                        idxDfrgFut = defragmentationCheckpoint
+                            .forceCheckpoint("index defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
+                    }
+
+                    idxDfrgFut.listen(fut -> {
+                        // Same rule as for partitions: files are not renamed and the completion marker is not
+                        // written if the checkpoint has failed. The error is rethrown by "idxDfrgFut.get()".
+                        if (fut.error() != null)
+                            return;
+
+                        oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+
+                        PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
+
+                        partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION);
+
+                        DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager();
+
+                        pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION);
+
+                        PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory();
+
+                        pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
+
+                        pageMgr.pageStoreMap().clear(grpId);
+
+                        renameTempIndexFile(workDir);
+
+                        writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log);
+
+                        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+                    });
+                }
+
+                // I guess we should wait for it?
+                if (idxDfrgFut != null)
+                    idxDfrgFut.get();
+            }
+
+            mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+            log.info("Defragmentation completed. All partitions are defragmented.");
+        }
+        finally {
+            defragmentationCheckpoint.stop(true);
+        }
+    }
+
+    /**
+     * Creates a page store backed by the temporary defragmented index file and registers it as the index partition
+     * store of the given cache group in the page manager of {@code partRegion}.
+     *
+     * @param grpId Cache group ID.
+     * @param workDir Cache group working directory.
+     * @param pageStoreFactory Factory used to create the page store.
+     * @param partRegion Data region whose page manager receives the new page store.
+     * @param allocatedTracker Consumer handed to the page store factory as the allocated pages tracker.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void createIndexPageStore(
+        int grpId,
+        File workDir,
+        FilePageStoreFactory pageStoreFactory,
+        DataRegion partRegion,
+        LongConsumer allocatedTracker
+    ) throws IgniteCheckedException {
+        // A leftover temporary index file has to be removed up front, otherwise a corrupted file could be read.
+        // The node may fail after the index is defragmented but before the marker file is created; in that case
+        // the index is simply defragmented once again. The situation is rare, and handling it explicitly would
+        // only add unnecessary complications.
+        U.delete(defragmentedIndexTmpFile(workDir));
+
+        CheckpointTimeoutLock cpLock = defragmentationCheckpoint.checkpointTimeoutLock();
+
+        PageStore idxStore;
+
+        cpLock.checkpointReadLock();
+
+        try {
+            idxStore = pageStoreFactory.createPageStore(
+                FLAG_IDX,
+                () -> defragmentedIndexTmpFile(workDir).toPath(),
+                allocatedTracker
+            );
+        }
+        finally {
+            cpLock.checkpointReadUnlock();
+        }
+
+        idxStore.sync();
+
+        PageMemoryEx pageMem = (PageMemoryEx)partRegion.pageMemory();
+
+        DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)pageMem.pageManager();
+
+        pageMgr.pageStoreMap().addPageStore(grpId, PageIdAllocator.INDEX_PARTITION, idxStore);
+    }
+
+ /**
+ * Defragments a single partition: iterates over the data tree of the old partition and inserts every row into the
+ * free list and the data tree of the new cache data store. Rows with non-zero expire time are also put into the
+ * new pending entries tree. For every copied row the pair "old link - new link" is put into the link map of the
+ * partition. After all rows are copied, free list metadata is saved and partition meta page content is copied.
+ * <p>
+ * The work is done under the read lock of the defragmentation checkpoint. While iterating, the lock is released
+ * and acquired again whenever at least {@code cpLockThreshold} milliseconds have passed since it was last taken.
+ *
+ * @param partCtx Context of the partition being defragmented.
+ * @param treeIter Iterator used to traverse the data tree of the old partition.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void copyPartitionData(
+ PartitionContext partCtx,
+ TreeIterator treeIter
+ ) throws IgniteCheckedException {
+ CacheDataTree tree = partCtx.oldCacheDataStore.tree();
+
+ CacheDataTree newTree = partCtx.newCacheDataStore.tree();
+ PendingEntriesTree newPendingTree = partCtx.newCacheDataStore.pendingTree();
+ AbstractFreeList<CacheDataRow> freeList = partCtx.newCacheDataStore.getCacheStoreFreeList();
+
+ long cpLockThreshold = 150L; // Milliseconds, compared against System.currentTimeMillis() deltas.
+
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+ try {
+ AtomicLong lastCpLockTs = new AtomicLong(System.currentTimeMillis());
+ AtomicInteger entriesProcessed = new AtomicInteger(); // Incremented per row; never read in this method.
+
+ treeIter.iterate(tree, partCtx.cachePageMemory, (tree0, io, pageAddr, idx) -> {
+ if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) { // Lock has been held long enough.
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); // Presumably lets a waiting checkpoint proceed.
+
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+ lastCpLockTs.set(System.currentTimeMillis());
+ }
+
+ AbstractDataLeafIO leafIo = (AbstractDataLeafIO)io;
+ CacheDataRow row = tree.getRow(io, pageAddr, idx);
+
+ int cacheId = row.cacheId(); // Remembered because the row's cache ID may be temporarily overwritten below.
+
+ // Reuse row that we just read. Its link is reset here and read back as the new link after the insertion.
+ row.link(0);
+
+ // "insertDataRow" will corrupt page memory if we don't do this.
+ if (row instanceof DataRow && !partCtx.oldGrpCtx.storeCacheIdInDataPage())
+ ((DataRow)row).cacheId(CU.UNDEFINED_CACHE_ID);
+
+ freeList.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE);
+
+ // Put it back.
+ if (row instanceof DataRow)
+ ((DataRow)row).cacheId(cacheId);
+
+ newTree.putx(row);
+
+ long newLink = row.link();
+
+ partCtx.linkMap.put(leafIo.getLink(pageAddr, idx), newLink); // Old link (read from the leaf page) -> new link.
+
+ if (row.expireTime() != 0)
+ newPendingTree.putx(new PendingRow(cacheId, row.expireTime(), newLink));
+
+ entriesProcessed.incrementAndGet();
+
+ return true;
+ });
+
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); // Released once more before metadata is saved.
+
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+ freeList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
+
+ copyCacheMetadata(partCtx);
+ }
+ finally {
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+ }
+ }
+
+    /**
+     * Copies partition meta page content from the old partition into the meta page of the defragmented partition:
+     * partition state, size, update counter, global remove ID, cache sizes of a shared cache group and update
+     * counter gaps. Encrypted page count and encrypted page index are set to zero in the new meta page.
+     *
+     * @param partCtx Context of the partition being defragmented.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void copyCacheMetadata(
+        PartitionContext partCtx
+    ) throws IgniteCheckedException {
+        int grpId = partCtx.grpId;
+        int partId = partCtx.partId;
+
+        PageMemoryEx oldPageMem = partCtx.cachePageMemory;
+        PageMemoryEx newPageMem = partCtx.partPageMemory;
+
+        // The ID is computed once by the old page memory and is used to address the meta page in both page memories.
+        long metaPageId = oldPageMem.partitionMetaPageId(grpId, partId);
+
+        long oldMetaPage = oldPageMem.acquirePage(grpId, metaPageId);
+
+        try {
+            long oldMetaAddr = oldPageMem.readLock(grpId, metaPageId, oldMetaPage);
+
+            try {
+                PagePartitionMetaIO oldIo = PageIO.getPageIO(oldMetaAddr);
+
+                // Newer meta versions may contain new data that we don't copy during defragmentation.
+                assert Arrays.asList(1, 2, 3).contains(oldIo.getVersion())
+                    : "IO version " + oldIo.getVersion() + " is not supported by current defragmentation algorithm." +
+                    " Please implement copying of all data added in new version.";
+
+                long newMetaPage = newPageMem.acquirePage(grpId, metaPageId);
+
+                try {
+                    long newMetaAddr = newPageMem.writeLock(grpId, metaPageId, newMetaPage);
+
+                    try {
+                        PagePartitionMetaIOV3 newIo = PageIO.getPageIO(newMetaAddr);
+
+                        // Partition state.
+                        newIo.setPartitionState(newMetaAddr, oldIo.getPartitionState(oldMetaAddr));
+
+                        // Cache size for single cache group.
+                        newIo.setSize(newMetaAddr, oldIo.getSize(oldMetaAddr));
+
+                        // Update counter value.
+                        newIo.setUpdateCounter(newMetaAddr, oldIo.getUpdateCounter(oldMetaAddr));
+
+                        // Global remove ID.
+                        newIo.setGlobalRemoveId(newMetaAddr, oldIo.getGlobalRemoveId(oldMetaAddr));
+
+                        // Cache sizes for shared cache group: read from the old page memory, written to the new one.
+                        long oldCntrsPageId = oldIo.getCountersPageId(oldMetaAddr);
+
+                        if (oldCntrsPageId != 0L) {
+                            Map<Integer, Long> cacheSizes = GridCacheOffheapManager.readSharedGroupCacheSizes(
+                                oldPageMem,
+                                grpId,
+                                oldCntrsPageId
+                            );
+
+                            long newCntrsPageId = GridCacheOffheapManager.writeSharedGroupCacheSizes(
+                                newPageMem,
+                                grpId,
+                                0L,
+                                partId,
+                                cacheSizes
+                            );
+
+                            newIo.setCountersPageId(newMetaAddr, newCntrsPageId);
+                        }
+
+                        // Counter gaps: the row is read from the old partition storage and inserted into the new one.
+                        long oldGapsLink = oldIo.getGapsLink(oldMetaAddr);
+
+                        if (oldGapsLink != 0L) {
+                            byte[] gaps = partCtx.oldCacheDataStore.partStorage().readRow(oldGapsLink);
+
+                            SimpleDataRow gapsRow = new SimpleDataRow(partId, gaps);
+
+                            partCtx.newCacheDataStore.partStorage().insertDataRow(gapsRow, IoStatisticsHolderNoOp.INSTANCE);
+
+                            newIo.setGapsLink(newMetaAddr, gapsRow.link());
+                        }
+
+                        // Encryption stuff.
+                        newIo.setEncryptedPageCount(newMetaAddr, 0);
+                        newIo.setEncryptedPageIndex(newMetaAddr, 0);
+                    }
+                    finally {
+                        newPageMem.writeUnlock(grpId, metaPageId, newMetaPage, null, true);
+                    }
+                }
+                finally {
+                    newPageMem.releasePage(grpId, metaPageId, newMetaPage);
+                }
+            }
+            finally {
+                oldPageMem.readUnlock(grpId, metaPageId, oldMetaPage);
+            }
+        }
+        finally {
+            oldPageMem.releasePage(grpId, metaPageId, oldMetaPage);
+        }
+    }
+
+    /**
+     * Defragments the index partition of a cache group by delegating to the indexing implementation.
+     * Does nothing if the query module is not enabled.
+     *
+     * @param grpCtx Old cache group context.
+     * @param newCtx New cache group context.
+     * @param mappingByPartition Link maps by partition ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void defragmentIndexPartition(
+        CacheGroupContext grpCtx,
+        CacheGroupContext newCtx,
+        IntMap<LinkMap> mappingByPartition
+    ) throws IgniteCheckedException {
+        GridQueryProcessor qryProc = grpCtx.caches().get(0).kernalContext().query();
+
+        if (qryProc.moduleEnabled()) {
+            GridQueryIndexing indexing = qryProc.getIndexing();
+
+            CheckpointTimeoutLock lock = defragmentationCheckpoint.checkpointTimeoutLock();
+
+            PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory();
+
+            indexing.defragment(grpCtx, newCtx, partPageMem, mappingByPartition, lock);
+        }
+    }
+
+ /** */
+ @SuppressWarnings("PublicField")
+ private class PartitionContext {
+ /** */
+ public final File workDir;
+
+ /** */
+ public final int grpId;
+
+ /** */
+ public final int partId;
+
+ /** */
+ public final DataRegion cacheDataRegion;
+
+ /** */
+ public final PageMemoryEx cachePageMemory;
+
+ /** */
+ public final PageMemoryEx partPageMemory;
+
+ /** */
+ public final PageMemoryEx mappingPageMemory;
+
+ /** */
+ public final CacheGroupContext oldGrpCtx;
+
+ /** */
+ public final CacheGroupContext newGrpCtx;
+
+ /** */
+ public final CacheDataStore oldCacheDataStore;
+
+ /** */
+ private GridCacheDataStore newCacheDataStore;
+
+ /** */
+ public final FilePageStoreFactory pageStoreFactory;
+
+ /** */
+ public final AtomicLong partPagesAllocated = new AtomicLong();
+
+ /** */
+ public final AtomicLong mappingPagesAllocated = new AtomicLong();
+
+ /** */
+ private LinkMap linkMap;
+
+ /** */
+ public PartitionContext(
+ File workDir,
+ int grpId,
+ int partId,
+ DataRegion partDataRegion,
+ DataRegion mappingDataRegion,
+ CacheGroupContext oldGrpCtx,
+ CacheGroupContext newGrpCtx,
+ CacheDataStore oldCacheDataStore,
+ FilePageStoreFactory pageStoreFactory
+ ) {
+ this.workDir = workDir;
+ this.grpId = grpId;
+ this.partId = partId;
+ cacheDataRegion = oldGrpCtx.dataRegion();
+
+ cachePageMemory = (PageMemoryEx)cacheDataRegion.pageMemory();
+ partPageMemory = (PageMemoryEx)partDataRegion.pageMemory();
+ mappingPageMemory = (PageMemoryEx)mappingDataRegion.pageMemory();
+
+ this.oldGrpCtx = oldGrpCtx;
+ this.newGrpCtx = newGrpCtx;
+ this.oldCacheDataStore = oldCacheDataStore;
+ this.pageStoreFactory = pageStoreFactory;
+ }
+
+ /** */
+ public PageStore createPageStore(IgniteOutClosure<Path> pathProvider, AtomicLong pagesAllocated, PageMemoryEx pageMemory) throws IgniteCheckedException {
+ PageStore partPageStore;
+
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+ try {
+ partPageStore = pageStoreFactory.createPageStore(
+ FLAG_DATA,
+ pathProvider,
+ pagesAllocated::addAndGet
+ );
+ }
+ finally {
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+ }
+
+ partPageStore.sync();
+
+ DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)pageMemory.pageManager();
+
+ pageMgr.pageStoreMap().addPageStore(grpId, partId, partPageStore);
+
+ return partPageStore;
+ }
+
+ /** */
+ public LinkMap createLinkMapTree(boolean initNew) throws IgniteCheckedException {
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+ try {
+ long mappingMetaPageId = initNew
+ ? mappingPageMemory.allocatePage(grpId, partId, FLAG_DATA)
+ : PageIdUtils.pageId(partId, FLAG_DATA, LinkMap.META_PAGE_IDX);
+
+ assert PageIdUtils.pageIndex(mappingMetaPageId) == LinkMap.META_PAGE_IDX
+ : PageIdUtils.toDetailString(mappingMetaPageId);
+
+ linkMap = new LinkMap(newGrpCtx, mappingPageMemory, mappingMetaPageId, initNew);
+ }
+ finally {
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+ }
+
+ return linkMap;
+ }
+
+ /** */
+ public void createNewCacheDataStore(GridCacheOffheapManager offheap) {
+ GridCacheDataStore newCacheDataStore = offheap.createGridCacheDataStore(
+ newGrpCtx,
+ partId,
+ true,
+ log
+ );
+
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+ try {
+ newCacheDataStore.init();
+ }
+ finally {
+ defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+ }
+
+ this.newCacheDataStore = newCacheDataStore;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
new file mode 100644
index 0000000..b4273cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+ /** Prefix for link mapping files. */
+ private static final String DFRG_LINK_MAPPING_FILE_PREFIX = PART_FILE_PREFIX + "map-";
+
+ /** Link mapping file template. */
+ private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+ /** Defragmentation completion marker file name. */
+ private static final String DFRG_COMPLETION_MARKER_FILE_NAME = "dfrg-completion-marker";
+
+ /** Name of defragmented index partition file. */
+ private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + "-dfrg" + FILE_SUFFIX;
+
+ /** Name of defragmented index partition temporary file. */
+ private static final String DFRG_INDEX_TMP_FILE_NAME = DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+ /** Prefix for defragmented partition files. */
+ private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX + "dfrg-";
+
+ /** Defragmented partition file template. */
+ private static final String DFRG_PARTITION_FILE_TEMPLATE = DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+ /** Defragmented partition temp file template. */
+ private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+ /**
+ * Performs cleanup of work dir before initializing file page stores.
+ * Will finish batch renaming if defragmentation was completed or delete garbage if it wasn't.
+ *
+ * @param workDir Cache group working directory.
+ * @param log Logger to write messages.
+ * @throws IgniteCheckedException If {@link IOException} occurred or the working directory could not be listed.
+ */
+ public static void beforeInitPageStores(File workDir, IgniteLogger log) throws IgniteCheckedException {
+ try {
+ // Does nothing unless the completion marker is present.
+ batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+ U.delete(defragmentationCompletionMarkerFile(workDir));
+
+ File[] files = workDir.listFiles();
+
+ // File#listFiles() returns null if the path is not a directory or an I/O error occurred;
+ // report that explicitly instead of failing with a NullPointerException in the loop below.
+ if (files == null) {
+ throw new IgniteCheckedException("Failed to list files in cache group working directory [workDir=" +
+ workDir.getAbsolutePath() + ']');
+ }
+
+ // Whatever defragmentation artifacts are still here are leftovers and must be removed.
+ for (File file : files) {
+ String fileName = file.getName();
+
+ if (
+ fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+ || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+ || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+ )
+ U.delete(file);
+ }
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /**
+ * Tells whether the given cache group has already been defragmented, which is signalled by the presence of
+ * the completion marker file. If the marker is there, the batch rename is carried out before returning.
+ *
+ * @param workDir Cache group working directory.
+ * @param grpId Cache group Id of cache group belonging to the given working directory.
+ * @param log Logger to write messages.
+ * @return {@code true} if given cache group is already defragmented.
+ * @throws IgniteException If {@link IOException} occurred.
+ *
+ * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+ */
+ public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int grpId, IgniteLogger log) throws IgniteException {
+ File marker = defragmentationCompletionMarkerFile(workDir);
+
+ // No marker - the group still has to be defragmented.
+ if (!marker.exists())
+ return false;
+
+ if (log.isInfoEnabled()) {
+ String msg = S.toString(
+ "Skipping already defragmented page group",
+ "grpId", grpId, false,
+ "markerFileName", marker.getName(), false,
+ "workDir", workDir.getAbsolutePath(), false
+ );
+
+ log.info(msg);
+ }
+
+ batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+ return true;
+ }
+
+ /**
+ * Tells whether the given partition has already been defragmented, i.e. both its defragmented partition file
+ * and its link mapping file exist. Otherwise removes whatever a previous failed attempt may have left behind.
+ *
+ * @param workDir Cache group working directory.
+ * @param grpId Cache group Id of cache group belonging to the given working directory.
+ * @param partId Partition index to check.
+ * @param log Logger to write messages.
+ * @return {@code true} if given partition is already defragmented.
+ * @throws IgniteException If {@link IOException} occurred.
+ *
+ * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+ * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+ * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+ */
+ public static boolean skipAlreadyDefragmentedPartition(File workDir, int grpId, int partId, IgniteLogger log) throws IgniteException {
+ File partFile = defragmentedPartFile(workDir, partId);
+ File mappingFile = defragmentedPartMappingFile(workDir, partId);
+
+ boolean alreadyDefragmented = partFile.exists() && mappingFile.exists();
+
+ if (!alreadyDefragmented) {
+ // Deletion order: temporary partition file, partition file, mapping file.
+ File[] leftovers = {defragmentedPartTmpFile(workDir, partId), partFile, mappingFile};
+
+ try {
+ for (File leftover : leftovers)
+ Files.deleteIfExists(leftover.toPath());
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+
+ return false;
+ }
+
+ if (log.isInfoEnabled()) {
+ String msg = S.toString(
+ "Skipping already defragmented partition",
+ "grpId", grpId, false,
+ "partId", partId, false,
+ "partFileName", partFile.getName(), false,
+ "mappingFileName", mappingFile.getName(), false,
+ "workDir", workDir.getAbsolutePath(), false
+ );
+
+ log.info(msg);
+ }
+
+ return true;
+ }
+
+ /**
+ * Failure-tolerant batch rename of defragmented partition files.
+ *
+ * Deletes all link mapping files and replaces old partition and index files with their defragmented
+ * counterparts. Can be run on the same folder multiple times if failed for some reason.
+ *
+ * Does something only if completion marker is present in the folder. This marker won't be deleted in the end.
+ * Deletion of the marker must be done outside of defragmentation mode to prevent cache groups from being
+ * defragmented several times in case of failures.
+ *
+ * @param workDir Cache group working directory.
+ * @param log Logger to write messages.
+ * @throws IgniteException If {@link IOException} occurred or the working directory could not be listed.
+ *
+ * @see DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, File, IgniteLogger)
+ */
+ public static void batchRenameDefragmentedCacheGroupPartitions(File workDir, IgniteLogger log) throws IgniteException {
+ // Without the marker the defragmented files are not known to be complete, so nothing is touched.
+ if (!defragmentationCompletionMarkerFile(workDir).exists())
+ return;
+
+ try {
+ for (File mappingFile : listFilesByPrefix(workDir, DFRG_LINK_MAPPING_FILE_PREFIX))
+ Files.delete(mappingFile.toPath());
+
+ for (File partFile : listFilesByPrefix(workDir, DFRG_PARTITION_FILE_PREFIX)) {
+ int partId = extractPartId(partFile.getName());
+
+ File oldPartFile = new File(workDir, String.format(PART_FILE_TEMPLATE, partId));
+
+ // NOTE(review): per the Files.move contract other options may be ignored when ATOMIC_MOVE is given,
+ // replacement of an existing target is then implementation specific - confirm on target platforms.
+ Files.move(partFile.toPath(), oldPartFile.toPath(), ATOMIC_MOVE, REPLACE_EXISTING);
+ }
+
+ File idxFile = defragmentedIndexFile(workDir);
+
+ // Index file is moved only if it is there; a previous run may have moved it already.
+ if (idxFile.exists()) {
+ File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+ Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, REPLACE_EXISTING);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Lists files of the given directory whose names start with the given prefix.
+ *
+ * @param dir Directory to list.
+ * @param prefix Required file name prefix.
+ * @return Matching files, never {@code null}.
+ * @throws IgniteException If the directory could not be listed.
+ */
+ private static File[] listFilesByPrefix(File dir, String prefix) throws IgniteException {
+ File[] files = dir.listFiles((d, name) -> name.startsWith(prefix));
+
+ // File#listFiles returns null if the path is not a directory or an I/O error occurred.
+ if (files == null) {
+ throw new IgniteException("Failed to list files in cache group working directory [workDir=" +
+ dir.getAbsolutePath() + ']');
+ }
+
+ return files;
+ }
+
+ /**
+ * Parses the partition index out of a file name shaped like {@code part-dfrg-%d.bin}: everything between the
+ * defragmented partition prefix and the file suffix is read as a decimal integer.
+ *
+ * @param dfrgPartFileName Defragmented partition file name.
+ * @return Partition index.
+ *
+ * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+ */
+ private static int extractPartId(String dfrgPartFileName) {
+ assert dfrgPartFileName.startsWith(DFRG_PARTITION_FILE_PREFIX) : dfrgPartFileName;
+ assert dfrgPartFileName.endsWith(FILE_SUFFIX) : dfrgPartFileName;
+
+ int idBegin = DFRG_PARTITION_FILE_PREFIX.length();
+ int idEnd = dfrgPartFileName.length() - FILE_SUFFIX.length();
+
+ return Integer.parseInt(dfrgPartFileName.substring(idBegin, idEnd));
+ }
+
+ /**
+ * Returns the file named {@code index-dfrg.bin.tmp} in the given folder. It is used for storing the defragmented
+ * index partition while the process is running. Only a {@link File} handle is built here, nothing is created
+ * on disk.
+ *
+ * @param workDir Cache group working directory.
+ * @return File.
+ *
+ * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+ */
+ public static File defragmentedIndexTmpFile(File workDir) {
+ return new File(workDir, DFRG_INDEX_TMP_FILE_NAME);
+ }
+
+ /**
+ * Returns the file named {@code index-dfrg.bin} in the given folder. It is used for storing the defragmented
+ * index partition once the process is over. Only a {@link File} handle is built here, nothing is created
+ * on disk.
+ *
+ * @param workDir Cache group working directory.
+ * @return File.
+ *
+ * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+ */
+ public static File defragmentedIndexFile(File workDir) {
+ return new File(workDir, DFRG_INDEX_FILE_NAME);
+ }
+
+ /**
+ * Atomically moves the temporary defragmented index file to its final name.
+ *
+ * @param workDir Cache group working directory.
+ * @throws IgniteException If {@link IOException} occurred.
+ *
+ * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+ * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+ */
+ public static void renameTempIndexFile(File workDir) throws IgniteException {
+ File tmpIdxFile = defragmentedIndexTmpFile(workDir);
+ File finalIdxFile = defragmentedIndexFile(workDir);
+
+ try {
+ Files.move(
+ tmpIdxFile.toPath(),
+ finalIdxFile.toPath(),
+ ATOMIC_MOVE
+ );
+ }
+ catch (IOException ioErr) {
+ throw new IgniteException(ioErr);
+ }
+ }
+
+ /**
+ * Return file named {@code part-dfrg-%d.bin.tmp} in given folder. It will be used for storing defragmented data
+ * partition during the process.
+ *
+ * @param workDir Cache group working directory.
+ * @param partId Partition index, will be substituted into file name.
+ * @return File.
+ *
+ * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+ */
+ public static File defragmentedPartTmpFile(File workDir, int partId) {
+ return new File(workDir, String.format(DFRG_PARTITION_TMP_FILE_TEMPLATE, partId));
+ }
+
+ /**
+ * Return file named {@code part-dfrg-%d.bin} in given folder. It will be used for storing defragmented data
+ * partition when the process is over.
+ *
+ * @param workDir Cache group working directory.
+ * @param partId Partition index, will be substituted into file name.
+ * @return File.
+ *
+ * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+ */
+ public static File defragmentedPartFile(File workDir, int partId) {
+ return new File(workDir, String.format(DFRG_PARTITION_FILE_TEMPLATE, partId));
+ }
+
+ /**
+ * Atomically moves the temporary defragmented partition file to its final name. The final file is expected
+ * not to exist yet (checked by an assertion).
+ *
+ * @param workDir Cache group working directory.
+ * @param partId Partition index.
+ * @throws IgniteException If {@link IOException} occurred.
+ *
+ * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+ * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+ */
+ public static void renameTempPartitionFile(File workDir, int partId) throws IgniteException {
+ File tmpPartFile = defragmentedPartTmpFile(workDir, partId);
+ File finalPartFile = defragmentedPartFile(workDir, partId);
+
+ assert !finalPartFile.exists() : finalPartFile;
+
+ try {
+ Files.move(
+ tmpPartFile.toPath(),
+ finalPartFile.toPath(),
+ ATOMIC_MOVE
+ );
+ }
+ catch (IOException ioErr) {
+ throw new IgniteException(ioErr);
+ }
+ }
+
+ /**
+ * Return file named {@code part-map-%d.bin} in given folder. It will be used for storing defragmention links
+ * mapping for given partition during and after defragmentation process. No temporary counterpart is required here.
+ *
+ * @param workDir Cache group working directory.
+ * @param partId Partition index, will be substituted into file name.
+ * @return File.
+ *
+ * @see LinkMap
+ */
+ public static File defragmentedPartMappingFile(File workDir, int partId) {
+ return new File(workDir, String.format(DFRG_LINK_MAPPING_FILE_TEMPLATE, partId));
+ }
+
+ /**
+ * Returns the defragmentation completion marker file. This file can only be created when all partitions and
+ * the index are defragmented and renamed from their original {@code *.tmp} versions. Presence of this file
+ * signals that no data will be lost if original partitions are deleted and that the batch rename process can be
+ * safely initiated. Only a {@link File} handle is built here, nothing is created on disk.
+ *
+ * @param workDir Cache group working directory.
+ * @return File.
+ *
+ * @see DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, File, IgniteLogger)
+ * @see DefragmentationFileUtils#batchRenameDefragmentedCacheGroupPartitions(File, IgniteLogger)
+ */
+ public static File defragmentationCompletionMarkerFile(File workDir) {
+ return new File(workDir, DFRG_COMPLETION_MARKER_FILE_NAME);
+ }
+
+ /**
+ * Creates an empty completion marker file in the given directory and forces it to disk.
+ *
+ * @param ioFactory File IO factory.
+ * @param workDir Cache group working directory.
+ * @param log Logger to write messages (not used by this method at the moment).
+ * @throws IgniteException If {@link IOException} occurred.
+ *
+ * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+ */
+ public static void writeDefragmentationCompletionMarker(
+ FileIOFactory ioFactory,
+ File workDir,
+ IgniteLogger log
+ ) throws IgniteException {
+ File markerFile = defragmentationCompletionMarkerFile(workDir);
+
+ // Nothing is written: the file is only opened with CREATE_NEW and synced.
+ try (FileIO markerIo = ioFactory.create(markerFile, CREATE_NEW, WRITE)) {
+ markerIo.force(true);
+ }
+ catch (IOException ioErr) {
+ throw new IgniteException(ioErr);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationPageReadWriteManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationPageReadWriteManager.java
new file mode 100644
index 0000000..2ed7c91
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationPageReadWriteManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManagerImpl;
+
+/** */
+public class DefragmentationPageReadWriteManager extends PageReadWriteManagerImpl {
+ /**
+ * @param ctx Kernal context.
+ * @param name name.
+ */
+ public DefragmentationPageReadWriteManager(GridKernalContext ctx, String name) {
+ super(ctx, new PageStoreMap(), name);
+ }
+
+ /** */
+ public PageStoreMap pageStoreMap() {
+ return (PageStoreMap)pageStores;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/LinkMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/LinkMap.java
new file mode 100644
index 0000000..a796ab9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/LinkMap.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_AUX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+
+/**
+ * Class that holds mappings of old links to new links.
+ */
+public class LinkMap {
+ /** Tree meta page index. */
+ public static final int META_PAGE_IDX = 2;
+
+ /** */
+ public static final IOVersions<? extends BPlusLeafIO<?>> LEAF_IO_VERSIONS = new IOVersions<>(
+ new LinkMappingLeafIO()
+ );
+
+ /** */
+ public static final IOVersions<? extends BPlusInnerIO<?>> INNER_IO_VERSIONS = new IOVersions<>(
+ new LinkMappingInnerIO()
+ );
+
+ /** Mapping tree. */
+ private final LinkTree tree;
+
+ /**
+ * Convenience constructor that takes the group id and the group name from the given cache group context and
+ * delegates to {@link #LinkMap(int, String, PageMemory, long, boolean)}.
+ *
+ * @param ctx Cache group context.
+ * @param pageMem Page memory.
+ * @param metaPageId Meta page id.
+ * @param initNew If tree should be (re)created.
+ * @throws IgniteCheckedException If failed.
+ */
+ public LinkMap(
+ CacheGroupContext ctx,
+ PageMemory pageMem,
+ long metaPageId,
+ boolean initNew
+ ) throws IgniteCheckedException {
+ this(ctx.groupId(), ctx.name(), pageMem, metaPageId, initNew);
+ }
+
+ /**
+ * Creates the underlying mapping tree named {@code "link-map"}. No WAL manager, reuse list, failure processor
+ * or page lock listener is passed to the tree.
+ *
+ * @param grpId Cache group id.
+ * @param grpName Cache group name.
+ * @param pageMem Page memory.
+ * @param metaPageId Meta page id.
+ * @param initNew If tree should be (re)created.
+ * @throws IgniteCheckedException If failed.
+ */
+ public LinkMap(
+ int grpId,
+ String grpName,
+ PageMemory pageMem,
+ long metaPageId,
+ boolean initNew
+ ) throws IgniteCheckedException {
+ tree = new LinkTree(
+ "link-map",
+ grpId,
+ grpName,
+ pageMem,
+ null, // wal
+ new AtomicLong(), // globalRmvId
+ metaPageId,
+ null, // reuseList
+ (IOVersions<LinkMappingInnerIO>)INNER_IO_VERSIONS,
+ (IOVersions<LinkMappingLeafIO>)LEAF_IO_VERSIONS,
+ null, // failureProcessor
+ null, // lockLsnr
+ initNew
+ );
+ }
+
+ /**
+ * Add link mapping.
+ *
+ * @param oldLink Old link.
+ * @param newLink New link.
+ */
+ public void put(long oldLink, long newLink) throws IgniteCheckedException {
+ tree.put(new LinkMapping(oldLink, newLink));
+ }
+
+ /**
+ * Get new link by old link.
+ *
+ * @param oldLink Old link.
+ * @return New link that the old link was mapped to.
+ * @throws IgniteCheckedException If the lookup failed or no mapping exists for the given old link.
+ */
+ public long get(long oldLink) throws IgniteCheckedException {
+ // Only the old link takes part in comparison, the new link of the search row is irrelevant.
+ LinkMapping found = tree.findOne(new LinkMapping(oldLink, 0));
+
+ // A missing mapping is reported explicitly instead of failing with a NullPointerException.
+ if (found == null)
+ throw new IgniteCheckedException("Link mapping is not found [oldLink=" + oldLink + ']');
+
+ return found.getNewLink();
+ }
+
+ /**
+ * B+ tree that stores {@link LinkMapping} rows ordered by their old link. The row itself is used as the
+ * lookup key.
+ */
+ private static class LinkTree extends BPlusTree<LinkMapping, LinkMapping> {
+ /**
+ * @param name Tree name.
+ * @param cacheGrpId Cache group ID.
+ * @param cacheGrpName Cache group name.
+ * @param pageMem Page memory.
+ * @param wal Write ahead log manager.
+ * @param globalRmvId Remove ID.
+ * @param metaPageId Meta page ID.
+ * @param reuseList Reuse list.
+ * @param innerIos Inner IO versions.
+ * @param leafIos Leaf IO versions.
+ * @param failureProcessor if the tree is corrupted.
+ * @param lockLsnr Page lock listener.
+ * @param initNew If tree should be (re)created.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ protected LinkTree(
+ String name,
+ int cacheGrpId,
+ String cacheGrpName,
+ PageMemory pageMem,
+ IgniteWriteAheadLogManager wal,
+ AtomicLong globalRmvId,
+ long metaPageId,
+ ReuseList reuseList,
+ IOVersions<? extends BPlusInnerIO<LinkMapping>> innerIos,
+ IOVersions<? extends BPlusLeafIO<LinkMapping>> leafIos,
+ @Nullable FailureProcessor failureProcessor,
+ @Nullable PageLockListener lockLsnr,
+ boolean initNew
+ ) throws IgniteCheckedException {
+ super(name, cacheGrpId, cacheGrpName, pageMem, wal, globalRmvId, metaPageId, reuseList, innerIos, leafIos, FLAG_AUX, failureProcessor, lockLsnr);
+
+ // NOTE(review): IO types are registered through a hook named "registerTest" - confirm this is the intended
+ // registration path for production code.
+ PageIO.registerTest(latestInnerIO(), latestLeafIO());
+
+ // IO types are registered before the tree is initialized.
+ initTree(initNew);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int compare(BPlusIO<LinkMapping> io, long pageAddr, int idx, LinkMapping row) throws IgniteCheckedException {
+ LinkMapping lookupRow = io.getLookupRow(this, pageAddr, idx);
+
+ // Rows are ordered by the old link only, the new link never takes part in comparison.
+ return Long.compare(lookupRow.getOldLink(), row.getOldLink());
+ }
+
+ /** {@inheritDoc} */
+ @Override public LinkMapping getRow(BPlusIO<LinkMapping> io, long pageAddr, int idx, Object x) throws IgniteCheckedException {
+ // The lookup row already carries both links, so it is returned as the full row; "x" is ignored.
+ return io.getLookupRow(this, pageAddr, idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long allocatePageNoReuse() throws IgniteCheckedException {
+ // New pages go to the partition of the meta page and are allocated with FLAG_DATA.
+ return pageMem.allocatePage(grpId, PageIdUtils.partId(metaPageId), FLAG_DATA);
+ }
+ }
+
+ /**
+ * Immutable pair of links: the link a row had before defragmentation and the link it has after it.
+ */
+ private static class LinkMapping {
+ /** Link before defragmentation. */
+ private final long oldLink;
+
+ /** Link after defragmentation. */
+ private final long newLink;
+
+ /**
+ * @param oldLink Old link.
+ * @param newLink New link.
+ */
+ public LinkMapping(long oldLink, long newLink) {
+ this.newLink = newLink;
+ this.oldLink = oldLink;
+ }
+
+ /** @return Old link. */
+ public long getOldLink() {
+ return this.oldLink;
+ }
+
+ /** @return New link. */
+ public long getNewLink() {
+ return this.newLink;
+ }
+ }
+
+ /**
+ * Inner page IO of the link mapping tree. Each item occupies two longs: the old link followed by the new link.
+ */
+ private static class LinkMappingInnerIO extends BPlusInnerIO<LinkMapping> {
+ /** */
+ protected LinkMappingInnerIO() {
+ super(PageIO.T_DEFRAG_LINK_MAPPING_INNER, 1, true, Long.BYTES * 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void storeByOffset(long pageAddr, int off, LinkMapping row) {
+ int newLinkOff = off + Long.BYTES;
+
+ PageUtils.putLong(pageAddr, off, row.getOldLink());
+ PageUtils.putLong(pageAddr, newLinkOff, row.getNewLink());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(long dst, int dstIdx, BPlusIO<LinkMapping> srcIo, long src, int srcIdx)
+ throws IgniteCheckedException {
+ assert srcIo == this;
+
+ int dstOff = offset(dstIdx);
+ LinkMapping srcRow = srcIo.getLookupRow(null, src, srcIdx);
+
+ storeByOffset(dst, dstOff, srcRow);
+ }
+
+ /** {@inheritDoc} */
+ @Override public LinkMapping getLookupRow(BPlusTree<LinkMapping, ?> tree, long pageAddr, int idx) {
+ int itemOff = offset(idx);
+
+ return new LinkMapping(
+ PageUtils.getLong(pageAddr, itemOff),
+ PageUtils.getLong(pageAddr, itemOff + Long.BYTES)
+ );
+ }
+ }
+
+ /**
+ * Leaf page IO of the link mapping tree. Each item occupies two longs: the old link followed by the new link.
+ */
+ private static class LinkMappingLeafIO extends BPlusLeafIO<LinkMapping> {
+ /** */
+ protected LinkMappingLeafIO() {
+ super(PageIO.T_DEFRAG_LINK_MAPPING_LEAF, 1, Long.BYTES * 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void storeByOffset(long pageAddr, int off, LinkMapping row) {
+ int newLinkOff = off + Long.BYTES;
+
+ PageUtils.putLong(pageAddr, off, row.getOldLink());
+ PageUtils.putLong(pageAddr, newLinkOff, row.getNewLink());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void store(long dst, int dstIdx, BPlusIO<LinkMapping> srcIo, long src, int srcIdx)
+ throws IgniteCheckedException {
+ assert srcIo == this;
+
+ int dstOff = offset(dstIdx);
+ LinkMapping srcRow = srcIo.getLookupRow(null, src, srcIdx);
+
+ storeByOffset(dst, dstOff, srcRow);
+ }
+
+ /** {@inheritDoc} */
+ @Override public LinkMapping getLookupRow(BPlusTree<LinkMapping, ?> tree, long pageAddr, int idx) {
+ int itemOff = offset(idx);
+
+ return new LinkMapping(
+ PageUtils.getLong(pageAddr, itemOff),
+ PageUtils.getLong(pageAddr, itemOff + Long.BYTES)
+ );
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
new file mode 100644
index 0000000..946fea1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/PageStoreMap.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.pagemem.store.PageStoreCollection;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.collection.IntRWHashMap;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/** */
+class PageStoreMap implements PageStoreCollection {
+ /** GroupId -> PartId -> PageStore */
+ private final IntMap<IntMap<PageStore>> grpPageStoresMap = new IntRWHashMap<>();
+
+ /** */
+ public void addPageStore(
+ int grpId,
+ int partId,
+ PageStore pageStore
+ ) {
+ IntMap<PageStore> pageStoresMap = grpPageStoresMap.get(grpId);
+
+ //This code cannot be used concurrently. If we decide to parallel defragmentation then we should correct current class.
+ if (pageStoresMap == null)
+ grpPageStoresMap.put(grpId, pageStoresMap = new IntRWHashMap<>());
+
+ pageStoresMap.put(partId, pageStore);
+ }
+
+ /** */
+ public void removePageStore(
+ int grpId,
+ int partId
+ ) {
+ IntMap<PageStore> pageStoresMap = grpPageStoresMap.get(grpId);
+
+ if (pageStoresMap != null)
+ pageStoresMap.remove(partId);
+ }
+
+ /** */
+ public void clear(int grpId) {
+ grpPageStoresMap.remove(grpId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public PageStore getStore(int grpId, int partId) throws IgniteCheckedException {
+ IntMap<PageStore> partPageStoresMap = grpPageStoresMap.get(grpId);
+
+ if (partPageStoresMap == null) {
+ throw new IgniteCheckedException(S.toString("Page store map not found. ",
+ "grpId", grpId, false,
+ "partId", partId, false,
+ "keys", Arrays.toString(grpPageStoresMap.keys()), false,
+ "this", hashCode(), false
+ ));
+ }
+
+ PageStore pageStore = partPageStoresMap.get(partId);
+
+ if (pageStore == null) {
+ throw new IgniteCheckedException(S.toString("Page store not found. ",
+ "grpId", grpId, false,
+ "partId", partId, false,
+ "keys", Arrays.toString(partPageStoresMap.keys()), false,
+ "this", hashCode(), false
+ ));
+ }
+
+ return pageStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<PageStore> getStores(int grpId) throws IgniteCheckedException {
+ IntMap<PageStore> partPageStoresMap = grpPageStoresMap.get(grpId);
+
+ if (partPageStoresMap == null) {
+ throw new IgniteCheckedException(S.toString("Page store map not found. ",
+ "grpId", grpId, false,
+ "keys", Arrays.toString(grpPageStoresMap.keys()), false,
+ "this", hashCode(), false
+ ));
+ }
+
+ return Arrays.asList(partPageStoresMap.values());
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/TreeIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/TreeIterator.java
new file mode 100644
index 0000000..90e47c9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/TreeIterator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Iterates over the rows of a {@link BPlusTree} by walking its leaf pages: starting from the page ID
+ * read from the tree meta page, it follows the forward link of every leaf until the link is {@code 0}.
+ * <p>
+ * Each leaf is copied into a private one-page buffer under the page read lock; rows are then read from
+ * that copy after the lock and the page have been released.
+ * <p>
+ * NOTE(review): the single buffer is reused by every {@code iterate} call, so one instance presumably
+ * must not be used by several threads at once - confirm against callers.
+ */
+public class TreeIterator {
+ /** Direct memory buffer with a size of one page. */
+ private final ByteBuffer pageBuf;
+
+ /** Offheap page size. */
+ private final int pageSize;
+
+ /**
+ * Allocates a direct buffer of {@code size} bytes that is used as the page copy.
+ *
+ * @param size Page size in bytes.
+ */
+ public TreeIterator(int size) {
+ pageSize = size;
+
+ pageBuf = ByteBuffer.allocateDirect(pageSize);
+ }
+
+ /**
+ * Applies the closure to every item of every leaf page of the tree, leaf by leaf in forward-link order.
+ * The value returned by the closure, if any, is not used, so the iteration cannot be stopped early
+ * other than by an exception.
+ *
+ * @param tree Tree to iterate over; provides the group ID and the meta page ID.
+ * @param pageMemory Page memory the tree pages are acquired from.
+ * @param c Closure invoked with the tree, the leaf IO, the address of the page copy and the item index.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <L, T extends L> void iterate(
+ BPlusTree<L, T> tree,
+ PageMemoryEx pageMemory,
+ BPlusTree.TreeRowClosure<L, T> c
+ ) throws IgniteCheckedException {
+ int grpId = tree.groupId();
+
+ long leafId = findFirstLeafId(grpId, tree.getMetaPageId(), pageMemory);
+
+ long bufAddr = GridUnsafe.bufferAddress(pageBuf);
+
+ // A zero page ID ends the walk.
+ while (leafId != 0L) {
+ long leafPage = pageMemory.acquirePage(grpId, leafId);
+
+ BPlusIO<L> io;
+
+ try {
+ long leafPageAddr = pageMemory.readLock(grpId, leafId, leafPage);
+
+ try {
+ io = PageIO.getBPlusIO(leafPageAddr);
+
+ assert io instanceof BPlusLeafIO : io;
+
+ // Snapshot the whole page so that the rows can be processed without holding the read lock.
+ GridUnsafe.copyMemory(leafPageAddr, bufAddr, pageSize);
+ }
+ finally {
+ pageMemory.readUnlock(grpId, leafId, leafPage);
+ }
+ }
+ finally {
+ pageMemory.releasePage(grpId, leafId, leafPage);
+ }
+
+ // From here on only the copy in the buffer is read.
+ int cnt = io.getCount(bufAddr);
+
+ for (int idx = 0; idx < cnt; idx++)
+ c.apply(tree, io, bufAddr, idx);
+
+ leafId = io.getForward(bufAddr);
+ }
+ }
+
+ /**
+ * Reads the ID of the page the iteration starts from out of the tree meta page, under the page read lock.
+ * The value is {@code getFirstPageId(addr, 0)} of the meta IO; the second argument is presumably the
+ * tree level, {@code 0} being the leaf level - confirm against {@link BPlusMetaIO}.
+ *
+ * @param grpId Group ID.
+ * @param metaPageId Tree meta page ID.
+ * @param partPageMemory Page memory the meta page is acquired from.
+ * @return Page ID stored in the meta page for index {@code 0}.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long findFirstLeafId(int grpId, long metaPageId, PageMemoryEx partPageMemory) throws IgniteCheckedException {
+ long metaPage = partPageMemory.acquirePage(grpId, metaPageId);
+
+ try {
+ long metaPageAddr = partPageMemory.readLock(grpId, metaPageId, metaPage);
+
+ try {
+ BPlusMetaIO metaIO = PageIO.getPageIO(metaPageAddr);
+
+ return metaIO.getFirstPageId(metaPageAddr, 0);
+ }
+ finally {
+ partPageMemory.readUnlock(grpId, metaPageId, metaPage);
+ }
+ }
+ finally {
+ partPageMemory.releasePage(grpId, metaPageId, metaPage);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
new file mode 100644
index 0000000..6bc3ddc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationParameters.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.maintenance.MaintenanceTask;
+
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME;
+
+/**
+ * Maintenance task for defragmentation.
+ */
+public class DefragmentationParameters {
+ /** */
+ public static final String CACHE_GROUP_ID_SEPARATOR = ",";
+
+ /** */
+ private final List<Integer> cacheGrpIds;
+
+ /**
+ * @param cacheGrpIds Id of cache group for defragmentations.
+ */
+ private DefragmentationParameters(List<Integer> cacheGrpIds) {
+ this.cacheGrpIds = cacheGrpIds;
+ }
+
+ /**
+ * Convert parameter to maintenance storage.
+ *
+ * @param cacheGroupIds Cache group ids for defragmentation.
+ * @return Maintenance task.
+ */
+ public static MaintenanceTask toStore(List<Integer> cacheGroupIds) {
+ return new MaintenanceTask(
+ DEFRAGMENTATION_MNTC_TASK_NAME,
+ "Cache group defragmentation",
+ cacheGroupIds.stream()
+ .map(String::valueOf)
+ .collect(Collectors.joining(CACHE_GROUP_ID_SEPARATOR))
+ );
+ }
+
+ /**
+ * @param rawTask Task from maintenance storage.
+ * @return Defragmentation parameters.
+ */
+ public static DefragmentationParameters fromStore(MaintenanceTask rawTask) {
+ return new DefragmentationParameters(Arrays.stream(rawTask.parameters()
+ .split(CACHE_GROUP_ID_SEPARATOR))
+ .map(Integer::valueOf)
+ .collect(Collectors.toList())
+ );
+ }
+
+ /**
+ * @return Cache groups ids.
+ */
+ public List<Integer> cacheGroupIds() {
+ return cacheGrpIds;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
new file mode 100644
index 0000000..a809579
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/DefragmentationWorkflowCallback.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.apache.ignite.maintenance.MaintenanceWorkflowCallback;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Defragmentation specific callback for maintenance mode.
+ */
+public class DefragmentationWorkflowCallback implements MaintenanceWorkflowCallback {
+ /** Defragmentation manager. */
+ private final CachePartitionDefragmentationManager defrgMgr;
+
+ /** Logger provider. */
+ private final Function<Class<?>, IgniteLogger> logProvider;
+
+ /**
+ * @param logProvider Logger provider.
+ * @param defrgMgr Defragmentation manager.
+ */
+ public DefragmentationWorkflowCallback(
+ Function<Class<?>, IgniteLogger> logProvider,
+ CachePartitionDefragmentationManager defrgMgr
+ ) {
+ this.defrgMgr = defrgMgr;
+ this.logProvider = logProvider;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean shouldProceedWithMaintenance() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull List<MaintenanceAction<?>> allActions() {
+ return Collections.singletonList(automaticAction());
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable MaintenanceAction<Boolean> automaticAction() {
+ return new ExecuteDefragmentationAction(logProvider, defrgMgr);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
new file mode 100644
index 0000000..42b2de7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/maintenance/ExecuteDefragmentationAction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance;
+
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager;
+import org.apache.ignite.maintenance.MaintenanceAction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Maintenance action that runs defragmentation through the {@link CachePartitionDefragmentationManager}.
+ */
+class ExecuteDefragmentationAction implements MaintenanceAction<Boolean> {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Defragmentation manager. */
+    private final CachePartitionDefragmentationManager defrgMgr;
+
+    /**
+     * @param logFunction Logger provider.
+     * @param defrgMgr Defragmentation manager.
+     */
+    public ExecuteDefragmentationAction(
+        Function<Class<?>, IgniteLogger> logFunction,
+        CachePartitionDefragmentationManager defrgMgr
+    ) {
+        this.log = logFunction.apply(ExecuteDefragmentationAction.class);
+        this.defrgMgr = defrgMgr;
+    }
+
+    /**
+     * Runs defragmentation.
+     *
+     * @return {@code true} if defragmentation finished without an Ignite exception, {@code false} if it
+     *      threw one (the exception is logged and not rethrown).
+     */
+    @Override public Boolean execute() {
+        try {
+            defrgMgr.executeDefragmentation();
+
+            return true;
+        }
+        catch (IgniteCheckedException | IgniteException e) {
+            log.error("Defragmentation is failed", e);
+
+            return false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull String name() {
+        return "execute";
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable String description() {
+        return "Starting the process of defragmentation.";
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 3f5f8a9..f8f28d8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManager;
@@ -87,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.maintenance.MaintenanceTask;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
@@ -731,6 +733,13 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
try {
boolean dirExisted = checkAndInitCacheWorkDir(cacheWorkDir);
+ if (dirExisted) {
+ MaintenanceRegistry mntcReg = cctx.kernalContext().maintenanceRegistry();
+
+ if (!mntcReg.isMaintenanceMode())
+ DefragmentationFileUtils.beforeInitPageStores(cacheWorkDir, log);
+ }
+
File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME);
if (dirExisted && !idxFile.exists())
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index 070d426..eb90b2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLogInnerIO;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLogLeafIO;
import org.apache.ignite.internal.processors.cache.persistence.IndexStorageImpl;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMap;
import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListNodeIO;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageBPlusIO;
@@ -258,6 +259,12 @@ public abstract class PageIO {
/** */
public static final short T_MARKER_PAGE = 33;
+ /** */
+ public static final short T_DEFRAG_LINK_MAPPING_INNER = 34;
+
+ /** */
+ public static final short T_DEFRAG_LINK_MAPPING_LEAF = 35;
+
/** Index for payload == 1. */
public static final short T_H2_EX_REF_LEAF_START = 10_000;
@@ -799,6 +806,12 @@ public abstract class PageIO {
case T_DATA_REF_METASTORAGE_LEAF:
return (Q)MetastorageBPlusIO.LEAF_IO_VERSIONS.forVersion(ver);
+ case T_DEFRAG_LINK_MAPPING_INNER:
+ return (Q) LinkMap.INNER_IO_VERSIONS.forVersion(ver);
+
+ case T_DEFRAG_LINK_MAPPING_LEAF:
+ return (Q) LinkMap.LEAF_IO_VERSIONS.forVersion(ver);
+
default:
// For tests.
if (innerTestIO != null && innerTestIO.getType() == type && innerTestIO.getVersion() == ver)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index d282f4d..236de0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -33,16 +33,21 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMap;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
@@ -487,4 +492,23 @@ public interface GridQueryIndexing {
default Map<String, Integer> secondaryIndexesInlineSize() {
return Collections.emptyMap();
}
+
+ /**
+ * Defragment index partition.
+ *
+ * @param grpCtx Old group context.
+ * @param newCtx New group context.
+ * @param partPageMem Partition page memory.
+ * @param mappingByPart {@link LinkMap} instances in an int-keyed map (judging by the name, keyed by
+ * partition ID - confirm against the caller).
+ * @param cpLock Defragmentation checkpoint read lock.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ void defragment(
+ CacheGroupContext grpCtx,
+ CacheGroupContext newCtx,
+ PageMemoryEx partPageMem,
+ IntMap<LinkMap> mappingByPart,
+ CheckpointTimeoutLock cpLock
+ ) throws IgniteCheckedException;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntHashMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntHashMap.java
index ada5276..2160590 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntHashMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntHashMap.java
@@ -181,6 +181,32 @@ public class IntHashMap<V> implements IntMap<V> {
}
/** {@inheritDoc} */
+    @Override public int[] keys() {
+        int[] res = new int[size];
+        int pos = 0;
+
+        for (Entry<V> e : entries) {
+            // Null elements carry no mapping and are skipped.
+            if (e == null)
+                continue;
+
+            res[pos++] = e.key;
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public V[] values() {
+        // The array is instantiated as Object[] and only cast to V[], so its runtime type is Object[].
+        V[] res = (V[])new Object[size];
+        int pos = 0;
+
+        for (Entry<V> e : entries) {
+            // Null elements carry no mapping and are skipped.
+            if (e == null)
+                continue;
+
+            res[pos++] = e.val;
+        }
+
+        return res;
+    }
+
+ /** {@inheritDoc} */
@Override public boolean containsKey(int key) {
return find(key) >= 0;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntMap.java
index f1bbe51..c606003 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntMap.java
@@ -82,4 +82,10 @@ public interface IntMap<V> {
* Returns <tt>true</tt> if this map contains no key-value mappings.
*/
boolean isEmpty();
+
+ /** Returns an array of the keys of this map. */
+ int[] keys();
+
+ /** Returns an array of the values of this map. */
+ V[] values();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntRWHashMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntRWHashMap.java
index 8d379bb..52cffaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntRWHashMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/collection/IntRWHashMap.java
@@ -107,6 +107,28 @@ public class IntRWHashMap<V> implements IntMap<V> {
}
/** {@inheritDoc} */
+ @Override public int[] keys() {
+ // The delegate is read under the read lock, which is released even if the delegate throws.
+ lock.readLock().lock();
+ try {
+ return delegate.keys();
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public V[] values() {
+ // The delegate is read under the read lock, which is released even if the delegate throws.
+ lock.readLock().lock();
+ try {
+ return delegate.values();
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public boolean containsKey(int key) {
lock.readLock().lock();
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
index 3583138..a526764 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
@@ -1679,6 +1679,52 @@ public class GridToStringBuilder {
}
/**
+ * Produces a uniform string presentation of a message with context properties.
+ * <p>
+ * The properties are passed as a flat array of {@code {name, value, sensitivity}} triplets, where the
+ * third element of each triplet is a {@link Boolean}. Names and sensitivity flags are validated with
+ * assertions only; with assertions disabled a non-{@code Boolean} flag fails on the cast instead.
+ *
+ * @param str Output prefix or {@code null} if empty.
+ * @param triplets Triplets {@code {name, value, sensitivity}}.
+ * @return String presentation.
+ * @throws IllegalArgumentException If the number of elements in {@code triplets} is not a multiple of 3.
+ */
+ public static String toString(String str, Object... triplets) {
+ if (triplets.length % 3 != 0)
+ throw new IllegalArgumentException("Array length must be a multiple of 3");
+
+ int propCnt = triplets.length / 3;
+
+ Object[] propNames = new Object[propCnt];
+ Object[] propVals = new Object[propCnt];
+ boolean[] propSens = new boolean[propCnt];
+
+ // Unpack the flat triplets into the three parallel arrays expected by toStringImpl.
+ for (int i = 0; i < propCnt; i++) {
+ Object name = triplets[i * 3];
+
+ assert name != null;
+
+ propNames[i] = name;
+
+ propVals[i] = triplets[i * 3 + 1];
+
+ Object sens = triplets[i * 3 + 2];
+
+ assert sens instanceof Boolean;
+
+ propSens[i] = (Boolean)sens;
+ }
+
+ SBLimitedLength sb = threadLocSB.get();
+
+ // The thread-local builder is reset only by the call that found it empty; presumably this lets
+ // nested toString calls on the same thread keep appending to it.
+ boolean newStr = sb.length() == 0;
+
+ try {
+ return toStringImpl(str, sb, propNames, propVals, propSens, propCnt);
+ }
+ finally {
+ if (newStr)
+ sb.reset();
+ }
+ }
+
+ /**
* Creates an uniformed string presentation for the binary-like object.
*
* @param str Output prefix or {@code null} if empty.
diff --git a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
index 3ce1aea..9cebef0 100644
--- a/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/maintenance/MaintenanceRegistry.java
@@ -20,6 +20,7 @@ package org.apache.ignite.maintenance;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.lang.IgniteExperimental;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -153,4 +154,22 @@ public interface MaintenanceRegistry {
* and their {@link MaintenanceAction maintenance actions} are not executed.
*/
public void prepareAndExecuteMaintenance();
+
+    /**
+     * Registers a workflow callback through {@link #registerWorkflowCallback(String, MaintenanceWorkflowCallback)}
+     * only if an active maintenance task with the given name exists; otherwise does nothing.
+     *
+     * @param maintenanceTaskName Name of the {@link MaintenanceTask} the callback is registered for.
+     * @param workflowCalProvider Provider that builds the {@link MaintenanceWorkflowCallback} from the found
+     *      task. It is not invoked when there is no active task with the given name.
+     * @throws IgniteCheckedException If failed.
+     */
+    public default void registerWorkflowCallbackIfTaskExists(
+        @NotNull String maintenanceTaskName,
+        @NotNull IgniteThrowableFunction<MaintenanceTask, MaintenanceWorkflowCallback> workflowCalProvider
+    ) throws IgniteCheckedException {
+        MaintenanceTask activeTask = activeMaintenanceTask(maintenanceTaskName);
+
+        if (activeTask == null)
+            return;
+
+        MaintenanceWorkflowCallback cb = workflowCalProvider.apply(activeTask);
+
+        registerWorkflowCallback(maintenanceTaskName, cb);
+    }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationEncryptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationEncryptionTest.java
new file mode 100644
index 0000000..f1ef929
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationEncryptionTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.encryption.AbstractEncryptionTest;
+import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi;
+
+/**
+ * Variant of {@link IgnitePdsDefragmentationTest} whose node configuration uses a
+ * {@link KeystoreEncryptionSpi} and enables encryption for every configured cache.
+ */
+public class IgnitePdsDefragmentationEncryptionTest extends IgnitePdsDefragmentationTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        // Keystore location and password are shared with the encryption tests.
+        KeystoreEncryptionSpi keystoreSpi = new KeystoreEncryptionSpi();
+
+        keystoreSpi.setKeyStorePath(AbstractEncryptionTest.KEYSTORE_PATH);
+        keystoreSpi.setKeyStorePassword(AbstractEncryptionTest.KEYSTORE_PASSWORD.toCharArray());
+
+        cfg.setEncryptionSpi(keystoreSpi);
+
+        CacheConfiguration<?, ?>[] cacheCfgs = cfg.getCacheConfiguration();
+
+        for (int i = 0; i < cacheCfgs.length; i++)
+            cacheCfgs[i].setEncryptionEnabled(true);
+
+        return cfg;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationRandomLruEvictionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationRandomLruEvictionTest.java
new file mode 100644
index 0000000..7709d76
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationRandomLruEvictionTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Variant of {@link IgnitePdsDefragmentationTest} whose node configuration sets the page eviction mode of
+ * the default data region to {@link DataPageEvictionMode#RANDOM_LRU}.
+ */
+public class IgnitePdsDefragmentationRandomLruEvictionTest extends IgnitePdsDefragmentationTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ // Only the default data region of the inherited configuration is changed.
+ cfg.getDataStorageConfiguration()
+ .getDefaultDataRegionConfiguration()
+ .setPageEvictionMode(DataPageEvictionMode.RANDOM_LRU);
+
+ return cfg;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
new file mode 100644
index 0000000..8f06a48
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsDefragmentationTest.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.FileVisitor;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+import java.util.stream.IntStream;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.maintenance.MaintenanceFileStore;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentationCompletionMarkerFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedIndexFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartMappingFile;
+import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
+/** */
+public class IgnitePdsDefragmentationTest extends GridCommonAbstractTest {
+ /** Name of the second cache configured in the cache group. */
+ public static final String CACHE_2_NAME = "cache2";
+
+ /** Number of partitions configured for both caches. */
+ public static final int PARTS = 5;
+
+ /** Number of keys streamed into a cache by {@code fillCache}. */
+ public static final int ADDED_KEYS_COUNT = 150;
+
+ /** Name of the cache group shared by both caches. */
+ protected static final String GRP_NAME = "group";
+
+ /** {@inheritDoc} Additionally stops all grids and cleans the persistence directory before each test. */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids(true);
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} Additionally stops all grids and cleans the persistence directory after each test. */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids(true);
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} Always returns a new {@link StopNodeFailureHandler}. */
+ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) {
+ return new StopNodeFailureHandler();
+ }
+
+ /** Factory of an {@link ExpiryPolicy} that returns a 13000 ms duration for creation, access and update. */
+ protected static class PolicyFactory implements Factory<ExpiryPolicy> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public ExpiryPolicy create() {
+ return new ExpiryPolicy() {
+ @Override public Duration getExpiryForCreation() {
+ return new Duration(TimeUnit.MILLISECONDS, 13000);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForAccess() {
+ return new Duration(TimeUnit.MILLISECONDS, 13000);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Duration getExpiryForUpdate() {
+ return new Duration(TimeUnit.MILLISECONDS, 13000);
+ }
+ };
+ }
+ }
+
+ /** {@inheritDoc} Enables persistence on the default data region and configures two transactional caches in group {@code GRP_NAME}; the second one has an expiry policy. */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ DataStorageConfiguration dsCfg = new DataStorageConfiguration();
+ dsCfg.setWalSegmentSize(4 * 1024 * 1024);
+
+ dsCfg.setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setInitialSize(100L * 1024 * 1024)
+ .setMaxSize(1024L * 1024 * 1024)
+ .setPersistenceEnabled(true)
+ );
+
+ cfg.setDataStorageConfiguration(dsCfg);
+
+ CacheConfiguration<?, ?> cache1Cfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setGroupName(GRP_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS));
+
+ CacheConfiguration<?, ?> cache2Cfg = new CacheConfiguration<>(CACHE_2_NAME)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setGroupName(GRP_NAME)
+ .setExpiryPolicyFactory(new PolicyFactory())
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS));
+
+ cfg.setCacheConfiguration(cache1Cfg, cache2Cfg);
+
+ return cfg;
+ }
+
+ /**
+ * Basic test scenario. Does following steps:
+ * - Start node;
+ * - Fill cache and remove part of data;
+ * - Force checkpoint and register defragmentation maintenance task;
+ * - Stop node;
+ * - Start node in defragmentation mode;
+ * - Check that partitions became smaller and completion marker exists;
+ * - Stop node;
+ * - Start node and activate cluster;
+ * - Check that completion marker is gone, cache is accessible and no leftovers remain.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSuccessfulDefragmentation() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ fillCache(ig.cache(DEFAULT_CACHE_NAME));
+
+ forceCheckpoint(ig);
+
+ createMaintenanceRecord();
+
+ stopGrid(0);
+
+ File workDir = resolveCacheWorkDir(ig);
+
+ long[] oldPartLen = partitionSizes(workDir);
+
+ long oldIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
+
+ startGrid(0);
+
+ long[] newPartLen = partitionSizes(workDir);
+
+ for (int p = 0; p < PARTS; p++)
+ assertTrue(newPartLen[p] < oldPartLen[p]); //TODO Fails.
+
+ long newIdxFileLen = new File(workDir, FilePageStoreManager.INDEX_FILE_NAME).length();
+
+ assertTrue(newIdxFileLen <= oldIdxFileLen);
+
+ File completionMarkerFile = defragmentationCompletionMarkerFile(workDir);
+ assertTrue(completionMarkerFile.exists());
+
+ stopGrid(0);
+
+ IgniteEx ig0 = startGrid(0);
+
+ ig0.cluster().state(ClusterState.ACTIVE);
+
+ assertFalse(completionMarkerFile.exists());
+
+ validateCache(grid(0).cache(DEFAULT_CACHE_NAME));
+
+ validateLeftovers(workDir);
+ }
+
+ /**
+ * @return Directory of cache group {@link IgnitePdsDefragmentationTest#GRP_NAME} inside the store directory of the node named {@code ig.name()}.
+ * @throws IgniteCheckedException If failed for some reason, like if it's a file instead of directory.
+ */
+ private File resolveCacheWorkDir(IgniteEx ig) throws IgniteCheckedException {
+ File dbWorkDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
+
+ File nodeWorkDir = new File(dbWorkDir, U.maskForFileName(ig.name()));
+
+ return new File(nodeWorkDir, FilePageStoreManager.CACHE_GRP_DIR_PREFIX + GRP_NAME);
+ }
+
+ /**
+ * Forces a checkpoint and waits until it is finished, so all partitions will be in their final state after restart
+ * if no more data is uploaded.
+ *
+ * @param ig Ignite node.
+ * @throws IgniteCheckedException If checkpoint failed for some reason.
+ */
+ private void forceCheckpoint(IgniteEx ig) throws IgniteCheckedException {
+ ig.context().cache().context().database()
+ .forceCheckpoint("testDefrag")
+ .futureFor(CheckpointState.FINISHED)
+ .get();
+ }
+
+ /** Registers on node 0 the maintenance task built by {@code toStore} from the group id of the default cache. */
+ protected void createMaintenanceRecord() throws IgniteCheckedException {
+ IgniteEx grid = grid(0);
+ MaintenanceRegistry mntcReg = grid.context().maintenanceRegistry();
+
+ mntcReg.registerMaintenanceTask(toStore(Collections.singletonList(groupIdForCache(grid, DEFAULT_CACHE_NAME))));
+ }
+
+ /**
+ * Returns array that contains sizes of partition files in the given working directory. Assumes that partitions
+ * {@code 0} to {@code PARTS - 1} exist in that dir.
+ *
+ * @param workDir Working directory.
+ * @return The array.
+ */
+ protected long[] partitionSizes(File workDir) {
+ return IntStream.range(0, PARTS)
+ .mapToObj(p -> new File(workDir, String.format(FilePageStoreManager.PART_FILE_TEMPLATE, p)))
+ .mapToLong(File::length)
+ .toArray();
+ }
+
+ /**
+ * Checks that plain node start (maintenance record file deleted) after failed defragmentation will finish batch renaming.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailoverRestartWithoutDefragmentation() throws Exception {
+ testFailover(workDir -> {
+ try {
+ File mntcRecFile = new File(workDir.getParent(), MaintenanceFileStore.MAINTENANCE_FILE_NAME);
+
+ assertTrue(mntcRecFile.exists());
+
+ Files.delete(mntcRecFile.toPath());
+
+ startGrid(0);
+
+ validateLeftovers(workDir);
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ finally {
+ createMaintenanceRecord();
+
+ stopGrid(0);
+ }
+ });
+ }
+
+ /**
+ * Checks that second start in defragmentation mode will finish defragmentation if only the completion marker is missing.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailoverOnLastStage() throws Exception {
+ testFailover(workDir -> {});
+ }
+
+ /**
+ * Checks that second start in defragmentation mode will finish defragmentation if the defragmented index file exists only under its temporary name.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailoverIncompletedIndex() throws Exception {
+ testFailover(workDir -> move(
+ DefragmentationFileUtils.defragmentedIndexFile(workDir),
+ DefragmentationFileUtils.defragmentedIndexTmpFile(workDir)
+ ));
+ }
+
+ /**
+ * Checks that second start in defragmentation mode will finish defragmentation if the last partition exists only under its temporary name and the index file is deleted.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailoverIncompletedPartition1() throws Exception {
+ testFailover(workDir -> {
+ DefragmentationFileUtils.defragmentedIndexFile(workDir).delete();
+
+ move(
+ DefragmentationFileUtils.defragmentedPartFile(workDir, PARTS - 1),
+ DefragmentationFileUtils.defragmentedPartTmpFile(workDir, PARTS - 1)
+ );
+ });
+ }
+
+ /**
+ * Checks that second start in defragmentation mode will finish defragmentation if the index file and the mapping file of the last partition are deleted.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailoverIncompletedPartition2() throws Exception {
+ testFailover(workDir -> {
+ DefragmentationFileUtils.defragmentedIndexFile(workDir).delete();
+
+ DefragmentationFileUtils.defragmentedPartMappingFile(workDir, PARTS - 1).delete();
+ });
+ }
+
+ /** Moves {@code from} to {@code to}, replacing an existing target; an {@link IOException} is rethrown wrapped in {@link IgniteCheckedException}. */
+ private void move(File from, File to) throws IgniteCheckedException {
+ try {
+ Files.move(from.toPath(), to.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ /** Failover scenario: a start whose {@link FileIOFactory} throws an {@link IOException} for the completion marker file, then {@code c} is applied to the cache group directory and the node is restarted twice before the cache and leftovers are validated. */
+ private void testFailover(IgniteThrowableConsumer<File> c) throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ fillCache(ig.cache(DEFAULT_CACHE_NAME));
+
+ forceCheckpoint(ig);
+
+ createMaintenanceRecord();
+
+ stopGrid(0);
+
+ File workDir = resolveCacheWorkDir(ig);
+
+ String errMsg = "Failed to create defragmentation completion marker.";
+
+ AtomicBoolean errOccurred = new AtomicBoolean();
+
+ UnaryOperator<IgniteConfiguration> cfgOp = cfg -> {
+ DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+
+ FileIOFactory delegate = dsCfg.getFileIOFactory();
+
+ dsCfg.setFileIOFactory((file, modes) -> {
+ if (file.equals(defragmentationCompletionMarkerFile(workDir))) {
+ errOccurred.set(true);
+
+ throw new IOException(errMsg);
+ }
+
+ return delegate.create(file, modes);
+ });
+
+ return cfg;
+ };
+
+ try {
+ startGrid(0, cfgOp);
+ }
+ catch (Exception ignore) {
+ // No-op.
+ }
+
+ // Failed node can leave interrupted status of the thread that needs to be cleared,
+ // otherwise following "wait" wouldn't work.
+ // This call can't be moved inside of "catch" block because interruption can actually be silent.
+ Thread.interrupted();
+
+ assertTrue(GridTestUtils.waitForCondition(errOccurred::get, 10_000L));
+
+ assertTrue(GridTestUtils.waitForCondition(() -> G.allGrids().isEmpty(), 10_000L));
+
+ c.accept(workDir);
+
+ startGrid(0);
+
+ stopGrid(0);
+
+ // Everything must be completed.
+ startGrid(0).cluster().state(ClusterState.ACTIVE);
+
+ validateCache(grid(0).cache(DEFAULT_CACHE_NAME));
+
+ validateLeftovers(workDir);
+ }
+
+ /** Asserts that neither the defragmented index file nor the defragmented partition and mapping files of partitions {@code 0..PARTS - 1} exist in {@code workDir}. */
+ public void validateLeftovers(File workDir) {
+ assertFalse(defragmentedIndexFile(workDir).exists());
+
+ for (int p = 0; p < PARTS; p++) {
+ assertFalse(defragmentedPartMappingFile(workDir, p).exists());
+
+ assertFalse(defragmentedPartFile(workDir, p).exists());
+ }
+ }
+
+ /** Checks that after the restart with a registered maintenance task the store contains a regular {@code part-} file and no {@code part-dfrg-} file under a {@code cacheGroup-group} path. */
+ @Test
+ public void testDefragmentedPartitionCreated() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().state(ClusterState.ACTIVE);
+
+ fillCache(ig.cache(DEFAULT_CACHE_NAME));
+
+ fillCache(ig.getOrCreateCache(CACHE_2_NAME));
+
+ createMaintenanceRecord();
+
+ stopGrid(0);
+
+ startGrid(0);
+
+ File workDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
+
+ AtomicReference<File> cachePartFile = new AtomicReference<>();
+ AtomicReference<File> defragCachePartFile = new AtomicReference<>();
+
+ Files.walkFileTree(workDir.toPath(), new FileVisitor<Path>() {
+ @Override public FileVisitResult preVisitDirectory(Path path, BasicFileAttributes basicFileAttributes) throws IOException {
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) throws IOException {
+ if (path.toString().contains("cacheGroup-group")) {
+ File file = path.toFile();
+
+ if (file.getName().contains("part-dfrg-"))
+ defragCachePartFile.set(file);
+ else if (file.getName().contains("part-"))
+ cachePartFile.set(file);
+ }
+
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override public FileVisitResult visitFileFailed(Path path, IOException e) throws IOException {
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override public FileVisitResult postVisitDirectory(Path path, IOException e) throws IOException {
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
+ assertNull(defragCachePartFile.get()); //TODO Fails.
+ assertNotNull(cachePartFile.get());
+ }
+
+ /**
+ * Fill cache using integer keys.
+ *
+ * @param cache Cache to fill.
+ */
+ protected void fillCache(IgniteCache<Integer, Object> cache) {
+ fillCache(Function.identity(), cache);
+ }
+
+ /** Streams random 8192-byte values under mapped keys {@code 0..ADDED_KEYS_COUNT - 1}, then removes every even key in that same range (and no key outside of it). */
+ protected <T> void fillCache(Function<Integer, T> keyMapper, IgniteCache<T, Object> cache) {
+ try (IgniteDataStreamer<T, Object> ds = grid(0).dataStreamer(cache.getName())) {
+ for (int i = 0; i < ADDED_KEYS_COUNT; i++) {
+ byte[] val = new byte[8192];
+ new Random().nextBytes(val);
+
+ ds.addData(keyMapper.apply(i), val);
+ }
+ }
+
+ try (IgniteDataStreamer<T, Object> ds = grid(0).dataStreamer(cache.getName())) {
+ ds.allowOverwrite(true);
+
+ for (int i = 0; i < ADDED_KEYS_COUNT; i += 2)
+ ds.removeData(keyMapper.apply(i));
+ }
+ }
+
+ /** Asserts that for keys {@code 0..ADDED_KEYS_COUNT - 1} the cache returns {@code null} for even keys and a non-null value for odd keys. */
+ public void validateCache(IgniteCache<Object, Object> cache) {
+ for (int k = 0; k < ADDED_KEYS_COUNT; k++) {
+ Object val = cache.get(k);
+
+ if (k % 2 == 0)
+ assertNull(val);
+ else
+ assertNotNull(val);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java
index 85da92c..1a67716 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/LightweightCheckpointTest.java
@@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
-import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
@@ -155,8 +154,7 @@ public class LightweightCheckpointTest extends GridCommonAbstractTest {
db.persistentStoreMetricsImpl(),
context.longJvmPauseDetector(),
context.failure(),
- context.cache(),
- (FilePageStoreManager)context.cache().context().pageStore()
+ context.cache()
);
//and: Add checkpoint listener for DEFAULT_CACHE in order of storing the meta pages.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/LinkMapTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/LinkMapTest.java
new file mode 100644
index 0000000..ee2d436
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/LinkMapTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Class for LinkMap tests.
+ */
+public class LinkMapTest extends GridCommonAbstractTest {
+ /** Page size passed to the page memory. */
+ protected static final int PAGE_SIZE = 512;
+
+ /** Number of bytes in one megabyte. */
+ protected static final long MB = 1024 * 1024;
+
+ /**
+ * Tests that every value put into LinkMap under keys {@code 0..9999} is returned by {@code get}.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void test() throws Exception {
+ PageMemory pageMem = createPageMemory();
+
+ int cacheGroupId = 1;
+
+ String groupName = "test";
+
+ FullPageId pageId = new FullPageId(pageMem.allocatePage(cacheGroupId, 0, PageIdAllocator.FLAG_DATA), cacheGroupId);
+
+ LinkMap map = new LinkMap(cacheGroupId, groupName, pageMem, pageId.pageId(), true);
+
+ for (int i = 0; i < 10_000; i++)
+ map.put(i, i + 1);
+
+ for (int i = 0; i < 10_000; i++)
+ assertEquals(i + 1, map.get(i));
+ }
+
+ /**
+ * Creates and starts a {@link PageMemoryNoStoreImpl} with 2 MB initial and maximum size for the LinkMap tree.
+ */
+ protected PageMemory createPageMemory() throws Exception {
+ DataRegionConfiguration plcCfg = new DataRegionConfiguration()
+ .setInitialSize(2 * MB)
+ .setMaxSize(2 * MB);
+
+ PageMemory pageMem = new PageMemoryNoStoreImpl(log,
+ new UnsafeMemoryProvider(log),
+ null,
+ PAGE_SIZE,
+ plcCfg,
+ new LongAdderMetric("NO_OP", null),
+ true);
+
+ pageMem.start();
+
+ return pageMem;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index c5cfce1..5f0b04f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -30,16 +30,21 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMap;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
@@ -333,4 +338,15 @@ public class DummyQueryIndexing implements GridQueryIndexing {
String colNamePtrn) {
return null;
}
+
+ /** {@inheritDoc} This implementation does nothing. */
+ @Override public void defragment(
+ CacheGroupContext grpCtx,
+ CacheGroupContext newCtx,
+ PageMemoryEx partPageMem,
+ IntMap<LinkMap> mappingByPart,
+ CheckpointTimeoutLock cpLock
+ ) throws IgniteCheckedException {
+ // No-op.
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index d548e38..93ca870 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.RebalanceWithDifferentThreadP
import org.apache.ignite.internal.processors.cache.SetTxTimeoutOnPartitionMapExchangeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.EvictPartitionInLogTest;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMapTest;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagePoolTest;
import org.apache.ignite.internal.processors.cache.query.continuous.DiscoveryDataDeserializationFailureHanderTest;
import org.apache.ignite.internal.processors.cache.transactions.AtomicOperationsInTxTest;
@@ -295,6 +296,8 @@ import org.junit.runners.Suite;
ClusterActivationStartedEventTest.class,
IgniteThreadGroupNodeRestartTest.class,
+
+ LinkMapTest.class,
})
public class IgniteBasicTestSuite {
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
index 9978761..23256a0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite4.java
@@ -22,6 +22,9 @@ import java.util.Set;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheEntriesExpirationTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationEncryptionTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationRandomLruEvictionTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPartitionPreloadTest;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManagerTest;
@@ -68,6 +71,11 @@ public class IgnitePdsMvccTestSuite4 {
ignoredTests.add(OffHeapLockStackTest.class);
ignoredTests.add(IgnitePdsCacheEntriesExpirationTest.class);
+ // Defragmentation.
+ ignoredTests.add(IgnitePdsDefragmentationTest.class);
+ ignoredTests.add(IgnitePdsDefragmentationRandomLruEvictionTest.class);
+ ignoredTests.add(IgnitePdsDefragmentationEncryptionTest.class);
+
return IgnitePdsTestSuite4.suite(ignoredTests);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index be885e0..d634395 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -35,6 +35,9 @@ import org.apache.ignite.internal.processors.cache.persistence.CorruptedTreeFail
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheEntriesExpirationTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsConsistencyOnDelayedPartitionOwning;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationEncryptionTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationRandomLruEvictionTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDefragmentationTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRemoveDuringRebalancingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRestartAfterFailedToWriteMetaPageTest;
@@ -122,6 +125,11 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, WarmUpSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, LoadAllWarmUpStrategySelfTest.class, ignoredTests);
+ // Defragmentation.
+ GridTestUtils.addTestIfNeeded(suite, IgnitePdsDefragmentationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgnitePdsDefragmentationRandomLruEvictionTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgnitePdsDefragmentationEncryptionTest.class, ignoredTests);
+
GridTestUtils.addTestIfNeeded(suite, PendingTreeCorruptionTest.class, ignoredTests);
return suite;
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index d06418c..517bc69 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -82,6 +83,9 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.mvcc.StaticMvccQueryTracker;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMap;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
@@ -128,6 +132,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccInnerIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO;
+import org.apache.ignite.internal.processors.query.h2.defragmentation.IndexingDefragmentation;
import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateResultsIterator;
import org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIterator;
@@ -164,6 +169,7 @@ import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
@@ -295,6 +301,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** Parser. */
private QueryParser parser;
+    /** Index defragmentation helper bound to this indexing instance; assigned once, at construction. */
+    private final IndexingDefragmentation defragmentation = new IndexingDefragmentation(this);
+
/** */
private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
@@ -3186,4 +3194,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return map;
}
+
+ /** {@inheritDoc} */
+ @Override public void defragment(
+ CacheGroupContext grpCtx,
+ CacheGroupContext newCtx,
+ PageMemoryEx partPageMem,
+ IntMap<LinkMap> mappingByPart,
+ CheckpointTimeoutLock cpLock
+ ) throws IgniteCheckedException {
+ // Pure delegation: all arguments are forwarded unchanged, with this indexing's logger appended.
+ defragmentation.defragment(grpCtx, newCtx, partPageMem, mappingByPart, cpLock, log);
+ }
}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
new file mode 100644
index 0000000..c41f587
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.defragmentation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMap;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.InsertLast;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
+import org.apache.ignite.internal.processors.query.h2.database.InlineIndexColumn;
+import org.apache.ignite.internal.processors.query.h2.database.inlinecolumn.AbstractInlineIndexColumn;
+import org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2ExtrasInnerIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2ExtrasLeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2InnerIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2LeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
+import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.h2.index.Index;
+import org.h2.value.Value;
+
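+/**
+ * Defragmentation of H2 tree indexes: copies index entries of a cache group into newly created index trees,
+ * replacing each old row link with the link it is mapped to.
+ */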
+public class IndexingDefragmentation {
+ /** H2 indexing; its schema manager supplies the tables whose indexes are defragmented. */
+ private final IgniteH2Indexing indexing;
+
+ /**
+ * Constructor.
+ *
+ * @param indexing H2 indexing this helper works on behalf of.
+ */
+ public IndexingDefragmentation(IgniteH2Indexing indexing) {
+ this.indexing = indexing;
+ }
+
+ /**
+ * Defragment index partition: for every table that belongs to the given cache group, creates a new tree index
+ * and re-inserts the entries of the old index into it, replacing each row link with the link taken from the
+ * mapping of the row's partition.
+ * <p>
+ * The checkpoint read lock is held for the whole call. While iterating, it is released and immediately
+ * re-acquired once at least 150 ms have passed since it was last acquired.
+ *
+ * @param grpCtx Old group context.
+ * @param newCtx New group context.
+ * @param partPageMem Partition page memory; it is passed to the newly created indexes.
+ * @param mappingByPartition Link mappings (old link to new link), looked up by partition id.
+ * @param cpLock Defragmentation checkpoint read lock.
+ * @param log Log.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void defragment(
+ CacheGroupContext grpCtx,
+ CacheGroupContext newCtx,
+ PageMemoryEx partPageMem,
+ IntMap<LinkMap> mappingByPartition,
+ CheckpointTimeoutLock cpLock,
+ IgniteLogger log
+ ) throws IgniteCheckedException {
+ int pageSize = grpCtx.cacheObjectContext().kernalContext().grid().configuration().getDataStorageConfiguration().getPageSize();
+
+ TreeIterator treeIterator = new TreeIterator(pageSize);
+
+ // Old index trees are read through the page memory of the original data region.
+ PageMemoryEx oldCachePageMem = (PageMemoryEx)grpCtx.dataRegion().pageMemory();
+
+ PageMemory newCachePageMemory = partPageMem;
+
+ Collection<GridH2Table> tables = indexing.schemaManager().dataTables();
+
+ // Minimal time, in milliseconds, between two consecutive release/re-acquire cycles of the checkpoint lock.
+ long cpLockThreshold = 150L;
+
+ cpLock.checkpointReadLock();
+
+ try {
+ AtomicLong lastCpLockTs = new AtomicLong(System.currentTimeMillis());
+
+ for (GridH2Table table : tables) {
+ GridCacheContext<?, ?> cctx = table.cacheContext();
+
+ if (cctx.groupId() != grpCtx.groupId())
+ continue; // Not our index.
+
+ GridH2RowDescriptor rowDesc = table.rowDescriptor();
+
+ List<Index> indexes = table.getIndexes();
+ // NOTE(review): only the index at position 2 of the table's index list is processed, and the position is
+ // not range- or type-checked. Confirm this is the intended index and that no other tree index of the
+ // table has to be copied.
+ H2TreeIndex oldH2Idx = (H2TreeIndex)indexes.get(2);
+
+ int segments = oldH2Idx.segmentsCount();
+
+ // The tree of segment 0 serves as the source of the index definition (pk flag, affinity key flag, columns).
+ H2Tree firstTree = oldH2Idx.treeForRead(0);
+
+ // IO resolver handed to the new index: meta page IOs are returned as is, any other IO is wrapped in a delegate.
+ PageIoResolver pageIoRslvr = pageAddr -> {
+ PageIO io = PageIoResolver.DEFAULT_PAGE_IO_RESOLVER.resolve(pageAddr);
+
+ if (io instanceof BPlusMetaIO)
+ return io;
+
+ //noinspection unchecked,rawtypes,rawtypes
+ return wrap((BPlusIO)io);
+ };
+
+ H2TreeIndex newIdx = H2TreeIndex.createIndex(
+ cctx,
+ null,
+ table,
+ oldH2Idx.getName(),
+ firstTree.getPk(),
+ firstTree.getAffinityKey(),
+ // The same column list is passed for both column arguments.
+ Arrays.asList(firstTree.cols()),
+ Arrays.asList(firstTree.cols()),
+ oldH2Idx.inlineSize(),
+ segments,
+ newCachePageMemory,
+ newCtx.offheap(),
+ pageIoRslvr,
+ log
+ );
+
+ for (int i = 0; i < segments; i++) {
+ H2Tree tree = oldH2Idx.treeForRead(i);
+
+ treeIterator.iterate(tree, oldCachePageMem, (theTree, io, pageAddr, idx) -> {
+ // Release and re-acquire the checkpoint read lock from time to time - presumably to let a pending
+ // checkpoint proceed (confirm against the CheckpointTimeoutLock contract).
+ if (System.currentTimeMillis() - lastCpLockTs.get() >= cpLockThreshold) {
+ cpLock.checkpointReadUnlock();
+
+ cpLock.checkpointReadLock();
+
+ lastCpLockTs.set(System.currentTimeMillis());
+ }
+
+ assert 1 == io.getVersion()
+ : "IO version " + io.getVersion() + " is not supported by current defragmentation algorithm." +
+ " Please implement copying of tree in a new format.";
+
+ // Read the row through the delegate so that it is materialized as H2CacheRowWithIndex.
+ BPlusIO<H2Row> h2IO = wrap(io);
+
+ H2Row row = theTree.getRow(h2IO, pageAddr, idx);
+
+ if (row instanceof H2CacheRowWithIndex) {
+ H2CacheRowWithIndex h2CacheRow = (H2CacheRowWithIndex)row;
+
+ CacheDataRow cacheDataRow = h2CacheRow.getRow();
+
+ int partition = cacheDataRow.partition();
+
+ long link = h2CacheRow.link();
+
+ // NOTE(review): no null check - assumes a mapping is present for every partition met in the index.
+ LinkMap map = mappingByPartition.get(partition);
+
+ long newLink = map.get(link);
+
+ H2CacheRowWithIndex newRow = H2CacheRowWithIndex.create(
+ rowDesc,
+ newLink,
+ h2CacheRow,
+ ((H2RowLinkIO)io).storeMvccInfo()
+ );
+
+ newIdx.putx(newRow);
+ }
+
+ // Always request the next item.
+ return true;
+ });
+ }
+ }
+ }
+ finally {
+ cpLock.checkpointReadUnlock();
+ }
+ }
+
+    /**
+     * Builds a row for the index item at the given position. The row carries the item's link (plus the MVCC version
+     * if the IO stores one) and the values decoded from the item's inlined payload, so it can be re-inserted
+     * without loading the data row itself.
+     *
+     * @param tree Tree the item belongs to; must be an {@link H2Tree}.
+     * @param pageAddr Page address.
+     * @param idx Index of the item on the page.
+     * @param io IO used to read the item.
+     * @return Row with the decoded inline values attached.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static <T extends BPlusIO<H2Row> & H2RowLinkIO> H2Row lookupRow(
+        BPlusTree<H2Row, ?> tree,
+        long pageAddr,
+        int idx,
+        T io
+    ) throws IgniteCheckedException {
+        long link = io.getLink(pageAddr, idx);
+
+        H2Tree h2Tree = (H2Tree)tree;
+
+        List<InlineIndexColumn> inlineCols = h2Tree.inlineIndexes();
+
+        int itemOff = io.offset(idx);
+
+        List<Value> inlinedVals = new ArrayList<>();
+
+        if (inlineCols != null) {
+            // Number of payload bytes already consumed by the preceding inlined columns.
+            int consumed = 0;
+
+            for (InlineIndexColumn col : inlineCols) {
+                AbstractInlineIndexColumn inlineCol = (AbstractInlineIndexColumn)col;
+
+                Value val = inlineCol.get(pageAddr, itemOff + consumed, io.getPayloadSize() - consumed);
+
+                consumed += inlineCol.inlineSizeOf(val);
+
+                inlinedVals.add(val);
+            }
+        }
+
+        H2CacheRow row;
+
+        if (io.storeMvccInfo()) {
+            long crdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
+            long cntr = io.getMvccCounter(pageAddr, idx);
+            int opCntr = io.getMvccOperationCounter(pageAddr, idx);
+
+            row = (H2CacheRow)h2Tree.createMvccRow(link, crdVer, cntr, opCntr, CacheDataRowAdapter.RowData.LINK_ONLY);
+        }
+        else
+            row = (H2CacheRow)h2Tree.createRow(link, false);
+
+        return new H2CacheRowWithIndex(row.getDesc(), row.getRow(), inlinedVals);
+    }
+
+    /**
+     * Wraps an H2 tree IO into a delegate of the matching kind (inner or leaf).
+     *
+     * @param io IO to wrap; must implement {@link H2RowLinkIO}.
+     * @return Delegating IO.
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static BPlusIO<H2Row> wrap(BPlusIO<H2Row> io) {
+        assert io instanceof H2RowLinkIO;
+
+        if (!(io instanceof BPlusInnerIO)) {
+            assert io instanceof AbstractH2ExtrasLeafIO
+                || io instanceof AbstractH2LeafIO;
+
+            return new BPlusLeafIoDelegate((BPlusLeafIO<H2Row>)io);
+        }
+
+        assert io instanceof AbstractH2ExtrasInnerIO
+            || io instanceof AbstractH2InnerIO;
+
+        return new BPlusInnerIoDelegate((BPlusInnerIO<H2Row>)io);
+    }
+
+    /**
+     * Inner page IO that forwards every operation to the wrapped IO, except for
+     * {@link #getLookupRow(BPlusTree, long, int)}, which is served by {@code lookupRow}.
+     *
+     * @param <D> Type of the wrapped IO.
+     */
+    private static class BPlusInnerIoDelegate<D extends BPlusInnerIO<H2Row> & H2RowLinkIO>
+        extends BPlusInnerIO<H2Row> implements H2RowLinkIO {
+        /** Wrapped IO. */
+        private final D delegate;
+
+        /**
+         * @param delegate IO to wrap; its type, version, {@code canGetRow} flag and item size are reused.
+         */
+        public BPlusInnerIoDelegate(D delegate) {
+            super(delegate.getType(), delegate.getVersion(), delegate.canGetRow(), delegate.getItemSize());
+
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public H2Row getLookupRow(BPlusTree<H2Row, ?> tree, long pageAddr, int idx) throws IgniteCheckedException {
+            return lookupRow(tree, pageAddr, idx, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<H2Row> srcIo, long srcPageAddr, int srcIdx)
+            throws IgniteCheckedException
+        {
+            delegate.store(dstPageAddr, dstIdx, srcIo, srcPageAddr, srcIdx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long pageAddr, int off, H2Row row) throws IgniteCheckedException {
+            delegate.storeByOffset(pageAddr, off, row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLink(long pageAddr, int idx) {
+            return delegate.getLink(pageAddr, idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getPayloadSize() {
+            return delegate.getPayloadSize();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean storeMvccInfo() {
+            return delegate.storeMvccInfo();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+            return delegate.getMvccCoordinatorVersion(pageAddr, idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getMvccCounter(long pageAddr, int idx) {
+            return delegate.getMvccCounter(pageAddr, idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getMvccOperationCounter(long pageAddr, int idx) {
+            return delegate.getMvccOperationCounter(pageAddr, idx);
+        }
+    }
+
+    /**
+     * Leaf page IO that forwards every operation to the wrapped IO, except for
+     * {@link #getLookupRow(BPlusTree, long, int)}, which is served by {@code lookupRow}.
+     *
+     * @param <D> Type of the wrapped IO.
+     */
+    private static class BPlusLeafIoDelegate<D extends BPlusLeafIO<H2Row> & H2RowLinkIO>
+        extends BPlusLeafIO<H2Row> implements H2RowLinkIO {
+        /** Wrapped IO. */
+        private final D delegate;
+
+        /**
+         * @param delegate IO to wrap; its type, version and item size are reused.
+         */
+        public BPlusLeafIoDelegate(D delegate) {
+            super(delegate.getType(), delegate.getVersion(), delegate.getItemSize());
+
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public H2Row getLookupRow(BPlusTree<H2Row, ?> tree, long pageAddr, int idx) throws IgniteCheckedException {
+            return lookupRow(tree, pageAddr, idx, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<H2Row> srcIo, long srcPageAddr, int srcIdx)
+            throws IgniteCheckedException
+        {
+            delegate.store(dstPageAddr, dstIdx, srcIo, srcPageAddr, srcIdx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long pageAddr, int off, H2Row row) throws IgniteCheckedException {
+            delegate.storeByOffset(pageAddr, off, row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLink(long pageAddr, int idx) {
+            return delegate.getLink(pageAddr, idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getPayloadSize() {
+            return delegate.getPayloadSize();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean storeMvccInfo() {
+            return delegate.storeMvccInfo();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+            return delegate.getMvccCoordinatorVersion(pageAddr, idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getMvccCounter(long pageAddr, int idx) {
+            return delegate.getMvccCounter(pageAddr, idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getMvccOperationCounter(long pageAddr, int idx) {
+            return delegate.getMvccOperationCounter(pageAddr, idx);
+        }
+    }
+
+    /**
+     * {@link H2CacheRow} that additionally keeps the values read from the inlined part of an index item and
+     * serves {@link #getValue(int)} from them.
+     */
+    private static class H2CacheRowWithIndex extends H2CacheRow implements InsertLast {
+        /** Values decoded from the index item, in index column order. */
+        private final List<Value> values;
+
+        /**
+         * @param desc Row descriptor.
+         * @param row Underlying cache data row.
+         * @param values Values decoded from the index item.
+         */
+        public H2CacheRowWithIndex(GridH2RowDescriptor desc, CacheDataRow row, List<Value> values) {
+            super(desc, row);
+
+            this.values = values;
+        }
+
+        /**
+         * Creates a copy of the given row that points to a new link and shares the value list of the old row.
+         *
+         * @param desc Row descriptor.
+         * @param newLink Link the new row should point to.
+         * @param oldValue Row to copy the values (and, for MVCC, the version) from.
+         * @param storeMvcc Whether the MVCC version of the old row has to be carried over.
+         * @return New row.
+         */
+        public static H2CacheRowWithIndex create(
+            GridH2RowDescriptor desc,
+            long newLink,
+            H2CacheRowWithIndex oldValue,
+            boolean storeMvcc
+        ) {
+            CacheDataRow oldDataRow = oldValue.getRow();
+
+            if (!storeMvcc)
+                return new H2CacheRowWithIndex(desc, new CacheDataRowAdapter(newLink), oldValue.values);
+
+            CacheDataRow mvccDataRow = new MvccDataRow(newLink);
+
+            mvccDataRow.mvccVersion(oldDataRow);
+
+            return new H2CacheRowWithIndex(desc, mvccDataRow, oldValue.values);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Value getValue(int col) {
+            // No values were decoded from the index item: nothing to return for any column.
+            return values.isEmpty() ? null : values.get(col);
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheUpdateSqlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheUpdateSqlQuerySelfTest.java
index 95925ff..3607be0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheUpdateSqlQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheUpdateSqlQuerySelfTest.java
@@ -400,7 +400,7 @@ public class IgniteCacheUpdateSqlQuerySelfTest extends IgniteCacheAbstractSqlDml
/**
*
*/
- static final class AllTypes implements Serializable {
+ public static final class AllTypes implements Serializable {
/**
* Data Long.
*/
@@ -602,7 +602,7 @@ public class IgniteCacheUpdateSqlQuerySelfTest extends IgniteCacheAbstractSqlDml
}
/** */
- AllTypes(Long key) {
+ public AllTypes(Long key) {
this.init(key, Long.toString(key));
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
new file mode 100644
index 0000000..bbb69ae
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheUpdateSqlQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
+import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
+/**
+ * Defragmentation tests with enabled ignite-indexing.
+ */
+public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentationTest {
+ /** Name of the system property which, when set to {@code "true"}, switches both test caches to TRANSACTIONAL_SNAPSHOT (MVCC) mode. */
+ private static final String USE_MVCC = "USE_MVCC";
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Configures a persistent default data region and two indexed caches sharing one cache group. Depending on the
+     * {@link #USE_MVCC} system property, either both caches use MVCC, or the second cache gets an expiry policy.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+
+        igniteCfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.setWalSegmentSize(4 * 1024 * 1024);
+
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration()
+            .setInitialSize(100L * 1024 * 1024)
+            .setMaxSize(1024L * 1024 * 1024)
+            .setPersistenceEnabled(true);
+
+        storageCfg.setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+        igniteCfg.setDataStorageConfiguration(storageCfg);
+
+        CacheConfiguration<?, ?> firstCacheCfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setGroupName(GRP_NAME)
+            .setIndexedTypes(
+                IgniteCacheUpdateSqlQuerySelfTest.AllTypes.class, byte[].class,
+                Integer.class, byte[].class
+            )
+            .setAffinity(new RendezvousAffinityFunction(false, PARTS));
+
+        CacheConfiguration<?, ?> secondCacheCfg = new CacheConfiguration<>(CACHE_2_NAME)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setGroupName(GRP_NAME)
+            .setIndexedTypes(
+                IgniteCacheUpdateSqlQuerySelfTest.AllTypes.class, byte[].class,
+                Integer.class, byte[].class
+            )
+            .setAffinity(new RendezvousAffinityFunction(false, PARTS));
+
+        boolean useMvcc = Boolean.TRUE.toString().equals(System.getProperty(USE_MVCC));
+
+        if (!useMvcc)
+            secondCacheCfg.setExpiryPolicyFactory(new PolicyFactory());
+        else {
+            firstCacheCfg.setAtomicityMode(TRANSACTIONAL_SNAPSHOT);
+            secondCacheCfg.setAtomicityMode(TRANSACTIONAL_SNAPSHOT);
+        }
+
+        igniteCfg.setCacheConfiguration(firstCacheCfg, secondCacheCfg);
+
+        return igniteCfg;
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ // Drop the indexing class override installed by test(), so it does not leak into subsequent tests.
+ GridQueryProcessor.idxCls = null;
+ }
+
+    /**
+     * Fills the cache, registers a maintenance record and restarts the node, expecting the restart to defragment
+     * the PDS. Then restarts once more and checks that the index file did not grow, that indexes were not rebuilt,
+     * that they pass validation and that all keys can be read.
+     *
+     * @param keyMapper Function that provides key based on the index of entry.
+     * @param <T> Type of cache key.
+     *
+     * @throws Exception If failed.
+     */
+    private <T> void test(Function<Integer, T> keyMapper) throws Exception {
+        IgniteEx firstStart = startGrid(0);
+
+        firstStart.cluster().state(ClusterState.ACTIVE);
+
+        fillCache(keyMapper, firstStart.cache(DEFAULT_CACHE_NAME));
+
+        forceCheckpoint(firstStart);
+
+        createMaintenanceRecord();
+
+        stopGrid(0);
+
+        File storeDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
+        File nodeDir = new File(storeDir, U.maskForFileName(firstStart.name()));
+        File grpDir = new File(nodeDir, FilePageStoreManager.CACHE_GRP_DIR_PREFIX + GRP_NAME);
+        File idxFile = new File(grpDir, FilePageStoreManager.INDEX_FILE_NAME);
+
+        long idxLenBefore = idxFile.length();
+
+        // This start is expected to perform the defragmentation and leave the completion marker behind.
+        startGrid(0);
+
+        long idxLenAfter = idxFile.length();
+
+        assertTrue(idxLenAfter <= idxLenBefore);
+
+        File completionMarker = DefragmentationFileUtils.defragmentationCompletionMarkerFile(grpDir);
+
+        assertTrue(completionMarker.exists());
+
+        stopGrid(0);
+
+        // Start the node with the capturing indexing to detect an index rebuild, if one happens.
+        GridQueryProcessor.idxCls = CaptureRebuildGridQueryIndexing.class;
+
+        IgniteEx restarted = startGrid(0);
+
+        awaitPartitionMapExchange();
+
+        CaptureRebuildGridQueryIndexing capturingIdx =
+            (CaptureRebuildGridQueryIndexing)restarted.context().query().getIndexing();
+
+        assertFalse(capturingIdx.didRebuildIndexes());
+
+        IgniteCache<Object, Object> restartedCache = restarted.cache(DEFAULT_CACHE_NAME);
+
+        assertFalse(completionMarker.exists());
+
+        validateIndexes(restarted);
+
+        for (int keyIdx = 0; keyIdx < ADDED_KEYS_COUNT; keyIdx++)
+            restartedCache.get(keyMapper.apply(keyIdx));
+    }
+
+    /**
+     * Runs index validation for the default cache on the given node and fails if any issue is reported.
+     *
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    private static void validateIndexes(IgniteEx node) throws Exception {
+        ValidateIndexesClosure validation =
+            new ValidateIndexesClosure(Collections.singleton(DEFAULT_CACHE_NAME), 0, 0, false, true);
+
+        // The closure is invoked directly, so its resources have to be injected by hand.
+        node.context().resource().injectGeneric(validation);
+
+        VisorValidateIndexesJobResult res = validation.call();
+
+        assertFalse(res.hasIssues());
+    }
+
+    /**
+     * Runs the defragmentation scenario with plain integer keys.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIndexingWithIntegerKey() throws Exception {
+        test(Function.<Integer>identity());
+    }
+
+    /**
+     * Runs the defragmentation scenario with complex keys ({@code AllTypes} objects built from the entry index).
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIndexingWithComplexKey() throws Exception {
+        test(entryIdx -> new IgniteCacheUpdateSqlQuerySelfTest.AllTypes(entryIdx.longValue()));
+    }
+
+    /**
+     * Runs the defragmentation scenario with plain integer keys and MVCC-enabled caches.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = USE_MVCC, value = "true")
+    public void testIndexingWithIntegerKeyAndMVCC() throws Exception {
+        test(Function.<Integer>identity());
+    }
+
+    /**
+     * Runs the defragmentation scenario with complex keys ({@code AllTypes} objects built from the entry index)
+     * and MVCC-enabled caches.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = USE_MVCC, value = "true")
+    public void testIndexingWithComplexKeyAndMVCC() throws Exception {
+        test(entryIdx -> new IgniteCacheUpdateSqlQuerySelfTest.AllTypes(entryIdx.longValue()));
+    }
+
+    /**
+     * Creates an SQL table with a primary key and two secondary indexes, fills it, deletes every second row and
+     * restarts the node twice after registering a maintenance record. Then checks that queries on each indexed
+     * column are still planned with the corresponding index and can be executed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMultipleIndexes() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache<?, ?> cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        cache.query(new SqlFieldsQuery("CREATE TABLE TEST (ID INT PRIMARY KEY, VAL_INT INT, VAL_OBJ LONG)"));
+
+        cache.query(new SqlFieldsQuery("CREATE INDEX TEST_VAL_INT ON TEST(VAL_INT)"));
+
+        cache.query(new SqlFieldsQuery("CREATE INDEX TEST_VAL_OBJ ON TEST(VAL_OBJ)"));
+
+        for (int i = 0; i < ADDED_KEYS_COUNT; i++) {
+            SqlFieldsQuery insert = new SqlFieldsQuery("INSERT INTO TEST VALUES (?, ?, ?)").setArgs(i, i, (long)i);
+
+            cache.query(insert);
+        }
+
+        cache.query(new SqlFieldsQuery("DELETE FROM TEST WHERE MOD(ID, 2) = 0"));
+
+        createMaintenanceRecord();
+
+        // Restart first time.
+        stopGrid(0);
+
+        startGrid(0);
+
+        // Restart second time.
+        stopGrid(0);
+
+        startGrid(0);
+
+        // Reinit cache object.
+        cache = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        // Each entry: EXPLAIN statement, index name expected in the plan, query to execute afterwards.
+        String[][] checks = {
+            {"EXPLAIN SELECT * FROM TEST WHERE ID > 0", "_key_pk_proxy", "SELECT * FROM TEST WHERE ID > 0"},
+            {"EXPLAIN SELECT * FROM TEST WHERE VAL_INT > 0", "test_val_int", "SELECT * FROM TEST WHERE VAL_INT > 0"},
+            {"EXPLAIN SELECT * FROM TEST WHERE VAL_OBJ > 0", "test_val_obj", "SELECT * FROM TEST WHERE VAL_OBJ > 0"}
+        };
+
+        for (String[] check : checks) {
+            assertTrue(explainQuery(cache, check[0]).contains(check[1]));
+
+            cache.query(new SqlFieldsQuery(check[2])).getAll();
+        }
+    }
+
+    /**
+     * Executes the given query and returns the first column of its first row as a lower-case string.
+     *
+     * @param cache Cache to run the query on.
+     * @param qry Query text (an {@code EXPLAIN} statement in this test).
+     * @return Lower-cased first cell of the result.
+     */
+    private static String explainQuery(IgniteCache<?, ?> cache, String qry) {
+        Object firstCell = cache.query(new SqlFieldsQuery(qry)).getAll().get(0).get(0);
+
+        return firstCell.toString().toLowerCase();
+    }
+
+    /**
+     * IgniteH2Indexing that captures index rebuild operations.
+     */
+    public static class CaptureRebuildGridQueryIndexing extends IgniteH2Indexing {
+        /**
+         * Whether an index rebuild was started at least once. The flag is only ever raised, never lowered.
+         * Volatile because it is read by the test thread, while the rebuild method may be invoked by another
+         * thread of the node.
+         */
+        private volatile boolean rebuiltIndexes;
+
+        /** {@inheritDoc} */
+        @Override public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+            IgniteInternalFuture<?> fut = super.rebuildIndexesFromHash(cctx);
+
+            // Latch the flag instead of overwriting it: the method is called once per cache, and a later call
+            // that returns null must not hide a rebuild that was started for another cache.
+            if (fut != null)
+                rebuiltIndexes = true;
+
+            return fut;
+        }
+
+        /**
+         * Get index rebuild flag.
+         *
+         * @return Whether index rebuild happened for at least one cache.
+         */
+        public boolean didRebuildIndexes() {
+            return rebuiltIndexes;
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index f4a0ac9..d018457 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.encryption.CacheGroupReencryptionTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingAndGroupPutGetPersistenceSelfTest;
import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
import org.apache.ignite.internal.processors.cache.index.ClientReconnectWithSqlTableConfiguredTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsIndexingDefragmentationTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IndexingMultithreadedLoadContinuousRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.db.LongDestroyDurableBackgroundTaskTest;
@@ -58,7 +59,8 @@ import org.junit.runners.Suite;
IgniteClusterSnapshotWithIndexesTest.class,
ClientReconnectWithSqlTableConfiguredTest.class,
MultipleParallelCacheDeleteDeadlockTest.class,
- CacheGroupReencryptionTest.class
+ CacheGroupReencryptionTest.class,
+ IgnitePdsIndexingDefragmentationTest.class
})
public class IgnitePdsWithIndexingTestSuite {
}